1、python多線程
理論部分大部分參考他人,不過想寫簡單點,也就不夠全面,更詳細的可查閱參考鏈接的前三個。
1.1 python多線程
為什么要使用多線程?是為了提高程序運行效率,也希望一些耗時過長的操作(如數據庫訪問、網絡訪問、文件操作等)不會阻塞主線程。
但是由於GIL鎖,python的多線程並不能算作真正的多線程,GIL(Global Interpreter Lovk,全局解釋器)限制在任意時刻只能由一個線程在解釋器中運行。所以python多線程並不適合CPU密集型的程序(反而會因為線程切換耗費更多的時間),在I/O密集型程序中比較適用。
python線程不算作並行運行在多核cpu中,只能算作並發運行,並不能充分利用cpu資源。若是要實現真正的並行,則需要考慮multiprocessing--
1.2 threading模塊
在Python中,我們一般使用threading模塊來實現多線程操作。
threading模塊中包含了關於線程操作的豐富功能,包括:常用的線程函數、線程對象(Thread)、鎖對象(Lock)、遞歸鎖對象(RLock)、事件對象(Event)、條件變量對象(Condition)、信號量對象(Semaphore)、定時器對象(Timer)、柵欄對象(Barrier)等。
threading中常用的線程函數包含:
-
threading.currentThread(): 返回當前的線程變量。
-
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
-
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
其他相關的對象,這里僅涉及Thread、Lock和RLock,以及同步、線程安全的隊列Queue。
1.3 threading.Thread
可以使用threading.Thread來創建線程,有兩種方式:
-
直接創建
-
繼承創建
1.3.1 直接創建
可以直接向threading.Thread傳入可調用函數。
#!/usr/bin/pyhon #coding=utf-8 import time import random import threading def func(name): s = random.randint(1, 5) print(f'current thread is {name}, sleeping {s}s.') time.sleep(s) print(f'thread {name} is over') if __name__ == '__main__': for i in range(1, 5): t = threading.Thread(target=func, args=(i,)) t.start() print('Main Thread')
1.3.2 繼承創建
繼承threading.Thread,重寫run方法。
import time import random import threading class Func(threading.Thread): def __init__(self, name): super().__init__() self.name = name def run(self): s = random.randint(1, 5) print(f'current thread is {self.name}, sleeping {s}s.') time.sleep(s) print(f'thread {self.name} is over') if __name__ == '__main__': for i in range(1, 5): t = Func(str(i)) t.start() print('Main Thread')
1.3.3 Thread構造參數
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
下面是Thread的參數說明
-
group:默認為None(該參數是為了以后實現ThreadGroup類而保留的)
-
target:在run方法中調用的可調用對象,即需要開啟線程的可調用對象,比如函數或方法。
-
name:線程名稱,默認為“Thread-N”形式的名稱,N為較小的十進制數。
-
args:在參數target中傳入的可調用對象的參數元組,默認為空元組()。
-
kwargs:在參數target中傳入的可調用對象的關鍵字參數字典,默認為空字典{}。
-
daemon:默認為None,即繼承當前調用者線程(即開啟線程的線程,一般就是主線程)的守護模式屬性,如果不為None,則無論該線程是否為守護模式,都會被設置為“守護模式”。
1.3.4 Thread常用方法
-
start():開啟線程活動。它將使得run()方法在一個獨立的控制線程中被調用,需要注意的是同一個線程對象的start()方法只能被調用一次,如果調用多次,則會報RuntimeError錯誤。
-
run():此方法代表線程活動。
-
join(timeout=None):讓當前調用者線程(一般為主線程)等待,直到線程結束。
-
daemon:表示該線程是否為守護線程,True或者False。設置一個線程的daemon必須在線程的start()方法之前,否則會報RuntimeError錯誤。這個值默認繼承自創建它的線程,主線程默認是非守護線程,所以在主線程中創建的線程默認都是非守護線程的,即daemon=False。
1.3.5 守護線程
有一種線程,它是在后台運行的,它的任務就是為其他線程提供服務,這種線程被稱為“”后台線程(Daemon Thread),又稱為“守護線程”或“精靈線程”。python解釋器的垃圾回收線程就是典型的后台線程。
后台線程有一個特征,如果所有的前台線程都死亡了,那么后台線程會自動死亡。
主線程默認是前台線程,由前台線程創建的未設置daemon為True的線程也是前台線程(默認)。設置了daemon的線程是后台線程,由后台線程創建的子線程也默認是后台線程。
1.4 Lock和RLock
threading 模塊提供了 Lock 和 RLock 兩個類,它們都提供了如下兩個方法來加鎖和釋放鎖:
-
acquire(blocking=True, timeout=-1):請求對 Lock 或 RLock 加鎖,其中 timeout 參數指定加鎖多少秒。
-
release():釋放鎖。
Lock 和 RLock 的區別如下:
-
threading.Lock:它是一個基本的鎖對象,每次只能鎖定一次,其余的鎖請求,需等待鎖釋放后才能獲取。
-
threading.RLock:它代表可重入鎖(Reentrant Lock)。對於可重入鎖,在同一個線程中可以對它進行多次鎖定,也可以多次釋放。如果使用 RLock,那么 acquire() 和 release() 方法必須成對出現。如果調用了 n 次 acquire() 加鎖,則必須調用 n 次 release() 才能釋放鎖。
Lock 和RLock 的使用方法相似,但是常用的是RLock(可重入鎖),因為相比於Lock,Rlock在同一線程中函數嵌套調用同一個鎖鎖定區域的情況下,可以防止死鎖,具體例子如下。
import time import random import threading class My_Thread(threading.Thread): def __init__(self,lock): super().__init__() self.setDaemon(True) self.lock = lock def run(self): self.lock.acquire() print(self.name,'獲取鎖,並進入共享區域f()') self.g() self.lock.release() print(self.name,'退出共享區域f(),並釋放鎖') def g(self): self.lock.acquire() print(self.name,'獲取鎖,並進入共享區域g()') self.lock.release() print(self.name,'退出共享區域g(),並釋放鎖') if __name__ == '__main__': #lock = threading.Lock() lock = threading.RLock() t = My_Thread(lock) t.start() print('主線程結束')
結果如下所示,在一個線程中嵌套請求鎖資源,當使用Lock()時線程形成死鎖,並未返回正常的結果。而使用Rlock()會允許同一個線程進入已經持有鎖的區域不需要請求鎖、等待鎖,只是維持一個計數。在一個線程中維持好請求鎖和釋放鎖的對應關系,在該線程完成所有鎖定資源的操作后計數為0就可以釋放鎖,供其他線程使用。
##lock = threading.Lock() Thread-1 獲取鎖,並進入共享區域f() 主線程結束 ##lock = threading.RLock() Thread-1 獲取鎖,並進入共享區域f() Thread-1 獲取鎖,並進入共享區域g() Thread-1 退出共享區域g(),並釋放鎖 Thread-1 退出共享區域f(),並釋放鎖 主線程結束
為什么要使用守護線程?
如果代碼中形成死鎖會不會對電腦造成永久影響,在線等挺急的!
也不急,反正應該進程結束的時候所有線程都會結束,再不濟關個機也差不多了。
所以還是安全為上。
1.5 線程優先級隊列Queue
python的Queue模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue、LIFO(后入先出)隊列LifoQueue,和優先級隊列PriorityQueue。
這些隊列都實現了鎖原語,能夠在多線程中直接使用,可以使用隊列來實現線程間的同步。
Queue 模塊中的常用方法:
-
Queue.qsize() 返回隊列的大小
-
Queue.empty() 如果隊列為空,返回True,反之False
-
Queue.full() 如果隊列滿了,返回True,反之False
-
Queue.full 與 maxsize 大小對應
-
Queue.get([block[, timeout]])獲取隊列,timeout等待時間
-
Queue.get_nowait() 相當Queue.get(False)
-
Queue.put(item) 寫入隊列,timeout等待時間
-
Queue.put_nowait(item) 相當Queue.put(item, False)
-
Queue.task_done() 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號
-
Queue.join() 實際上意味着等到隊列為空,再執行別的操作
#!/usr/bin/python3 import queue import threading import time exitFlag = 0 class myThread (threading.Thread): def __init__(self, threadID, name, q): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run(self): print ("開啟線程:" + self.name) process_data(self.name, self.q) print ("退出線程:" + self.name) def process_data(threadName, q): while not exitFlag: queueLock.acquire() if not workQueue.empty(): data = q.get() queueLock.release() print ("%s processing %s" % (threadName, data)) else: queueLock.release() time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = queue.Queue(10) threads = [] threadID = 1 # 創建新線程 for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 # 填充隊列 queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() # 等待隊列清空 while not workQueue.empty(): pass # 通知線程是時候退出 exitFlag = 1 # 等待所有線程完成 for t in threads: t.join() print ("退出主線程")
1.6 python線程池
1.6.1 線程池
系統啟動一個新縣城的成本是很高的,因為它涉及與操作系統的交互。在這種情況下,使用線程池可以很好地提升性能,尤其是當程序中小創建大量生存期很短暫的線程時,更應該考慮使用線程池。
線程池在系統啟動時即創建大量空閑的線程,程序只要將一個函數提交給線程池,線程池就會啟動一個空閑的線程來執行它。當該函數執行結束后,該線程並不會死亡,而是再次返回線程池中編程空閑狀態,等待下一個函數。
此外,使用線程池可以有效地控制系統中並發線程的數量,當系統中包含有大量的並發線程時,會導致系統性能急劇下降,甚至導致python解釋器崩潰,而線程池的最大線程數參數可以控制系統中並發線程的數量不超過此數。
1.6.1 線程池的使用
在過去,我們可以使用第三方模塊threadpool來創建線程池,但是現在主流的使用線程池的模塊是python3中自帶模塊concurrent.futures模塊中的ThreadPoolExecutor。
以下是ThreadPoolExcutor的使用方法。
import time import random from concurrent.futures import ThreadPoolExecutor def func(name): s = random.randint(1, 5) print(f'current thread is {name}, sleeping {s}s.') time.sleep(s) print(f'thread {name} is over') if __name__ == '__main__': with ThreadPoolExecutor(max_workers=3) as t: for i in range(1, 6): t.submit(func, i)
2、python連接mysql數據庫
java連接mysql數據庫在:
python連接Oracle數據庫在:
2.1 環境配置-PyMySQL
PyMySQL 是在 Python3.x 版本中用於連接 MySQL 服務器的一個庫,Python2中則使用mysqldb。如果習慣使用mysqldb,可以使用下面的語句導入使用。這里僅討論PyMySQL 。
PyMySQL參考文檔:
import pymysql pymysql.install_as_MySQLdb()
PyMySQL安裝
pip3 install pymysql
疑惑:一開始我用的是pip3 install PyMySQL,在pip list里的名稱也是PyMySQL(安裝時使用兩種沒差別),但是如果使用import PyMySQL則會報沒有模塊,只能使用import pymysql,這是為什么?對python的模塊引入規則還比較陌生。
myslq數據庫配置,建表如下:
CREATE TABLE `b_table` ( `id` int(11) NOT NULL, `item` varchar(255) DEFAULT NULL, `time` varchar(255) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8
已有的數據量如下,id是遞增序列,item是隨意字符,time是當時時間串+id
select count(*) from b_table;--1000000
2.2 數據庫操作
2.2.1 數據庫連接對象
通過print(help(pymysql.connect))
命令或者print(help(pymysql.connections.Connection))
,可以看到pymysql的數據庫連接pymysql.connections.Connection對象,其通過調用pymysql.connect()來獲取,可傳入參數如下所示。
class Connection(builtins.object) | Connection(*, user=None, password='', host=None, database=None, unix_socket=None, port=0, charset='', sql_mode=None, read_default_file=None, conv=None, use_unicode=True, client_flag=0, cursorclass=<class 'pymysql.cursors.Cursor'>, init_command=None, connect_timeout=10, read_default_group=None, autocommit=False, local_infile=False, max_allowed_packet=16777216, defer_connect=False, auth_plugin_map=None, read_timeout=None, write_timeout=None, bind_address=None, binary_prefix=False, program_name=None, server_public_key=None, ssl=None, ssl_ca=None, ssl_cert=None, ssl_disabled=None, ssl_key=None, ssl_verify_cert=None, ssl_verify_identity=None, compress=None, named_pipe=None, passwd=None, db=None)
參數很多,但是主要關注的參數包含:
-
host:數據庫服務器所在的主機
-
user:登錄的用戶名
-
password:登錄用戶的密碼
-
database:要使用的數據庫,None不使用特定數據庫
-
port:訪問的端口,默認值是3306
-
charset:字符集
#!/usr/bin/python3 import pymysql # 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" ) # do somethings .... # 關閉數據庫連接 db.close()
常用的方法包含:
-
close():關閉數據庫
-
commit():提交事務
-
rollback():回滾事務。
-
cursor(cursor=None):創建光標對象,光標類型包含Cursor(默認)、SSCursor、DictCursor、SSDictCursor。
-
ping(reconnect=True):測試連接服務是否還活動,reconnect為True的話,當連接關閉則嘗試重連接,否則當連接關閉時就拋出異常。
-
select_db(db):設置數據庫
-
open:返回連接是否打開
-
show_warnings():發送"SHOW WARNINGS"的SQL命令。
-
autocommit(value):設置是否自動提交,默認為False。
2.2.2 光標對象
光標對象有四種
#1、 最常用的光標,是用於與數據庫交互的對象 class pymysql.cursors.Cursor(connection) #2、Unbuffered Cursor,主要用於返回大量數據的查詢,或用於通過慢速網絡連接到遠程服務器,無法向后滾動 class pymysql.cursors.SSCursor(connection) # 3、將結果作為字典返回的游標 class pymysql.cursors.DictCursor(connection) # 4、Unbuffered Cursor,將結果作為字典返回的游標 class pymysql.cursors.SSDictCursor(connection)
光標一般不直接創建,而是調用connections.Connection.cursor()創建。
def test_connect(user_name='root',password='123456',db='learn',url='localhost'): # 創建連接 conn = pymysql.connect(host=url,user=user_name,password=password,database=db) # 創建字典光標 cur = conn.cursor(cursor=pymysql.cursors.DictCursor) # 查詢數據 cur.execute('select b_table from hero limit 10') # 獲取全部查詢結果 rs = cur.fetchall() # 輸出結果 print(rs) # 關閉光標 cur.close() # 關閉連接 conn.close()
下面主要講一下pymysql.cursors.Cursor的方法:
-
colse():關閉光標只會耗盡所有剩余數據。
-
execute(query,args=None):運行語句,可傳入綁定變量。
-
executemany(query,args):針對一個語句格式運行多個綁定數據,args是tuple或者list格式的一連串數據。此方法提高了多行 INSERT 和 REPLACE 的性能。
-
fetchall():返回查詢的全部數據
-
fetchmany(size=None):返回指定數量的數據
-
fetchone():返回下一行數據
-
max_stmt_length=1024000:executemany()生成的最大語句大小。
-
mogridy(query,args=None):返回將通過調用 execute() 方法發送到數據庫的確切字符串。可用於檢測輸入是否正確。
2.2.3 查詢數據
查詢數據只能使用execute,獲取查詢結果的方法有三種fetchall()、fetchmany(size=None)和fetchone()。
def test_connect(user_name='root',password='123456',db='learn',url='localhost'): # 創建連接 conn = pymysql.connect(host=url,user=user_name,password=password,database=db) # 創建光標 cur = conn.cursor() # 進行查詢:fetchall應用 print('返回全部:fetchall') cur.execute('select * from b_table limit 10') rs = cur.fetchall() print(rs) # 進行查詢: fetchone 應用 print('逐個返回:fetchone') cur.execute('select * from b_table limit 10') while(1): rs = cur.fetchone() if rs == None: break print(rs) # 進行查詢: fetchmany 應用 print('返回部分:fetchmany') cur.execute('select * from b_table limit 10') while(1): rs = cur.fetchmany(3) if len(rs) == 0: break print(rs) # 關閉光標 cur.close() # 關閉連接 conn.close()
運行結果:
返回全部:fetchall ((0, 'aaa', '202108311525550'), (1, 'aaa', '202108311525551'), (2, 'aaa', '202108311525552'), (3, 'aaa', '202108311525553'), (4, 'aaa', '202108311525554'), (5, 'aaa', '202108311525555'), (6, 'aaa', '202108311525556'), (7, 'aaa', '202108311525557'), (8, 'aaa', '202108311525558'), (9, 'aaa', '202108311525559')) 逐個返回:fetchone (0, 'aaa', '202108311525550') (1, 'aaa', '202108311525551') (2, 'aaa', '202108311525552') (3, 'aaa', '202108311525553') (4, 'aaa', '202108311525554') (5, 'aaa', '202108311525555') (6, 'aaa', '202108311525556') (7, 'aaa', '202108311525557') (8, 'aaa', '202108311525558') (9, 'aaa', '202108311525559') 返回部分:fetchmany ((0, 'aaa', '202108311525550'), (1, 'aaa', '202108311525551'), (2, 'aaa', '202108311525552')) ((3, 'aaa', '202108311525553'), (4, 'aaa', '202108311525554'), (5, 'aaa', '202108311525555')) ((6, 'aaa', '202108311525556'), (7, 'aaa', '202108311525557'), (8, 'aaa', '202108311525558')) ((9, 'aaa', '202108311525559'),)
2.2.4 數據插入
數據的增刪改涉及事務的提交和回滾,pymysql的自動提交可以通過conn.autocommit(True)來進行設置,默認是False。當自動提交為False的時候需要在合適的位置手動調用conn.commit()和conn.rollback()。
pymysql的數據插入可通過Cursor.execute(sql),和Cursor.executemany(sql)來運行,Cursor.execute(sql)可直接運行拼接好的sql語句,但為了防止SQL注入,建議使用綁定變量。
樣例代碼如下,設置了自動提交為True,使用了綁定變量。
def test_insert(user_name='root',password='123456',db='learn',url='localhost'): # 創建連接 conn = pymysql.connect(host=url,user=user_name,password=password,database=db) # 設置自動提交,默認是Flase conn.autocommit(True) print(conn.get_autocommit()) # 創建光標 cur = conn.cursor(cursor=pymysql.cursors.DictCursor) # 查詢目標表最大id cur.execute('select max(id) max_id from b_table') id = cur.fetchone()['max_id'] id_t = id # 1--單個數據插入 execute cur_insert = conn.cursor() id_t = id_t+1 time1 = datetime.datetime.now().strftime('%Y%m%d%H%M%H')+str(id_t) # 綁定變量對象可以是元組,也可以是列表 #line = (id_t,'222',time1) line = [id_t,'222',time1] # 查看execute要運行的語句 # print(cur_insert.mogrify('insert into b_table(id,item,time) values(%s,%s,%s)',line)) # execute cur_insert.execute('insert into b_table(id,item,time) values(%s,%s,%s)',line) # 2---多個插入語句運行 executemany lines = [] for i in range(10): id_t = id_t+1 time1 = datetime.datetime.now().strftime('%Y%m%d%H%M%H')+str(id_t) line = (id_t,'222',time1) lines.append(line) # 查看execute要運行的語句 #print(cur_insert.mogrify('insert into b_table(id,item,time) values(%s,%s,%s)',lines[0])) # executemany cur_insert.executemany('insert into b_table(id,item,time) values(%s,%s,%s)',lines) # 查看結果 cur.execute('select * from b_table where id between %s and %s',[id+1,id_t]) rs = cur.fetchall() print("插入結果:") for r in rs: print(r) # 關閉光標 cur_insert.close() cur.close() # 關閉連接 conn.close()
2.2.5 數據更新
數據更新也可以用execute或者executemany。樣例代碼設置了自動提交的值為False,配置了拋出異常處理。
def test_update(user_name='root',password='123456',db='learn',url='localhost'): # 創建連接 conn = pymysql.connect(host=url,user=user_name,password=password,database=db) # 自動提交,默認是Flase print(conn.get_autocommit()) # 創建光標 cur = conn.cursor(cursor=pymysql.cursors.DictCursor) cur_update = conn.cursor() # 更新前的數據 cur.execute('select * from b_table where id<=11') print('更新前的數據:') rs = cur.fetchall() for r in rs: print(r) # 1--單個數據更新 execute try: sql = 'update b_table set item=%s where id=%s' line = ('00',0) cur_update.execute(sql,line) # 提交 conn.commit() except: print(cur_update.mogrify(sql,line),' 更新失敗') # 回滾 conn.rollback() # 2--多個數據更新 executemany # 需要更新的id列表 id_list = (1,2,3,4,5,6,7,8,9,9,10,11) item_list = ('11','22','33','44','55','66','77','88','99','999','1010','1111') lines = [] try: sql = 'update b_table set item=%s where id=%s' # 生成更新列表 for id,item in zip(id_list,item_list): lines.append((item,id)) print(lines) # 運行多個更新 cur_update.executemany(sql,lines) conn.commit() except: print(cur_update.mogrify(sql,lines[0]),' 更新失敗') conn.rollback() # 更新后的數據 cur.execute('select * from b_table where id<=11') print('更新后的數據:') rs = cur.fetchall() for r in rs: print(r) # 關閉光標 cur_update.close() cur.close() # 關閉連接 conn.close()
2.2.5 executemany與execute及執行效率討論
在寫cx_oracle的那一篇的批處理語句時,有提到過cx_oracle提供的獲取executemany運行結果的方法,因為executemany不會拋出異常。
而在pymysql中,的確也有這種情況:也就是說,executemany中有一條或者全部輸入報錯,並不會跳轉到異常處理(上面的異常處理語句是擺設,沒有用)。executemany會返回運行成功的語句數量,僅此而已。
我查看了下help(pymysql.cursors.Cursor)下並不包含相關方法來獲取錯誤信息。
我又使用help查看了一下executemany的定義,只是最后一句“這個方法提升了多行插入和替換的性能。否則它跟循環執行execute()差不多”。
我感覺不同方法運行同樣規模的數據庫語句語句運行的效率在一般情況下是差不多的,區別在於提交commi的時機。如果不進行提交的話,數據占據着內存,如果操作大量數據的時候頻繁提交,則會拉低執行效率,需要自己進行取舍。
所以我感覺executemany和execute的底層是一致的,在使用合適地方法調用execute運行循環的時候其執行效率與executemany是一致的。如果的確找不到executemany數據出錯時報錯的返回,應該會對兩種方式的使用進行取舍。
| executemany(self, query, args) | Run several data against one query | | :param query: query to execute on server | :param args: Sequence of sequences or mappings. It is used as parameter. | :return: Number of rows affected, if any. | | This method improves performance on multiple-row INSERT and | REPLACE. Otherwise it is equivalent to looping over args with | execute().
3、實踐:多線程讀寫數據庫
實現多線程對同一張表查詢數據並拼接結果,將結果寫入同一個文件內
3.1 多線程進行數據庫操作
一般提供的數據庫操作對象會包含連接對象和光標對象。在多線程進行操作的時候,需要考慮以下哪種方式比較合適:
-
多線程訪問同一個cursor
-
多線程訪問同一個連接connect
-
多線程創建不同的連接connect
第一種共享同一個cursor的時候還需要進行加鎖,否則會報錯,其實際上在數據庫訪問中還是進行的單線程訪問。第二種從一個連接創建多個cursor交給多線程使用,我查到的說法是並不推薦,好像會有約束性的問題?所以一般推薦第三種方法,即創建多個連接進行數據庫訪問,再延伸一下可以創建連接池方便使用。
3.2 表分頁處理
既然要分多個連接訪問數據庫,要怎么保證多線程查詢到的數據無重復並且能夠完整獲取?一般考慮使用分頁處理,直接從SQL語句中進行處理。mysql和oracle不同,支持limit語句。
select * from b_table limit 100,20
3.3 數據庫連接池
面對大量的web請求和插入與查詢請求,mysql連接會不穩定,出現錯誤’Lost connection to MySQL server during query ([Errno 104] Connection reset by peer)’。
而且原理跟線程池相似,多線程頻繁創建和關閉數據庫連接會出現許多不必要的開銷,所以維持一個連接池是一個很好的選擇。
DBUtils是Python的一個用於實現數據庫連接池的模塊。
使用pip install安裝模塊后導入使用。
PooledDB 提供線程間可共享的數據庫連接,把連接放回連接池而不是真正的關閉,可重用。
PooledDB(creator, # 使用鏈接數據庫的模塊 mincached=0, # 最少的空閑連接數 maxcached=0, # 最大的空閑鏈接 maxshared=0, #當連接數達到這個數時,新請求的連接會分享已經分配出去的連接 maxconnections=0,# 最大的連接數 blocking=False, #當這個為True時,查過最大連接數后新的請求會等待 maxusage=None, setsession=None, reset=True, failures=None, ping=1, *args, **kwargs)
3.4 多線程訪問同一個文件
之前考慮過多線程寫入不同文件后拼接。但好像,至少對於python來說這樣子多線程就無意義了。我這里對寫入行的前后並無要求,可以使用加鎖的方法進行文件追加寫入。好像也有方法指定寫入位置,只是沒有深入了解。
# 寫法一:並發寫入指定文件並加鎖:追加 def write1(self,text): self.lock.acquire() #加鎖 with open(self.write_file,'a+') as fo: fo.write(text+'\n') self.lock.release() # 寫法二(錯誤寫法):並發寫入指定文件並加鎖:追加 def write2(self,text): with open(self.write_file,'a+') as fo: self.lock.acquire() #加鎖 fo.write(text+'\n') self.lock.release()
上面的兩種寫法,我一開始采用的是第一種,結果文件中數據量對不上。我在排查問題的過程中,看到了有部分行只有半截,最終結果也是比預期結果少,所以換用了寫法一。
是否是因為方法二代碼中進行了打開文件后,就相當於我們已經進行了文件打開,即使加鎖等待,沒有馬上寫入。但其實在等待其他線程寫入完成並保存,該等待的線程拿到寫權限的那一刻,還是往舊的文件寫入,而不是在上一個寫入線程的文件基礎上追加,造成了內容丟失。
所以說,方法二在結果上無異於沒加鎖。
3.5 主線程獲取多線程返回數據:Queue的使用
python中多線程向主線程傳回數據的方式,可以通過隊列插入數據的方式。數據插入過程中應該是不需要加鎖的,在其介紹中說 這些隊列都實現了鎖原語,能夠在多線程中直接使用,可以使用隊列來實現線程間的同步。但是我還是加了鎖,因為前面文件加鎖寫順手了。
# 結果隊列 self.res = queue.Queue(10) # 隊列鎖 self.qlock = threading.Lock() # 將運行結果寫入隊列 def write_res(self,begin,num,c): res = '線程【{},{}】運行結束,寫入總數:{},結束時間:{}'.format(begin,num,c,datetime.datetime.now().strftime('%Y%m%d%H%M%S')) self.qlock.acquire() self.res.put(res) self.qlock.release()
3.6 代碼

import threading import pymysql from dbutils.pooled_db import PooledDB import datetime,time,math import queue #python 關於多進程與多線程且寫入同一文件情況 http://sunsunsir.cn/detail/6 class processing: def __init__(self,write_file,host='localhost',user_name='root',password='123456',db='learn',maxconnections=5,thread_num=5): # 創建數據庫連接池 self.pool = PooledDB(creator = pymysql, maxconnections=maxconnections,maxshared=maxconnections, host=host, user=user_name, passwd=password, db=db, port=3306, charset="utf8") # 線程數 self.thread_num = thread_num # 寫文件 self.write_file = write_file # 鎖 self.lock = threading.Lock() # 結果隊列 self.res = queue.Queue(10) # 隊列鎖 self.qlock = threading.Lock() # 每個線程運行:從數據庫讀取分頁數據,對每條數據進行加工,寫入同一個文件 # begin,num 分頁 def thread_doing(self,begin,num): conn = self.pool.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute('select * from b_table limit %s,%s',[begin,num]) c = 0 while(1): rs = cursor.fetchone() if rs == None: break text = '[{},{}]id:{},item:{},time:{},nowtime:{}'.format(begin,num, rs['id'],rs['item'],rs['time'], datetime.datetime.now().strftime('%Y%m%d%H%M%S')) self.write(text) c = c+1 self.write_res(begin,num,c) cursor.close() conn.close() # 將連接放回連接池 # 並發寫入指定文件並加鎖:追加 def write(self,text): self.lock.acquire() #加鎖 with open(self.write_file,'a+') as fo: fo.write(text+'\n') self.lock.release() # 將運行結果寫入隊列 def write_res(self,begin,num,c): res = '線程【{},{}】運行結束,寫入總數:{},結束時間:{}'.format(begin,num,c,datetime.datetime.now().strftime('%Y%m%d%H%M%S')) self.qlock.acquire() self.res.put(res) self.qlock.release() def test(self): start_time = datetime.datetime.now() print('開始時間:',start_time.strftime('%Y%m%d%H%M%S')) # 查找表中全部數據量 conn = self.pool.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute('select * from b_table limit 0,10') while(1): rs = cursor.fetchone() if rs==None: break print(rs) cursor.close() conn.close() end_time = datetime.datetime.now() print('{} 完成!耗時:{} '.format(end_time.strftime('%Y%m%d%H%M%S'),end_time-start_time)) def run(self): start_time = datetime.datetime.now() print('開始時間:',start_time.strftime('%Y%m%d%H%M%S')) # 查找表中全部數據量 conn = self.pool.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute('select count(*) count from b_table') count = cursor.fetchone()['count'] cursor.close() conn.close() # 分頁,向上取整 page = math.ceil(count/self.thread_num) print('表數據量:{},線程數:{},分頁大小:{}'.format(count,self.thread_num,page)) # 清空文件 with open(self.write_file, 'w') as fo: fo.seek(0) fo.truncate() # 多線程 ths = [] # 創建線程 for i in range(self.thread_num): #print(page*i,',',page) ths.append(threading.Thread(target=self.thread_doing,args=(page*i,page,))) # 啟動線程 for i in range(self.thread_num): ths[i].start() print('等待中........') # 等待線程完成 for i in range(self.thread_num): ths[i].join() end_time = datetime.datetime.now() print('{} 完成!耗時:{} '.format(end_time.strftime('%Y%m%d%H%M%S'),end_time-start_time)) while not self.res.empty(): print(self.res.get()) if __name__=='__main__': p = processing('a.txt') #p.test() p.run()
3.7 結果
開始時間: 20211004141329 表數據量:1000057,線程數:5,分頁大小:200012 等待中........ 20211004141919 完成!耗時:0:05:50.015016 線程【800048,200012】運行結束,寫入總數:200009,結束時間:20211004141917 線程【0,200012】運行結束,寫入總數:200012,結束時間:20211004141918 線程【200012,200012】運行結束,寫入總數:200012,結束時間:20211004141918 線程【400024,200012】運行結束,寫入總數:200012,結束時間:20211004141919 線程【600036,200012】運行結束,寫入總數:200012,結束時間:20211004141919
4、總結
4.1 模塊導入的一些問題
就一些小問題。
在數據庫連接涉及兩個模塊PyMysql和DBUtils,這個兩個模塊在pip list和import模塊名的時候,名稱大小寫是不一致的,這個給使用過程中造成了些許不便。有說明是版本原因造成的。我想這個有機會再進行后續了解吧。
然后我又發現了一個奇怪的東西,僅僅記錄下來先吧。
import pymysql #from dbutils.pooled_db import PooledDB import dbutils if __name__=='__main__': print(dir(dbutils))
上面的語句注不注釋“from dbutils.pooled_db import PooledDB”這一句,結果是不一樣的。
## 注釋第二句 ['__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', '__version__'] ## 不注釋第二句 ['__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', '__version__', 'pooled_db', 'steady_db']
因為不明就里,我覺得略顯奇怪,所以使用方法只能是“from dbutils.pooled_db import PooledDB”。
DBUtils下應該還有其他類型的數據庫連接池,后續有機會再看看。
4.2 [報錯]dbutils.pooled_db.TooManyConnections
不少方法給的其實都是mysql方面的解決方法,因為mysql這邊連接上限默認是100,但我這邊的設置肯定是沒有達到這個數的。
原因僅僅是因為線程中獲取連接的數據量大於數據庫連接池設置的最大連接數。可以調整大小。
參考