这篇文章主要介绍了Python实现自定义读写分离代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
思路
from flask import Flask
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm
class RoutingSession(SignallingSession):
def get_bind(self, mapper=None, clause=None):
state = get_state(self.app)
# 判断读写操作
if self._flushing: # 写操作 ,使用主数据库
print("写入数据")
return state.db.get_engine(self.app, bind='master')
else: # 读操作, 使用从数据库
print('读取数据')
return state.db.get_engine(self.app, bind='slave')
class RoutingSQLAlchemy(SQLAlchemy):
def create_session(self, options):
return orm.sessionmaker(class_=RoutingSession, db=self, **options)
app = Flask(__name__)
# 设置数据库的连接地址
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.105.134:3306/demo'
# 设置数据库的绑定地址
app.config['SQLALCHEMY_BINDS'] = {
'master': "mysql://root:mysql@192.168.105.134:3306/demo",
'slave': "mysql://root:mysql@192.168.105.134:8306/demo"
}
# 设置是否追踪数据库变化 一般不会开启, 影响性能
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 设置是否打印底层执行的SQL语句
app.config['SQLALCHEMY_ECHO'] = False
# 创建数据库连接对象
db = RoutingSQLAlchemy(app)
# 用户表 一
class User(db.Model):
__tablename__ = 't_user'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(20), unique=True)
@app.route('/')
def index():
# 增加数据
user1 = User(name='zs')
db.session.add(user1)
db.session.commit()
# 查询数据
users = User.query.all()
print(users)
return "index"
if __name__ == '__main__':
# 删除所有继承自db.Model的表
db.drop_all()
# 创建所有继承自db.Model的表
db.create_all()
app.run(debug=True)
不太好,自动选择不能控制
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持python博客。
Powered By python教程网 鲁ICP备18013710号
python博客 - 小白学python最友好的网站!