雜項之pymysql連接池
本節內容
- 本文的誕生
- 連接池及單例模式
- 多線程提升
- 協程提升
- 后記
1.本文的誕生
由於前幾天接觸了pymysql,在測試數據過程中,使用普通的pymysql插入100W條數據,消耗時間很漫長,實測990s也就是16.5分鍾左右才能插完,於是,腦海中誕生了一個想法,能不能造出一個連接池出來,提升數據呢?就像一根管道太小,那就多加幾根管道看效果如何呢?於是。。。前前后后折騰了將近一天時間,就有了本文的誕生。。。
2.連接池及單例模式
先說單例模式吧,為什么要在這使用單例模式呢?使用單例模式能夠節省資源。
其實單例模式沒有什么神秘的,簡單的單例模式實現其實就是在類里面定義一個變量,再定義一個類方法,這個類方法用來為調用者提供這個類的實例化對象。(ps:個人對單例模式的一點淺薄理解...)
那么連接池是怎么回事呢?原來使用pymysql創建一個conn對象的時候,就已經和mysql之間創建了一個tcp的長連接,只要不調用這個對象的close方法,這個長連接就不會斷開,這樣,我們創建了一組conn對象,並將這些conn對象放到隊列里面去,這個隊列現在就是一個連接池了。
現在,我們先用一個連接,往數據庫中插入100W條數據,下面是源碼:
1 import pymysql 2 import time 3 start=time.time() 4 conn = pymysql.connect(host="192.168.10.103",port=3306,user="root",passwd="123456",db="sql_example",charset="utf8") 5 conn.autocommit(True) # 設置自動commit 6 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 設置返回的結果集用字典來表示,默認是元祖 7 data=(("男",i,"張小凡%s" %i) for i in range(1000000)) # 偽造數據,data是個生成器 8 cursor.executemany("insert into tb1(gender,class_id,sname) values(%s,%s,%s)",data) # 可以使用executemany執行多條sql 9 # conn.commit() 10 cursor.close() 11 conn.close() 12 print("totol time:",time.time()-start)
執行結果為:
totol time: 978.7649309635162
3.多線程提升
使用多線程,在啟動時創建一組線程,每個線程去連接池里面獲取一個連接,然后插入數據,這樣將會大大提升執行sql的速度,下面是使用多線程實現的連接池源碼:

1 from gevent import monkey 2 monkey.patch_all() 3 4 import threading 5 6 import pymysql 7 from queue import Queue 8 import time 9 10 class Exec_db: 11 12 __v=None 13 14 def __init__(self,host=None,port=None,user=None,passwd=None,db=None,charset=None,maxconn=5): 15 self.host,self.port,self.user,self.passwd,self.db,self.charset=host,port,user,passwd,db,charset 16 self.maxconn=maxconn 17 self.pool=Queue(maxconn) 18 for i in range(maxconn): 19 try: 20 conn=pymysql.connect(host=self.host,port=self.port,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset) 21 conn.autocommit(True) 22 # self.cursor=self.conn.cursor(cursor=pymysql.cursors.DictCursor) 23 self.pool.put(conn) 24 except Exception as e: 25 raise IOError(e) 26 27 @classmethod 28 def get_instance(cls,*args,**kwargs): 29 if cls.__v: 30 return cls.__v 31 else: 32 cls.__v=Exec_db(*args,**kwargs) 33 return cls.__v 34 35 def exec_sql(self,sql,operation=None): 36 """ 37 執行無返回結果集的sql,主要有insert update delete 38 """ 39 try: 40 conn=self.pool.get() 41 cursor=conn.cursor(cursor=pymysql.cursors.DictCursor) 42 response=cursor.execute(sql,operation) if operation else cursor.execute(sql) 43 except Exception as e: 44 print(e) 45 cursor.close() 46 self.pool.put(conn) 47 return None 48 else: 49 cursor.close() 50 self.pool.put(conn) 51 return response 52 53 54 def exec_sql_feach(self,sql,operation=None): 55 """ 56 執行有返回結果集的sql,主要是select 57 """ 58 try: 59 conn=self.pool.get() 60 cursor=conn.cursor(cursor=pymysql.cursors.DictCursor) 61 response=cursor.execute(sql,operation) if operation else cursor.execute(sql) 62 except Exception as e: 63 print(e) 64 cursor.close() 65 self.pool.put(conn) 66 return None,None 67 else: 68 data=cursor.fetchall() 69 cursor.close() 70 self.pool.put(conn) 71 return response,data 72 73 def exec_sql_many(self,sql,operation=None): 74 """ 75 執行多個sql,主要是insert into 多條數據的時候 76 """ 77 try: 78 conn=self.pool.get() 79 cursor=conn.cursor(cursor=pymysql.cursors.DictCursor) 80 response=cursor.executemany(sql,operation) if operation else cursor.executemany(sql) 81 except Exception as e: 82 print(e) 83 cursor.close() 84 self.pool.put(conn) 85 else: 86 cursor.close() 87 self.pool.put(conn) 88 return response 89 90 def close_conn(self): 91 for i in range(self.maxconn): 92 self.pool.get().close() 93 94 obj=Exec_db.get_instance(host="192.168.10.103",port=3306,user="root",passwd="012615",db="sql_example",charset="utf8",maxconn=10) 95 96 def test_func(num): 97 data=(("男",i,"張小凡%s" %i) for i in range(num)) 98 sql="insert into tb1(gender,class_id,sname) values(%s,%s,%s)" 99 print(obj.exec_sql_many(sql,data)) 100 101 job_list=[] 102 for i in range(10): 103 t=threading.Thread(target=test_func,args=(100000,)) 104 t.start() 105 job_list.append(t) 106 for j in job_list: 107 j.join() 108 obj.close_conn() 109 print("totol time:",time.time()-start)
開啟10個連接池插入100W數據的時間:
totol time: 242.81142950057983
開啟50個連接池插入100W數據的時間:
totol time: 192.49499201774597
開啟100個線程池插入100W數據的時間:
totol time: 191.73923873901367
4.協程提升
使用協程的話,在I/O阻塞時,將會切換到其他任務去執行,這樣理論上來說消耗的資源應該會比多線程要少。下面是協程實現的連接池源代碼:

