雜項之pymysql連接池


雜項之pymysql連接池

本節內容

  1. 本文的誕生
  2. 連接池及單例模式
  3. 多線程提升
  4. 協程提升
  5. 后記

1.本文的誕生

由於前幾天接觸了pymysql,在測試數據過程中,使用普通的pymysql插入100W條數據,消耗時間很漫長,實測990s也就是16.5分鍾左右才能插完,於是,腦海中誕生了一個想法,能不能造出一個連接池出來,提升數據呢?就像一根管道太小,那就多加幾根管道看效果如何呢?於是。。。前前后后折騰了將近一天時間,就有了本文的誕生。。。

2.連接池及單例模式

先說單例模式吧,為什么要在這使用單例模式呢?使用單例模式能夠節省資源。

其實單例模式沒有什么神秘的,簡單的單例模式實現其實就是在類里面定義一個變量,再定義一個類方法,這個類方法用來為調用者提供這個類的實例化對象。(ps:個人對單例模式的一點淺薄理解...)

那么連接池是怎么回事呢?原來使用pymysql創建一個conn對象的時候,就已經和mysql之間創建了一個tcp的長連接,只要不調用這個對象的close方法,這個長連接就不會斷開,這樣,我們創建了一組conn對象,並將這些conn對象放到隊列里面去,這個隊列現在就是一個連接池了。

現在,我們先用一個連接,往數據庫中插入100W條數據,下面是源碼:

 1 import pymysql
 2 import time
 3 start=time.time()
 4 conn = pymysql.connect(host="192.168.10.103",port=3306,user="root",passwd="123456",db="sql_example",charset="utf8")
 5 conn.autocommit(True)  # 設置自動commit
 6 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 設置返回的結果集用字典來表示,默認是元祖
 7 data=(("",i,"張小凡%s" %i) for i in range(1000000))  # 偽造數據,data是個生成器
 8 cursor.executemany("insert into tb1(gender,class_id,sname) values(%s,%s,%s)",data)  # 可以使用executemany執行多條sql
 9 # conn.commit()
10 cursor.close()
11 conn.close()
12 print("totol time:",time.time()-start)

 

執行結果為:

totol time: 978.7649309635162

3.多線程提升

