Python Flask 框架
..............
数据库链接池
pip3 install pymysql dbutils
简单实现
'''
@Date : 2020-11-12 20:02:49
@LastEditors : Pineapple
@LastEditTime : 2020-11-13 21:01:53
@FilePath : /database_pool/连接池.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from threading import Thread
import pymysql
from dbutils.pooled_db import PooledDB
POOL = PooledDB(
creator=pymysql, # 指定创建连接的包
maxconnections=6, # 最大连接数
mincached=2, # 初始链接数
blocking=True, # 阻塞时是否等待
ping=0, # 测试连接是否正常, 不同情况的值不同
# 连接mysql的必备参数
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def task(num):
# 去连接池获取连接
conn = POOL.connection()
cursor = conn.cursor()
# cursor.execute('select * from job51')
cursor.execute('select sleep(3)')
result = cursor.fetchall()
cursor.close()
# 将连接放回到连接池
conn.close()
print(num, '----------->', result)
for i in range(40):
t = Thread(target=task, args=(i,))
t.start()
创建POOL对象时, 连接数为零, 只是创建好了一个容量为6的空池子.
按照不同的需求, 参数ping可以指定为0, 1, 2, 4, 7
源码注释是这样的:
ping: determines when the connection should be checked with ping()
(0 = None = never, 1 = default = whenever fetched from the pool,
2 = when a cursor is created, 4 = when a query is executed,
7 = always, and all other bit combinations of these values)
实现相关查询功能
基于函数实现sqlhelper
'''
@Date : 2020-11-12 20:59:10
@LastEditors : Pineapple
@LastEditTime : 2020-11-12 21:09:27
@FilePath : /flask_test/database_pool/sqlhelper.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
import pymysql
from dbutils.pooled_db import PooledDB
POOL = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
blocking=True,
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def fetchall(sql, *args):
"""获取所有数据"""
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchall()
cursor.close()
conn.close()
return result
def fetchone(sql, *args):
"""获取一条数据"""
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchone()
cursor.close()
conn.close()
return result
pymysql的execute方法会将args添加到sql语句中, 所以在编写函数的时候可以使用*args打包参数的功能, 打包成一个元组传入execute方法
这样就简单的实现了常用的 获取全部数据fetchall
方法, 和获取一条数据fetchone
方法
编写pool_test.py 来测试一下
'''
@Date : 2020-11-13 21:22:50
@LastEditors : Pineapple
@LastEditTime : 2020-11-13 21:33:32
@FilePath : /database_pool/pool_test.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from flask import Flask
import sqlhelper
app = Flask(__name__)
@app.route('/login')
def login():
print(sqlhelper.fetchall('select * from book where rating_nums=%s', '9.0'))
return 'login'
@app.route('/index')
def index():
print(sqlhelper.fetchone('select * from job51 where name=%s', '前端开发工程师'))
return 'index'
@app.route('/order')
def order():
return 'order'
if __name__ == "__main__":
app.run(debug=True)
基于类实现sqlhelper
既然设计了数据库连接池, 所以我们希望全局只有一个连接池, 那么这个类必须是一个单例模式, 好在Python的类很轻松就能实现这种功能, 在导包的时候会生成.pyc Python字节码文件, 之后再次使用就会执行这个.pyc 字节码文件. 所以如果在同一个问价中, 我们可以通过导入模块的方式轻松的实现一个单例类.
# s1.py 文件中
class Foo(object):
def test(self):
print("123")
v = Foo()
# v是Foo的实例
------
# s2.py 文件中
from s1 import v as v1
print(v1,id(v1)) #<s1.Foo object at 0x0000000002221710> 35788560
from s1 import v as v2
print(v1,id(v2)) #<s1.Foo object at 0x0000000002221710> 35788560
# 两个的内存地址是一样的
# 文件加载的时候,第一次导入后,再次导入时不会再重新加载。
Python帮了我们这么多, 我们就可以专心的设计sqlhelper类了.
'''
@Date : 2020-11-13 16:46:20
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 09:10:00
@FilePath : /database_pool/sqlhelper2.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
import pymysql
from dbutils.pooled_db import PooledDB
class SqlHelper:
def __init__(self) -> None:
self.pool = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
blocking=True,
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn, cursor
def close(self, conn, cursor):
cursor.close()
conn.close()
def fetchall(self, sql, *args):
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn, cursor)
return result
def fetchone(self, sql, *args):
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
db = SqlHelper()
编写类和编写函数一样简单, 新增的open和close方法会实现数据库的连接和关闭, 不仅去掉了重复的代码, 在使用sqlhelper的时候轻松调用open和close并在中间加上其他的功能.
编写sqlhelper来测试一下:
'''
@Date : 2020-11-13 21:22:50
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 09:47:20
@FilePath : /database_pool/pool_test.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from flask import Flask
from sqlhelper2 import db
app = Flask(__name__)
@app.route('/login')
def login():
print(db.fetchall('select * from book where rating_nums=%s', '9.0'))
return 'login'
@app.route('/index')
def index():
print(db.fetchone('select * from quotes where author=%s', 'Tim Peters'))
return 'index'
@app.route('/order')
def order():
author = 'Tim Peters'
txt = 'Simple is better than complex.'
tags = 'The Zen of Python'
conn, cursor = db.open()
sql = 'insert into quotes (author, txt, tags) values(%s, %s, %s)'
cursor.execute(sql, (author, txt, tags))
conn.commit()
db.close(conn, cursor)
return 'oder'
if __name__ == "__main__":
app.run(debug=True)
在login和index函数里还是调用了sqlhelper已经写好的fetchall fetchone方法, 在order中通过调用sqlhelper的open和close方法,在其中间实现了插入的功能, 这样的sqlhelper不仅用起来方便, 而且拓展性强
上下文管理
在Python中使用with关键字实现上下文管理器
class Foo:
def __enter__(self):
return 123
def __exit__(self, exc_type, exc_val, exc_tb):
pass
foo = Foo()
with foo as f:
print(f)
在使用with关键字时会调用类的 __enter__
方法, 此方法返回的内容就是as后的对象f, 在退出时会调用类的 __exit__
方法
我们最熟悉的文件操作, open()方法就是这样实现的, 若不使用上下文管理器, 每打开一个文件都要调用此文件对象的close方法进行关闭
数据库的连接和关闭操作也可以使用上下文管理器的方式
'''
@Date : 2020-11-14 10:14:38
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 10:36:33
@FilePath : /database_pool/sqlhelper3.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
import pymysql
from dbutils.pooled_db import PooledDB
class SqlHelper:
def __init__(self) -> None:
self.pool = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
blocking=True, # 阻塞时是否等待
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def __enter__(self):
self.conn = self.pool.connection()
self.cursor = self.conn.cursor()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.conn.close()
def fetchall(self, sql, *args):
with self as db:
db.cursor.execute(sql, args)
result = db.cursor.fetchall()
return result
def fetchone(self, sql, *args):
with self as db:
db.cursor.execute(sql, args)
result = db.cursor.fetchone()
return result
sqlhelper = SqlHelper()
用__enter__
和__exit__
方法实现open和close的操作
编写pool_test.py 测试一下
'''
@Date : 2020-11-13 21:22:50
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 10:29:44
@FilePath : /database_pool/pool_test.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from flask import Flask
from sqlhelper3 import sqlhelper
app = Flask(__name__)
@app.route('/login')
def login():
print(sqlhelper.fetchall('select * from book where rating_nums=%s', '9.0'))
return 'login'
@app.route('/index')
def index():
print(sqlhelper.fetchone('select * from quotes where author=%s', 'Tim Peters'))
return 'index'
@app.route('/order')
def order():
author = 'Tim Peters'
txt = 'Simple is better than complex.'
tags = 'The Zen of Python'
with sqlhelper as db:
sql = 'insert into quotes (author, txt, tags) values(%s, %s, %s)'
db.cursor.execute(sql, (author, txt, tags))
db.conn.commit()
return 'oder'
if __name__ == "__main__":
app.run(debug=True)
用上下文管理器实现了自定义数据插入的操作, 比用之前方便了很多