1 from gevent import monkey 2 monkey.patch_all() 3 import gevent 4 5 import pymysql 6 from queue import Queue 7 import time 8 9 class Exec_db: 10 11 __v=None 12 13 def __init__(self,host=None,port=None,user=None,passwd=None,db=None,charset=None,maxconn=5): 14 self.host,self.port,self.user,self.passwd,self.db,self.charset=host,port,user,passwd,db,charset 15 self.maxconn=maxconn 16 self.pool=Queue(maxconn) 17 for i in range(maxconn): 18 try: 19 conn=pymysql.connect(host=self.host,port=self.port,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset) 20 conn.autocommit(True) 21 # self.cursor=self.conn.cursor(cursor=pymysql.cursors.DictCursor) 22 self.pool.put(conn) 23 except Exception as e: 24 raise IOError(e) 25 26 @classmethod 27 def get_instance(cls,*args,**kwargs): 28 if cls.__v: 29 return cls.__v 30 else: 31 cls.__v=Exec_db(*args,**kwargs) 32 return cls.__v 33 34 def exec_sql(self,sql,operation=None): 35 """ 36 執行無返回結果集的sql,主要有insert update delete 37 """ 38 try: 39 conn=self.pool.get() 40 cursor=conn.cursor(cursor=pymysql.cursors.DictCursor) 41 response=cursor.execute(sql,operation) if operation else cursor.execute(sql) 42 except Exception as e: 43 print(e) 44 cursor.close() 45 self.pool.put(conn) 46 return None 47 else: 48 cursor.close() 49 self.pool.put(conn) 50 return response 51 52 53 def exec_sql_feach(self,sql,operation=None): 54 """ 55 執行有返回結果集的sql,主要是select 56 """ 57 try: 58 conn=self.pool.get() 59 cursor=conn.cursor(cursor=pymysql.cursors.DictCursor) 60 response=cursor.execute(sql,operation) if operation else cursor.execute(sql) 61 except Exception as e: 62 print(e) 63 cursor.close() 64 self.pool.put(conn) 65 return None,None 66 else: 67 data=cursor.fetchall() 68 cursor.close() 69 self.pool.put(conn) 70 return response,data 71 72 def exec_sql_many(self,sql,operation=None): 73 """ 74 執行多個sql,主要是insert into 多條數據的時候 75 """ 76 try: 77 conn=self.pool.get() 78 cursor=conn.cursor(cursor=pymysql.cursors.DictCursor) 79 response=cursor.executemany(sql,operation) if operation else cursor.executemany(sql) 80 except Exception as e: 81 print(e) 82 cursor.close() 83 self.pool.put(conn) 84 else: 85 cursor.close() 86 self.pool.put(conn) 87 return response 88 89 def close_conn(self): 90 for i in range(self.maxconn): 91 self.pool.get().close() 92 93 obj=Exec_db.get_instance(host="192.168.10.103",port=3306,user="root",passwd="123456",db="sql_example",charset="utf8",maxconn=10) 94 95 def test_func(num): 96 data=(("男",i,"張小凡%s" %i) for i in range(num)) 97 sql="insert into tb1(gender,class_id,sname) values(%s,%s,%s)" 98 print(obj.exec_sql_many(sql,data)) 99 100 start=time.time() 101 job_list=[] 102 for i in range(10): 103 job_list.append(gevent.spawn(test_func,100000)) 104 105 gevent.joinall(job_list) 106 107 obj.close_conn() 108 109 print("totol time:",time.time()-start)
開啟10個連接池插入100W數據的時間:
totol time: 240.16892313957214
開啟50個連接池插入100W數據的時間:
totol time: 202.82087111473083
開啟100個線程池插入100W數據的時間:
totol time: 196.1710569858551
5.后記
統計結果如下:
單線程一個連接使用時間:978.76s
10個連接池 | 50個連接池 | 100個連接池 | |
多線程版 | 242.81s | 192.49s | 191.74s |
協程版 | 240.17s | 202.82s | 196.17s |
通過統計結果顯示,通過協程和多線程操作連接池插入相同數據,相對一個連接提升速度明顯,但是在將連接池開到50以及100時,性能提升並沒有想象中那么大,這時候,瓶頸已經不在網絡I/O上了,而在數據庫中,mysql在大量連接寫入數據時,也會有鎖的產生,這時候就需要優化數據庫的相關設置了。
在對比中顯示多線程利用線程池和協程利用線程池的性能差不多,但是多線程的開銷比協程要大。
和大神討論過,在項目開發中需要考慮到不同情況使用不同的技術,多線程適合使用在連接量較大,但每個連接處理時間很短的情況下,而協程適用於處理大量連接,但同時活躍的鏈接比較少,並且每個連接的時間量比較大的情況下。
在實際生產應用中,創建連接池可以按需分配,當連接不夠用時,在連接池沒達到上限的情況下,在連接池里面加入新的連接,在連接池比較空閑的情況下,關閉一些連接,實現這一個操作的原理是通過queue里面的超時時間來控制,當等待時間超過了超時時間時,說明連接不夠用了,需要加入新的連接。