使用多線程,在啟動時創建一組線程,每個線程去連接池里面獲取一個連接,然后插入數據,這樣將會大大提升執行sql的速度,下面是使用多線程實現的連接池源碼:

  1 from gevent import monkey
  2 monkey.patch_all()
  3 
  4 import threading
  5 
  6 import pymysql
  7 from queue import Queue
  8 import time
  9 
 10 class Exec_db:
 11 
 12     __v=None
 13 
 14     def __init__(self,host=None,port=None,user=None,passwd=None,db=None,charset=None,maxconn=5):
 15         self.host,self.port,self.user,self.passwd,self.db,self.charset=host,port,user,passwd,db,charset
 16         self.maxconn=maxconn
 17         self.pool=Queue(maxconn)
 18         for i in range(maxconn):
 19             try:
 20                 conn=pymysql.connect(host=self.host,port=self.port,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset)
 21                 conn.autocommit(True)
 22                 # self.cursor=self.conn.cursor(cursor=pymysql.cursors.DictCursor)
 23                 self.pool.put(conn)
 24             except Exception as e:
 25                 raise IOError(e)
 26 
 27     @classmethod
 28     def get_instance(cls,*args,**kwargs):
 29         if cls.__v:
 30             return cls.__v
 31         else:
 32             cls.__v=Exec_db(*args,**kwargs)
 33             return cls.__v
 34 
 35     def exec_sql(self,sql,operation=None):
 36         """
 37             執行無返回結果集的sql,主要有insert update delete
 38         """
 39         try:
 40             conn=self.pool.get()
 41             cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)
 42             response=cursor.execute(sql,operation) if operation else cursor.execute(sql)
 43         except Exception as e:
 44             print(e)
 45             cursor.close()
 46             self.pool.put(conn)
 47             return None
 48         else:
 49             cursor.close()
 50             self.pool.put(conn)
 51             return response
 52 
 53 
 54     def exec_sql_feach(self,sql,operation=None):
 55         """
 56             執行有返回結果集的sql,主要是select
 57         """
 58         try:
 59             conn=self.pool.get()
 60             cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)
 61             response=cursor.execute(sql,operation) if operation else cursor.execute(sql)
 62         except Exception as e:
 63             print(e)
 64             cursor.close()
 65             self.pool.put(conn)
 66             return None,None
 67         else:
 68             data=cursor.fetchall()
 69             cursor.close()
 70             self.pool.put(conn)
 71             return response,data
 72 
 73     def exec_sql_many(self,sql,operation=None):
 74         """
 75             執行多個sql,主要是insert into 多條數據的時候
 76         """
 77         try:
 78             conn=self.pool.get()
 79             cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)
 80             response=cursor.executemany(sql,operation) if operation else cursor.executemany(sql)
 81         except Exception as e:
 82             print(e)
 83             cursor.close()
 84             self.pool.put(conn)
 85         else:
 86             cursor.close()
 87             self.pool.put(conn)
 88             return response
 89 
 90     def close_conn(self):
 91         for i in range(self.maxconn):
 92             self.pool.get().close()
 93 
 94 obj=Exec_db.get_instance(host="192.168.10.103",port=3306,user="root",passwd="012615",db="sql_example",charset="utf8",maxconn=10)
 95 
 96 def test_func(num):
 97     data=(("",i,"張小凡%s" %i) for i in range(num))
 98     sql="insert into tb1(gender,class_id,sname) values(%s,%s,%s)"
 99     print(obj.exec_sql_many(sql,data))
100 
101 job_list=[]
102 for i in range(10):
103     t=threading.Thread(target=test_func,args=(100000,))
104     t.start()
105     job_list.append(t)
106 for j in job_list:
107     j.join()
108 obj.close_conn()
109 print("totol time:",time.time()-start)
顯示代碼

 

 

開啟10個連接池插入100W數據的時間:

totol time: 242.81142950057983

開啟50個連接池插入100W數據的時間:

totol time: 192.49499201774597

開啟100個線程池插入100W數據的時間:

totol time: 191.73923873901367

4.協程提升

