pip install DBUtils==1.3
pip install mysqlclient==2.0.1
import time
import threading
import MySQLdb
import queue
from MySQLdb.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
def mysql_connection():
host = 'host'
user = 'user'
port = 3306
password = 'pwd'
db = 'mysql'
charset = 'utf8'
limit_count = 3 # 最低預啟動數據庫連接數量
pool = PooledDB(MySQLdb, limit_count, maxconnections=15, host=host, user=user, port=port, passwd=password, db=db,
charset=charset,
use_unicode=True, cursorclass=DictCursor)
return pool
def tread_connection_db():
con = pool.connection()
cur = con.cursor()
sql = """
select * from ....
"""
cur.execute(sql)
result = cur.fetchall()
con.close()
return result
class MyThread(threading.Thread):
def __init__(self, func, args):
super(MyThread, self).__init__()
self.func = func
self.args = args
def run(self):
self.result = self.func(*self.args)
def get_result(self):
try:
return self.result
except Exception:
return None
if __name__ == '__main__':
start = time.time()
# 創建線程連接池
pool = mysql_connection()
# 創建隊列,隊列的最大個數及限制線程個數
q = queue.Queue(maxsize=12)
# 測試數據,多線程查詢數據庫
for i in range(12):
# 創建線程並放入隊列中
# t = MyThread(target=tread_connection_db, args=(id,))
t = MyThread(tread_connection_db, args=(i,))
q.put(t)
# 隊列隊滿
if q.qsize() == 12:
# 用於記錄線程,便於終止線程
join_thread = []
# 從對列取出線程並開始線程,直到隊列為空
while q.empty() != True:
t = q.get()
join_thread.append(t)
t.start()
# 終止上一次隊滿時里面的所有線程
for t in join_thread:
t.join()
for t in join_thread:
print(t.get_result())
end = time.time() - start
print(end)