[Python]peewee 使用經驗


peewee 使用經驗

本文使用案例是基於 python2.7 實現

以下內容均為個人使用 peewee 的經驗和遇到的坑,不會涉及過多的基本操作。所以,沒有使用過 peewee,可以先閱讀文檔

正確性和覆蓋面有待提高,如果遇到新的問題歡迎討論。

一、介紹

Peewee 是一個簡單、輕巧的 Python ORM

  • 簡單、輕巧、富有表現力(原詞 expressive )的ORM
  • 支持python版本 2.6+ 和 3.2+
  • 支持數據庫包括:sqlite, mysql and postgresql
  • 包含一堆實用的擴展在 playhouse 模塊中

總而言之,peewee 可以完全可以應付個人或企業的中小型項目的 Model 層,上手容易,功能很強大。

二、基本使用方法

from peewee import *

db = SqliteDatabase('people.db')
class BaseModel(Model):
    class Meta:
        database = db # This model uses the "people.db" database.

class Person(BaseModel):
    name = CharField()
    birthday = DateField()
    is_relative = BooleanField()    

基本的使用方法,推薦閱讀文檔--quickstart

三、推薦使用姿勢

下面介紹一些我在使用過程的經驗和遇到的坑,希望可以幫助大家更好的使用 peewee。

3.1 連接數據庫

連接數據庫時,推薦使用 playhouse 中的 db_url 模塊。db_url 的 connect 方法可以通過傳入的 URL 字符串,生成數據庫連接。

3.1.1 connect(url, **connect_params)

通過傳入的 url 字符串,創建一個數據庫實例

url形如

  • mysql://user:passwd@ip:port/my_db 將創建一個 本地 MySQL 的 my_db 數據庫的實例(will create a MySQLDatabase instance)
  • mysql+pool://user:passwd@ip:port/my_db?charset=utf8&max_connections=20&stale_timeout=300 將創建一個本地 MySQL 的 my_db 的連接池,最大連接數為20(In a multi-threaded application, up to max_connections will be opened. Each thread (or, if using gevent, greenlet) will have it’s own connection.),超時時間為300秒(will create a PooledMySQLDatabase instance)
    注意:charset 默認為utf8。如需要支持 emoji ,charset 設置為utf8mb4,同時保證創建數據庫時的字符集設置正確CREATE DATABASE mydatabase CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

支持的 schemes

  • apsw: APSWDatabase
  • mysql: MySQLDatabase
  • mysql+pool: PooledMySQLDatabase
  • postgres: PostgresqlDatabase
  • postgres+pool: PooledPostgresqlDatabase
  • postgresext: PostgresqlExtDatabase
  • postgresext+pool: PooledPostgresqlExtDatabase
  • sqlite: SqliteDatabase
  • sqliteext: SqliteExtDatabase
  • sqlite+pool: PooledSqliteDatabase
  • sqliteext+pool: PooledSqliteExtDatabase

3.1.2 推薦姿勢

from playhouse.db_url import connect

from dock.common import config

# url: mysql+pool://root:root@127.0.0.1:3306/appmanage?max_connections=300&stale_timeout=300
mysql_config_url = config_dict.get('config').get('mysql').get('url')
db = connect(url=mysql_config_url)

查看更多詳情請移步官方文檔:db-url

3.2 連接池的使用

peewee 的連接池,使用時需要顯式的關閉連接。下面先說下為什么,最后會給出推薦的使用方法,避免進坑

3.2.1 為什么要顯式的關閉連接

Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.

這里引用官方文檔的提示。大致說:“超時連接不會自動關閉,只會在有新的請求時是才會關閉”。這里的request是指‘web 框架處理的請求’,peewee 源碼片段:

def _connect(self, *args, **kwargs):
    while True:
        try:
            # Remove the oldest connection from the heap.
            ts, conn = heapq.heappop(self._connections)  # _connections是連接實例的list(pool)
            key = self.conn_key(conn)
        except IndexError:
            ts = conn = None
            logger.debug('No connection available in pool.')
            break
        else:
            if self._is_closed(key, conn):
                # This connecton was closed, but since it was not stale
                # it got added back to the queue of available conns. We
                # then closed it and marked it as explicitly closed, so
                # it's safe to throw it away now.
                # (Because Database.close() calls Database._close()).
                logger.debug('Connection %s was closed.', key)
                ts = conn = None
                self._closed.discard(key)
            elif self.stale_timeout and self._is_stale(ts):
                # If we are attempting to check out a stale connection,
                # then close it. We don't need to mark it in the "closed"
                # set, because it is not in the list of available conns
                # anymore.
                logger.debug('Connection %s was stale, closing.', key)
                self._close(conn, True)
                self._closed.discard(key)
                ts = conn = None
            else:
                break
    if conn is None:
        if self.max_connections and (
                len(self._in_use) >= self.max_connections):
            raise ValueError('Exceeded maximum connections.')
        conn = super(PooledDatabase, self)._connect(*args, **kwargs)
        ts = time.time()
        key = self.conn_key(conn)
        logger.debug('Created new connection %s.', key)

    self._in_use[key] = ts  # 使用中的數據庫連接實例dict
    return conn

根據 pool 庫中的 _connect 方法的代碼可知:每次在建立數據庫連接時,會檢查連接實例是否超時。但是需要注意一點:使用中的數據庫連接實例(_in_use dict中的數據庫連接實例),是不會在創建數據庫連接時,檢查是否超時的

因為這段代碼中,每次創建連接實例,都是在 _connections(pool) 取實例,如果有的話就判斷是否超時;如果沒有的話就新建。

然而,使用中的數據庫連接並不在 _connections 中,所以每次創建數據庫連接實例時,並沒有檢測使用中的數據庫連接實例是否超時。

只有調用連接池實例的 _close 方法。執行這個方法后,才會把使用后的連接實例放回到 _connections (pool)。

def _close(self, conn, close_conn=False):
    key = self.conn_key(conn)
    if close_conn:
        self._closed.add(key)
        super(PooledDatabase, self)._close(conn)  # 關閉數據庫連接的方法
    elif key in self._in_use:
        ts = self._in_use[key]
        del self._in_use[key]
        if self.stale_timeout and self._is_stale(ts):   # 到這里才會判斷_in_use中的連接實例是否超時
            logger.debug('Closing stale connection %s.', key)
            super(PooledDatabase, self)._close(conn)   # 超時的話,關閉數據庫連接
        else:
            logger.debug('Returning %s to pool.', key)
            heapq.heappush(self._connections, (ts, conn))  # 沒有超時的話,放回到pool中

3.2.2 如果不顯式的關閉連接,會出現的問題

如果不調用_close方法的話,使用后 的數據庫連接就一直不會關閉(兩個含義:回到pool中和關閉數據庫連接),這樣會造成兩個問題:

  1. 每次都是新建數據庫連接,因為 pool 中沒有數據庫連接實例。會導致稍微有一點並發量就會返回Exceeded maximum connections.錯誤
  2. MySQL也是有 timeout 的,如果一個連接長時間沒有請求的話,MySQL Server 就會關閉這個連接,但是,peewee的已建立(后面會解釋為什么特指已建立的)的連接實例,並不知道 MySQL Server 已經關閉了,再去通過這個連接請求數據的話,就會返回 Error 2006: “MySQL server has gone away” 錯誤,根據官方文檔

3.2.3 推薦姿勢

所以,每次操作完數據庫就關閉連接實例。

  • 用法1:使用with

    def send_rule():
        with db.execution_context():
        # A new connection will be opened or, if using a connection pool,
        # pulled from the pool of available connections. Additionally, a
        # transaction will be started.
            for user in get_all_user():
                user_id = user['id']
                rule = Rule(user_id)
                rule_dict = rule.slack_rule(index)
                .....do something.....
    
  • 用法2:使用Flask hook

    @app.before_request
    def _db_connect():
        database.connect()
    #
    # This hook ensures that the connection is closed when we've finished
    # processing the request.
    @app.teardown_request
    def _db_close(exc):
        if not database.is_closed():
            database.close()
    #
    #
    # 更優雅的用法:
    from playhouse.flask_utils import FlaskDB
    from dock_fastgear.model.base import db
    #
    app = Flask(__name__)
    FlaskDB(app, db)  # 這樣就自動做了上面的事情(具體實現可查看http://docs.peewee-orm.com/en/latest/peewee/playhouse.html?highlight=Flask%20DB#flask-utils)
    

查看更多詳情請移步官方文檔:pool-apis

3.3 處理查詢結果

這里沒有什么大坑,就是有兩點需要注意:

首先,查詢的結果都是該 Model 的 object,注意不是 dict。如果想讓結果為 dict,需要 playhouse 模塊的工具方法進行轉化:from playhouse.shortcuts import model_to_dict

其次,get方法只會返回一條記錄

3.3.1 推薦姿勢

from playhouse.shortcuts import model_to_dict
from model import HelloGitHub

def read_from_db(input_vol):
    content_list = []
    category_object_list = HelloGitHub.select(HelloGitHub.category).where(HelloGitHub.vol == input_vol)\
        .group_by(HelloGitHub.category).order_by(HelloGitHub.category)

    for fi_category_object in category_object_list:
        hellogithub = HelloGitHub.select()\
            .where((HelloGitHub.vol == input_vol)
                   & (HelloGitHub.category == fi_category_object.category))\
            .order_by(HelloGitHub.create_time)
        for fi_hellogithub in hellogithub:
            content_list.append(model_to_dict(fi_hellogithub))
    return content_list

四、常見錯誤及解決辦法

4.1 'buffer' object has no attribute 'translate'

  • 錯誤信息: "'buffer' object has no attribute 'translate'"
  • 場景:BlobField 字段存儲zlib compress壓縮的數據
  • 解決辦法:需要指定pymysql的版本小於0.6.7 否則會報錯
  • 參考

4.2 Can't connect to MySQL server Lost connection to MySQL server during query

  • 錯誤信息:Can't connect to MySQL server Lost connection to MySQL server during query
  • 場景:向 RDS 中插入數據
  • 解決辦法:因為請求的連接數過多,達到了 RDS 設置的連接數,所以需要調高 RDS 連接數
  • 參考

4.3 打印執行的 SQL

query = News.select().where(News.url == 'test')
sql, param = query.sql()
print sql.replace("?","{}").format(*param)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM