Python Flask 框架
..............
數據庫鏈接池
pip3 install pymysql dbutils
簡單實現
'''
@Date : 2020-11-12 20:02:49
@LastEditors : Pineapple
@LastEditTime : 2020-11-13 21:01:53
@FilePath : /database_pool/連接池.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from threading import Thread
import pymysql
from dbutils.pooled_db import PooledDB
POOL = PooledDB(
creator=pymysql, # 指定創建連接的包
maxconnections=6, # 最大連接數
mincached=2, # 初始鏈接數
blocking=True, # 阻塞時是否等待
ping=0, # 測試連接是否正常, 不同情況的值不同
# 連接mysql的必備參數
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def task(num):
# 去連接池獲取連接
conn = POOL.connection()
cursor = conn.cursor()
# cursor.execute('select * from job51')
cursor.execute('select sleep(3)')
result = cursor.fetchall()
cursor.close()
# 將連接放回到連接池
conn.close()
print(num, '----------->', result)
for i in range(40):
t = Thread(target=task, args=(i,))
t.start()
創建POOL對象時, 連接數為零, 只是創建好了一個容量為6的空池子.
按照不同的需求, 參數ping可以指定為0, 1, 2, 4, 7
源碼注釋是這樣的:
ping: determines when the connection should be checked with ping()
(0 = None = never, 1 = default = whenever fetched from the pool,
2 = when a cursor is created, 4 = when a query is executed,
7 = always, and all other bit combinations of these values)
實現相關查詢功能
基於函數實現sqlhelper
'''
@Date : 2020-11-12 20:59:10
@LastEditors : Pineapple
@LastEditTime : 2020-11-12 21:09:27
@FilePath : /flask_test/database_pool/sqlhelper.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
import pymysql
from dbutils.pooled_db import PooledDB
POOL = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
blocking=True,
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def fetchall(sql, *args):
"""獲取所有數據"""
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchall()
cursor.close()
conn.close()
return result
def fetchone(sql, *args):
"""獲取一條數據"""
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchone()
cursor.close()
conn.close()
return result
pymysql的execute方法會將args添加到sql語句中, 所以在編寫函數的時候可以使用*args打包參數的功能, 打包成一個元組傳入execute方法
這樣就簡單的實現了常用的 獲取全部數據fetchall方法, 和獲取一條數據fetchone方法
編寫pool_test.py 來測試一下
'''
@Date : 2020-11-13 21:22:50
@LastEditors : Pineapple
@LastEditTime : 2020-11-13 21:33:32
@FilePath : /database_pool/pool_test.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from flask import Flask
import sqlhelper
app = Flask(__name__)
@app.route('/login')
def login():
print(sqlhelper.fetchall('select * from book where rating_nums=%s', '9.0'))
return 'login'
@app.route('/index')
def index():
print(sqlhelper.fetchone('select * from job51 where name=%s', '前端開發工程師'))
return 'index'
@app.route('/order')
def order():
return 'order'
if __name__ == "__main__":
app.run(debug=True)
基於類實現sqlhelper
既然設計了數據庫連接池, 所以我們希望全局只有一個連接池, 那么這個類必須是一個單例模式, 好在Python的類很輕松就能實現這種功能, 在導包的時候會生成.pyc Python字節碼文件, 之后再次使用就會執行這個.pyc 字節碼文件. 所以如果在同一個問價中, 我們可以通過導入模塊的方式輕松的實現一個單例類.
# s1.py 文件中
class Foo(object):
def test(self):
print("123")
v = Foo()
# v是Foo的實例
------
# s2.py 文件中
from s1 import v as v1
print(v1,id(v1)) #<s1.Foo object at 0x0000000002221710> 35788560
from s1 import v as v2
print(v1,id(v2)) #<s1.Foo object at 0x0000000002221710> 35788560
# 兩個的內存地址是一樣的
# 文件加載的時候,第一次導入后,再次導入時不會再重新加載。
Python幫了我們這么多, 我們就可以專心的設計sqlhelper類了.
'''
@Date : 2020-11-13 16:46:20
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 09:10:00
@FilePath : /database_pool/sqlhelper2.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
import pymysql
from dbutils.pooled_db import PooledDB
class SqlHelper:
def __init__(self) -> None:
self.pool = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
blocking=True,
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn, cursor
def close(self, conn, cursor):
cursor.close()
conn.close()
def fetchall(self, sql, *args):
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn, cursor)
return result
def fetchone(self, sql, *args):
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
db = SqlHelper()
編寫類和編寫函數一樣簡單, 新增的open和close方法會實現數據庫的連接和關閉, 不僅去掉了重復的代碼, 在使用sqlhelper的時候輕松調用open和close並在中間加上其他的功能.
編寫sqlhelper來測試一下:
'''
@Date : 2020-11-13 21:22:50
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 09:47:20
@FilePath : /database_pool/pool_test.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from flask import Flask
from sqlhelper2 import db
app = Flask(__name__)
@app.route('/login')
def login():
print(db.fetchall('select * from book where rating_nums=%s', '9.0'))
return 'login'
@app.route('/index')
def index():
print(db.fetchone('select * from quotes where author=%s', 'Tim Peters'))
return 'index'
@app.route('/order')
def order():
author = 'Tim Peters'
txt = 'Simple is better than complex.'
tags = 'The Zen of Python'
conn, cursor = db.open()
sql = 'insert into quotes (author, txt, tags) values(%s, %s, %s)'
cursor.execute(sql, (author, txt, tags))
conn.commit()
db.close(conn, cursor)
return 'oder'
if __name__ == "__main__":
app.run(debug=True)
在login和index函數里還是調用了sqlhelper已經寫好的fetchall fetchone方法, 在order中通過調用sqlhelper的open和close方法,在其中間實現了插入的功能, 這樣的sqlhelper不僅用起來方便, 而且拓展性強
上下文管理
在Python中使用with關鍵字實現上下文管理器
class Foo:
def __enter__(self):
return 123
def __exit__(self, exc_type, exc_val, exc_tb):
pass
foo = Foo()
with foo as f:
print(f)
在使用with關鍵字時會調用類的 __enter__方法, 此方法返回的內容就是as后的對象f, 在退出時會調用類的 __exit__方法
我們最熟悉的文件操作, open()方法就是這樣實現的, 若不使用上下文管理器, 每打開一個文件都要調用此文件對象的close方法進行關閉
數據庫的連接和關閉操作也可以使用上下文管理器的方式
'''
@Date : 2020-11-14 10:14:38
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 10:36:33
@FilePath : /database_pool/sqlhelper3.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
import pymysql
from dbutils.pooled_db import PooledDB
class SqlHelper:
def __init__(self) -> None:
self.pool = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
blocking=True, # 阻塞時是否等待
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def __enter__(self):
self.conn = self.pool.connection()
self.cursor = self.conn.cursor()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.conn.close()
def fetchall(self, sql, *args):
with self as db:
db.cursor.execute(sql, args)
result = db.cursor.fetchall()
return result
def fetchone(self, sql, *args):
with self as db:
db.cursor.execute(sql, args)
result = db.cursor.fetchone()
return result
sqlhelper = SqlHelper()
用__enter__ 和__exit__方法實現open和close的操作
編寫pool_test.py 測試一下
'''
@Date : 2020-11-13 21:22:50
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 10:29:44
@FilePath : /database_pool/pool_test.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from flask import Flask
from sqlhelper3 import sqlhelper
app = Flask(__name__)
@app.route('/login')
def login():
print(sqlhelper.fetchall('select * from book where rating_nums=%s', '9.0'))
return 'login'
@app.route('/index')
def index():
print(sqlhelper.fetchone('select * from quotes where author=%s', 'Tim Peters'))
return 'index'
@app.route('/order')
def order():
author = 'Tim Peters'
txt = 'Simple is better than complex.'
tags = 'The Zen of Python'
with sqlhelper as db:
sql = 'insert into quotes (author, txt, tags) values(%s, %s, %s)'
db.cursor.execute(sql, (author, txt, tags))
db.conn.commit()
return 'oder'
if __name__ == "__main__":
app.run(debug=True)
用上下文管理器實現了自定義數據插入的操作, 比用之前方便了很多
