前言:
- 需要異步操作MySQL,又要用orm,使用sqlalchemy需要加celery,覺得比較麻煩,選擇了peewee-async
開發環境 python3.6.8+peewee-async0.5.12+peewee2.10.2
-
數據庫:MySQL,使用peewee-async需要依賴庫 pip install aiomysql
-
peewee-async,對peewee版本只支持peewee<=2.10.2,>=2.8.0
-
python3.5以后使用async和await關鍵字實現原生協程,也可以使用tornado的gen模塊下coroutine實現協程,或者asyncio模塊實現協程,下文統一使用async和await
tornado 異步調用MySQL
-
爬坑
-
最初遇到的坑,使用了最新版的peewee,連接池連接,也使用了async和await協程,但是怎么調用都會阻塞,后來發現不是阻塞單個協程,是阻塞了整個進程,因為tornado是單進程,必須數據庫也使用異步操作,才能不阻塞整個進程
-
pip install peewee-async 的時候默認會安裝符合版本要求的peewee,想用最新的peewee模塊可以使用--pre
-
查看peewee-async 模塊的MySQLDatabase,繼承了AsyncDatabase和peewee.MySQLDatabase,AsyncDatabase方法全部使用協程實現異步
-
-
peewee-async 連接MySQL,返回database對象
-
單連接
import peewee_async # db = peewee_async.MySQLDatabase(database_name, host, port, user, password) # 或者,將自己的數據庫信息封裝到字典中 db_setting = { "user": "root", "password": "xxxxxx", "host": "127.0.0.1", "port": 3306, "database": "test" } db = peewee_async.MySQLDatabase(**db_setting)
-
連接池
from playhouse.shortcuts import RetryOperationalError from peewee_async import PooledMySQLDatabase # 可以自動重新連接的連接池 class RetryMySQLDatabase(RetryOperationalError, PooledMySQLDatabase): _instance = None @staticmethod def get_db_instance(): if not RetryMySQLDatabase._instance: RetryMySQLDatabase._instance = RetryMySQLDatabase(database_name, host, port, user, password, max_connections=10) return RetryMySQLDatabase._instance db = RetryMySQLDatabase.get_db_instance()
-
-
返回的database對象的一些常用方法
-
get_tables() 返回列表,當前數據庫的所有表名
-
get_columns(table_name) 傳參表名,返回列表,包含ColumnMetadata對象,字段信息
-
create_tables()
第一個參數為列表,包含要創建的表model
第二個參數safe, 不傳默認為False,建表的時候如果表已經存在會報錯,可以加safe=True -
is_closed() 判斷當前連接是否關閉
-
close() 關閉連接
-
-
peewee
- peewee 模塊可以參照官方文檔用法,和sqlalchemy差別不大,為了下面的操作,暫時建一個model
book.py
# 集中寫一個basemodel,將數據庫對象綁定在model上,類才能映射到數據庫中的表 class BaseModel(Model): class Meta: database = db class Book(BaseModel): book_id = PrimaryKeyField() # int 主鍵自增,在peewee3.10 版本中新增了字段AutoField,表示主鍵自增 book_name = CharField(max_length=100, verbose_name="書名") # 鑒於篇幅, 作者表不寫,外鍵第一個參數為model類名,to_field表示關聯的字段 book_auth = ForeignKeyField(User, to_field="user_id", verbose_name="作者id")
-
peewee-async Manager
- 管理數據庫操作,實現異步操作數據庫必須使用Manager,查看源碼可以看到,類中的get, create, execute等方法都是使用裝飾器@asyncio.coroutine加yield from,在原有的數據庫操作基礎上做了封裝
- 初始化傳入數據庫連接對象,生成manager對象,使用該對象完成數據庫操作,在tornado中一般選擇綁定到app上
from tornado.web import RequestHandler from tornado import gen import tornado.ioloop from book import Book class RegHandler(RequestHandler): async def get(self): # 在handler類中可以使用綁定到app上的manager對象執行操作,因為是異步操作需要使用await關鍵字 # 以下兩種查詢方式返回結果對象格式不同 book_res = await self.application.objects.get(Book, book_name="簡愛") id = book_res.book_id # 只有調用了execute方法才是執行,query打印可以看到只是生成了sql語句 # 為保證異步調用必須使用peewee-async manager生成的對象執行操作函數,不能使用model的execute執行,會同步阻塞 query = Book.select().where(Book.username=="簡愛") # execute執行返回AsyncQueryWrapper對象,如果有值可以通過下標取出每個book對象 # query 對象執行前可以調用dicts()方法,返回對象內容為字典格式 # query.tuples() 返回對象內容為元組格式,相當於sqlalchemy,fetchall() # 其他方法或者屬性有需要的可以使用dir()方法和getattr()方法查看屬性,以及屬性調用后返回值 book_res = await self.application.objects.execute(query.dicts()) pass async def post(self): from tornado.escape import json_decode body = json_decode(self.request.body) # 增 # 如果參數是字典格式,且key值對應字段名稱,可以使用peewee model里面的insert方法 await self.application.objects.execute(Book.insert(body)) # 或者使用封裝的create方法,create方法源碼還是調用了model類的insert方法 await self.application.objects.create(Book, boo_name=body.get("book"), book_auth=2) pass app = tornado.web.Application([ (r"/book", BookHandler) ]) if __name__ == '__main__': app = tornado.web.Application(urlpaten) import peewee_async # 將manager對象綁定到app上 app.objects = peewee_async.Manager(database) server = httpserver.HTTPServer(app, xheaders=True) server.listen(80) tornado.ioloop.IOLoop.current().start()
-
初步介紹先先寫到這里,通過上述介紹使用peewee和peewee-async沒有大問題,后面會通過詳細功能具體詳細介紹使用,細小的api建議看官方文檔
-
有問題歡迎指出,隨時修正