使用協程的話,在I/O阻塞時,將會切換到其他任務去執行,這樣理論上來說消耗的資源應該會比多線程要少。下面是協程實現的連接池源代碼:

  1 from gevent import monkey
  2 monkey.patch_all()
  3 import gevent
  4 
  5 import pymysql
  6 from queue import Queue
  7 import time
  8 
  9 class Exec_db:
 10 
 11     __v=None
 12 
 13     def __init__(self,host=None,port=None,user=None,passwd=None,db=None,charset=None,maxconn=5):
 14         self.host,self.port,self.user,self.passwd,self.db,self.charset=host,port,user,passwd,db,charset
 15         self.maxconn=maxconn
 16         self.pool=Queue(maxconn)
 17         for i in range(maxconn):
 18             try:
 19                 conn=pymysql.connect(host=self.host,port=self.port,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset)
 20                 conn.autocommit(True)
 21                 # self.cursor=self.conn.cursor(cursor=pymysql.cursors.DictCursor)
 22                 self.pool.put(conn)
 23             except Exception as e:
 24                 raise IOError(e)
 25 
 26     @classmethod
 27     def get_instance(cls,*args,**kwargs):
 28         if cls.__v:
 29             return cls.__v
 30         else:
 31             cls.__v=Exec_db(*args,**kwargs)
 32             return cls.__v
 33 
 34     def exec_sql(self,sql,operation=None):
 35         """
 36             執行無返回結果集的sql,主要有insert update delete
 37         """
 38         try:
 39             conn=self.pool.get()
 40             cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)
 41             response=cursor.execute(sql,operation) if operation else cursor.execute(sql)
 42         except Exception as e:
 43             print(e)
 44             cursor.close()
 45             self.pool.put(conn)
 46             return None
 47         else:
 48             cursor.close()
 49             self.pool.put(conn)
 50             return response
 51 
 52 
 53     def exec_sql_feach(self,sql,operation=None):
 54         """
 55             執行有返回結果集的sql,主要是select
 56         """
 57         try:
 58             conn=self.pool.get()
 59             cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)
 60             response=cursor.execute(sql,operation) if operation else cursor.execute(sql)
 61         except Exception as e:
 62             print(e)
 63             cursor.close()
 64             self.pool.put(conn)
 65             return None,None
 66         else:
 67             data=cursor.fetchall()
 68             cursor.close()
 69             self.pool.put(conn)
 70             return response,data
 71 
 72     def exec_sql_many(self,sql,operation=None):
 73         """
 74             執行多個sql,主要是insert into 多條數據的時候
 75         """
 76         try:
 77             conn=self.pool.get()
 78             cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)
 79             response=cursor.executemany(sql,operation) if operation else cursor.executemany(sql)
 80         except Exception as e:
 81             print(e)
 82             cursor.close()
 83             self.pool.put(conn)
 84         else:
 85             cursor.close()
 86             self.pool.put(conn)
 87             return response
 88 
 89     def close_conn(self):
 90         for i in range(self.maxconn):
 91             self.pool.get().close()
 92 
 93 obj=Exec_db.get_instance(host="192.168.10.103",port=3306,user="root",passwd="123456",db="sql_example",charset="utf8",maxconn=10)
 94 
 95 def test_func(num):
 96     data=(("",i,"張小凡%s" %i) for i in range(num))
 97     sql="insert into tb1(gender,class_id,sname) values(%s,%s,%s)"
 98     print(obj.exec_sql_many(sql,data))
 99 
100 start=time.time()
101 job_list=[]
102 for i in range(10):
103     job_list.append(gevent.spawn(test_func,100000))
104 
105 gevent.joinall(job_list)
106 
107 obj.close_conn()
108 
109 print("totol time:",time.time()-start)
顯示代碼

 

 

開啟10個連接池插入100W數據的時間:

totol time: 240.16892313957214

開啟50個連接池插入100W數據的時間:

totol time: 202.82087111473083

開啟100個線程池插入100W數據的時間:

totol time: 196.1710569858551

5.后記

統計結果如下:

單線程一個連接使用時間:978.76s

  10個連接池 50個連接池 100個連接池
多線程版 242.81s 192.49s 191.74s
協程版 240.17s 202.82s 196.17s

通過統計結果顯示,通過協程和多線程操作連接池插入相同數據,相對一個連接提升速度明顯,但是在將連接池開到50以及100時,性能提升並沒有想象中那么大,這時候,瓶頸已經不在網絡I/O上了,而在數據庫中,mysql在大量連接寫入數據時,也會有鎖的產生,這時候就需要優化數據庫的相關設置了。

在對比中顯示多線程利用線程池和協程利用線程池的性能差不多,但是多線程的開銷比協程要大。

和大神討論過,在項目開發中需要考慮到不同情況使用不同的技術,多線程適合使用在連接量較大,但每個連接處理時間很短的情況下,而協程適用於處理大量連接,但同時活躍的鏈接比較少,並且每個連接的時間量比較大的情況下。

在實際生產應用中,創建連接池可以按需分配,當連接不夠用時,在連接池沒達到上限的情況下,在連接池里面加入新的連接,在連接池比較空閑的情況下,關閉一些連接,實現這一個操作的原理是通過queue里面的超時時間來控制,當等待時間超過了超時時間時,說明連接不夠用了,需要加入新的連接。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM