本文实例讲述了flask 框架操作MySQL数据库。分享给大家供大家参考,具体如下:
一、创建数据库表格
"""
Created on 19-10-8
@requirement:Anaconda 4.3.0 (64-bit) Python3.6
@description:创建表格
"""
import pymysql
server = '127.0.0.1'
user = 'root'
password = 'password'
# 连接数据库
conn = pymysql.connect(server, user, password, database='information_collection') # 获取连接
cursor = conn.cursor() # 获取游标
# "**ENGINE=InnoDB DEFAULT CHARSET=utf8**"-创建表的过程中增加这条,中文就不是乱码
# 创建表
cursor.execute("""
CREATE TABLE if not exists user(
user_id INT NOT NULL auto_increment primary key,
user_name VARCHAR(100),
user_password VARCHAR(100),
user_nickname VARCHAR(100),
user_email VARCHAR(100)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8
""")
# 查询数据库表user内容
cursor.execute('SELECT * FROM user')
# 查看一行 多行:cursor.fetchall()
row = cursor.fetchone()
print(row)
# if row[0] is None:
# row0 = list(row)
# row0[0] = 0
# row = tuple(row0)
# # 插入数据,注:与sqlserver有些区别
cursor.execute("INSERT INTO user VALUES('%s','%s','%s','%s')" % ('xiaoming','qwe','ming','@163.com'))
# 提交数据,才会写入表格
conn.commit()
# 关闭游标关闭数据库
cursor.close()
conn.close()
二、flask操作mysql
"""
Created on 19-10-8
@requirement:Anaconda 4.3.0 (64-bit) Python3.6
@description:
"""
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, jsonify, request
import configparser
import os
app = Flask(__name__)
# 使用ConfigParser 首选需要初始化实例,并读取配置文件:
my_config = configparser.ConfigParser()
my_config.read('db.conf')
# 连接数据库information_collection
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DEV_DATABASE_URL') or \
"mysql+pymysql://root:password@127.0.0.1:3306/information_collection"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
mydb = SQLAlchemy()
mydb.init_app(app)
# 用户模型
class User(mydb.Model):
user_id = mydb.Column(mydb.Integer, primary_key=True)
user_name = mydb.Column(mydb.String(60), nullable=False)
user_password = mydb.Column(mydb.String(30), nullable=False)
user_nickname = mydb.Column(mydb.String(50))
user_email = mydb.Column(mydb.String(30), nullable=False)
def __repr__(self):
return '<User %r>' % self.user_name
# 获取用户列表,所有数据
@app.route('/users', methods=['GET'])
def getUsers():
data = User.query.all()
datas = []
for user in data:
datas.append({'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email})
return jsonify(data=datas)
# 添加用户数据,一条一条添加
@app.route('/user', methods=['POST'])
def addUser():
user_name = request.form.get('user_name')
user_password = request.form.get('user_password')
user_nickname = request.form.get('user_nickname')
user_email = request.form.get('user_email')
user = User(user_name=user_name, user_password=user_password, user_nickname=user_nickname, user_email=user_email)
try:
mydb.session.add(user)
mydb.session.commit()
except:
mydb.session.rollback()
mydb.session.flush()
userId = user.user_id
if (user.user_id is None):
result = {'msg': '添加失败'}
return jsonify(data=result)
data = User.query.filter_by(user_id=userId).first()
result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
return jsonify(data=result)
# 获取单条数据
@app.route('/user/<int:userId>', methods=['GET'])
def getUser(userId):
user = User.query.filter_by(user_id=userId).first()
if (user is None):
result = {'msg': '找不到数据'}
else:
result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
return jsonify(data=result)
# 修改用户数据
@app.route('/user/<int:userId>', methods=['PATCH'])
def updateUser(userId):
user_name = request.form.get('user_name')
user_password = request.form.get('user_password')
user_nickname = request.form.get('user_nickname')
user_email = request.form.get('user_email')
try:
user = User.query.filter_by(user_id=userId).first()
if (user is None):
result = {'msg': '找不到要修改的记录'}
return jsonify(data=result)
else:
user.user_name = user_name
user.user_password = user_password
user.user_nickname = user_nickname
user.user_email = user_email
mydb.session.commit()
except:
mydb.session.rollback() # 回滚
mydb.session.flush() # 重置
userId = user.user_id
data = User.query.filter_by(user_id=userId).first()
result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_password': user.user_password, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
return jsonify(data=result)
# 删除用户数据
@app.route('/user/<int:userId>', methods=['DELETE'])
def deleteUser(userId):
User.query.filter_by(user_id=userId).delete()
mydb.session.commit()
return getUsers()
if __name__ == '__main__':
app.run()
三、返回数据的样式
{
"data": {
"user_email": "@126.com",
"user_id": 6,
"user_name": "xiaoli",
"user_nickname": "lili"
}
}
希望本文所述对大家基于flask框架的Python程序设计有所帮助。
标签:flask
Powered By python教程网 鲁ICP备18013710号
python博客 - 小白学python最友好的网站!