深入研究sqlalchemy連接池


簡介:

相對於最新的MySQL5.6,MariaDB在性能、功能、管理、NoSQL擴展方面包含了更豐富的特性。比如微秒的支持、線程池、子查詢優化、組提交、進度報告等。

本文就主要探索MariaDB當中連接池的一些特性,配置。來配合我們的sqlalchemy。

一:起因

本來是不會寫這個東西的,但是,寫好了python--flask程序,使用sqlalchemy+mariadb,部署以后總是出問題,500錯誤之類的。

使用默認連接參數

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)

錯誤提示是:

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)

http://sqlalche.me/e/e3q8:

OperationalError:

Exception raised for errors that are related to the database’s operation andnot necessarily under the control of the programmer, e.g. an unexpecteddisconnect occurs, the data source name is not found, a transaction could notbe processed, a memory allocation error occurred during processing, etc.

This error is aDBAPI Errorand originates fromthe database driver (DBAPI), not SQLAlchemy itself.

TheOperationalErroris the most common (but not the only) error class usedby drivers in the context of the database connection being dropped, or notbeing able to connect to the database. For tips on how to deal with this, seethe sectionDealing with Disconnects.

意思是沒有正確斷開和數據庫的連接。

二:處理斷開

http://docs.sqlalchemy.org/en/latest/core/pooling.html#pool-disconnects

官方給了三種方案來解決這個問題:

1.悲觀處理

engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)

pool_pre_ping=True

表示每次連接從池中檢查,如果有錯誤,監測為斷開的狀態,連接將被立即回收。

2.自定義悲觀的ping

from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy import select

some_engine = create_engine(...)

@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # "branch" refers to a sub-connection of a connection,
        # we don't want to bother pinging on these.
        return

    # turn off "close with result".  This flag is only used with
    # "connectionless" execution, otherwise will be False in any case
    save_should_close_with_result = connection.should_close_with_result
    connection.should_close_with_result = False

    try:
        # run a SELECT 1.   use a core select() so that
        # the SELECT of a scalar value without a table is
        # appropriately formatted for the backend
        connection.scalar(select([1]))
    except exc.DBAPIError as err:
        # catch SQLAlchemy's DBAPIError, which is a wrapper
        # for the DBAPI's exception.  It includes a .connection_invalidated
        # attribute which specifies if this connection is a "disconnect"
        # condition, which is based on inspection of the original exception
        # by the dialect in use.
        if err.connection_invalidated:
            # run the same SELECT again - the connection will re-validate
            # itself and establish a new connection.  The disconnect detection
            # here also causes the whole connection pool to be invalidated
            # so that all stale connections are discarded.
            connection.scalar(select([1]))
        else:
            raise
    finally:
        # restore "close with result"
        connection.should_close_with_result = save_should_close_with_result

說實話,沒怎么看明白。

像是try一個select 語句,如果沒問題就關閉。

 

3.樂觀處理

from sqlalchemy import create_engine, exc
e = create_engine(...)
c = e.connect()

try:
    # suppose the database has been restarted.
    c.execute("SELECT * FROM table")
    c.close()
except exc.DBAPIError, e:
    # an exception is raised, Connection is invalidated.
    if e.connection_invalidated:
        print("Connection was invalidated!")

# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute("SELECT * FROM table")

這個看懂了,try一個select語句,如果無效,就返回Connection was invalidated!,然后開一個新的連接,再去執行select。這個應該寫個裝飾器,放在每個查詢前面。

4.使用連接池回收

from sqlalchemy import create_engine
e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)

這種方式就比較簡單了,在連接參數中寫上連接超時時間即可。

5.這是自己看文檔找到的方法

from sqlalchemy.pool import QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool

在sqlalchemy.pool下有已經配置好的連接池,直接使用這些連接池也應該可以。

三:測試

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

為避免因數據庫交叉連接,首先開啟5個MARIADB

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_recycle=3600)

用這5種連接參數進行連接測試。

如果你願意,也可以繼續開,QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool,把這幾種都測試一下。

 

8801 8805 均會不同程度的出現500錯誤,8801頻率還高點。

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)
sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)


 

Internal Server Error

The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.

等會兒看看8802  8803 8804如何。

四:深入研究sqlalchemy源碼

VENV\Flask_Base\Lib\site-packages\sqlalchemy\engine\__init__.py

看起來,沒有默認值。所以engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)報錯頻率比較高。

五:研究pool源碼

VENV\Flask_Base\Lib\site-packages\sqlalchemy\pool.py

看來poolclass的類型都定義在這里了。

1.SingletonThreadPool

A Pool that maintains one connection per thread

每個線程維護一個連接的池。

2.QueuePool

A :class:`.Pool` that imposes a limit on the number of open connections.

這種方式限制了連接數量,QueuePool是默認的連接池方式,除非使用了方言,也就是第三方鏈接庫。

難怪我使用MySQL-connector-python時老出錯呢,沒打開連接池啊。

3.NullPool

A Pool which does not pool connections...

不使用連接池

4.StaticPool

A Pool of exactly one connection, used for all requests.

一個完整的連接池,用於所有的連接。

5.AssertionPool

A :class:`.Pool` that allows at most one checked out connection at any given time.

任何時間只給一個簽出連接?為了debug模式?不懂了。

看的官方說明也沒這么詳細。

這么看來,如果我使用默認鏈接庫,可以不加參數試試。

mysql-python是sqlalchemy默認的mysql鏈接庫,我在windows下裝不上。放棄測試默認鏈接庫,手動指定連接池為QueuePool。

或者指定連接池類型為:QueuePool   StaticPool   SingletonThreadPool(多線程的時候)

六:連接池類型測試

修改測試docker

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_06 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True))
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=SingletonThreadPool)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=StaticPool)
Flask_Plan_06   8806       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=AssertionPool)

七:編寫測試腳本

 

import requests
import time
i = 1
while True:
    try:
        r=requests.get('http://192.168.0.104:8801',timeout=5)
        if  r.status_code==200:
            print(time.strftime('%Y-%m-%d %H:%M:%S')+'---'+str(i)+'---'+str(r.status_code)+'---ok')
        else:
            print(time.strftime('%Y-%m-%d %H:%M:%S') + '---' + str(i) + '---' + str(r.status_code) + '-----------badr')
            break
        time.sleep(1)
        i+=1
    except:
        print('except')
        print(time.strftime('%Y-%m-%d %H:%M:%S') +'---'+str(i)+'-----------bad')
        break

修改地址,把幾個測試服務都開始跑。

出錯就會停了。

代碼很爛,湊活測試而已。

從晚上22:30睡覺到早上6:10起床,pool_pre_ping=True,SingletonThreadPool,QueuePool,NullPool,StaticPool,AssertionPool,都很穩定,訪問代碼都是200

八:繼續研究相關代碼

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#using-connection-pools-with-multiprocessing

使用連接池進行多重處理

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#api-documentation-available-pool-implementations

api文檔--連接池的實現

classsqlalchemy.pool.Pool(creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)

 

Parameters:    
creator–可調用的函數返回對象。
recycle– 超時回收時間。如果連接超過這個時間,連接就被關閉,換一個新的連接
logging_name - 日志標識名稱
echo– 是否打印sql語句
use_threadlocal–是否使用線程,在同一應用程序的線程使用相同的連接對象
reset_on_return–在返回前的操作
    rollback,大概是自動回滾
    True 同為回滾
    commit 大概是自動提交的意思
    None 無操作
    none 無操作
    False 無操作
events– 列表元組,每個表單會傳遞給listen………………沒搞懂
listeners - 棄用,被listen取代
dialect–鏈接庫,使用create_engine時不使用,由引擎創建時處理
pre_ping–是否測試連接

基本上這些參數都在engine-creation-api中

http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html#engine-creation-api

Pool                  (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
StaticPool         (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
NullPool            (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
QueuePool          (creator,pool_size=5,max_overflow=10,timeout=30,**kw)
SingletonThreadPool(creator,pool_size=5,**kw)
AssertionPool      (*args,**kw)

這下清楚了,Pool,StaicPool,NullPool,都一樣,直接回收,效率一定低了。

我們就指定默認的QueuePool好了。以后觀察着服務器的負載,負載大了以后,調整就好了。

自定義方法如下:

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',
                       pool_size=5,
                       max_overflow=10,
                       pool_timeout=30,
                       pool_pre_ping=True)

九:總結

曲折的道路,終於找到了解決方案。

sqlalchemy的教程當中,很少有講如何部署的。很多又是linux開發。可能在linux下很容易裝默認鏈接庫,部署的時候就自動使用了QueuePool連接池。所以這種問題很少出現。

我在windows下開發,部署在linux,開發和部署都使用了非默認鏈接庫,導致沒有使用默認連接池。

那么隨着深入研究,找到了連接池的配置,並掌握這一知識,為以后的開發部署工作,掃除了障礙。

雖然源碼里面還有很多看不懂,但是讀書百遍其義自見,還是要多讀(我是懶蛋,遇到問題,再去解決,下一個問題是什么呢?)。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM