aiomysql異步操作mysql


一、概述

aiomysql是一個從asyncio(PEP-3156/tulip)框架訪問MySQL數據庫的庫。它依賴並重用PyMySQL的大部分部分。aiomysql試圖成為一個很棒的aiopg庫,並保留相同的api、外觀和感覺。

在內部aimysql是PyMySQL的副本,底層io調用切換到async,基本上是等待並在適當的位置添加async def coroutine。從aiopg移植的sqlalchemy支持。

 

安裝模塊

pip3 install aiomysql

 

簡單示例

import asyncio
import aiomysql

loop = asyncio.get_event_loop()


async def test_example():
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                       user='root', password='', db='mysql',
                                       loop=loop)

    cur = await conn.cursor()
    await cur.execute("SELECT Host,User FROM user")
    print(cur.description)
    r = await cur.fetchall()
    print(r)
    await cur.close()
    conn.close()

loop.run_until_complete(test_example())

 

二、demo演示

環境說明

操作系統:centos 7.6

mysql版本:5.7

數據庫名:test

數據庫默認編碼:utf8mb4

具體表結構以及數據,請參考鏈接:

https://www.cnblogs.com/xiao987334176/p/12721498.html 

這里面有2個表

 

單次執行

 執行select和update

#!/usr/bin/env python3
# coding: utf-8
"""
mysql 異步版本
"""
import traceback
import logging
import aiomysql
import asyncio
import time

logobj = logging.getLogger('mysql')


class Pmysql:
    def __init__(self):
        self.coon = None
        self.pool = None

    async def initpool(self):
        try:
            logobj.debug("will connect mysql~")
            __pool = await aiomysql.create_pool(
                minsize=5,  # 連接池最小值
                maxsize=10,  # 連接池最大值
                host='192.168.31.230',
                port=3306,
                user='root',
                password='abcd1234',
                db='test',
                autocommit=True,  # 自動提交模式
            )
            return __pool
        except:
            logobj.error('connect error.', exc_info=True)

    async def getCurosr(self):
        conn = await self.pool.acquire()
        # 返回字典格式
        cur = await conn.cursor(aiomysql.DictCursor)
        return conn, cur

    async def query(self, query, param=None):
        """
        查詢操作
        :param query: sql語句
        :param param: 參數
        :return:
        """
        conn, cur = await self.getCurosr()
        try:
            await cur.execute(query, param)
            return await cur.fetchall()
        except:
            logobj.error(traceback.format_exc())
        finally:
            if cur:
                await cur.close()
            # 釋放掉conn,將連接放回到連接池中
            await self.pool.release(conn)

    async def execute(self, query, param=None):
        """
        增刪改 操作
        :param query: sql語句
        :param param: 參數
        :return:
        """
        conn, cur = await self.getCurosr()
        try:
            await cur.execute(query, param)
            if cur.rowcount == 0:
                return False
            else:
                return True
        except:
            logobj.error(traceback.format_exc())
        finally:
            if cur:
                await cur.close()
            # 釋放掉conn,將連接放回到連接池中
            await self.pool.release(conn)


async def getAmysqlobj():
    mysqlobj = Pmysql()
    pool = await mysqlobj.initpool()
    mysqlobj.pool = pool
    return mysqlobj


async def test_select():
    mysqlobj = await getAmysqlobj()
    # UPDATE `youku`.`person` SET `psName` = '張三豐' WHERE (`id` = '3');
    exeRtn = await mysqlobj.query("select * from users")
    # print("查詢結果",exeRtn)
    return exeRtn


async def test_update():
    mysqlobj = await getAmysqlobj()
    # UPDATE `youku`.`person` SET `psName` = '張三豐' WHERE (`id` = '3');
    exeRtn = await mysqlobj.execute("update users set username='xiao1' where id='1'")
    # print("exeRtn", exeRtn, type(exeRtn))
    if exeRtn:
        # print('操作成功')
        return '操作成功'
    else:
        # print('操作失敗')
        return '操作失敗'


async def main():  # 調用方
    tasks = [test_select(), test_update()]  # 把所有任務添加到task中
    done, pending = await asyncio.wait(tasks)  # 子生成器
    for r in done:  # done和pending都是一個任務,所以返回結果需要逐個調用result()
        # print('協程無序返回值:'+r.result())
        print(r.result())


if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()  # 創建一個事件循環對象loop
    try:
        loop.run_until_complete(main())  # 完成事件循環,直到最后一個任務結束
    finally:
        loop.close()  # 結束事件循環
    print('所有IO任務總耗時%.5f秒' % float(time.time() - start))
View Code

執行輸出:

操作成功
[{'id': 1, 'username': 'xiao', 'password': '123', 'phone': '12345678910', 'email': '123@qq.com', 'create_time': datetime.datetime(2020, 4, 10, 1, 22, 7)}]
所有IO任務總耗時0.03948秒

 

 

批量插入

批量插入使用executemany

插入3萬條數據

#!/usr/bin/env python3
# coding: utf-8

import time
import asyncio
import aiomysql

start = time.time()
loop = asyncio.get_event_loop()

async def test_example():
    conn = await aiomysql.connect(host='192.168.31.230', port=3306,
                                       user='root', password='abcd1234',
                                       db='test', loop=loop)

    # create default cursor
    cursor = await conn.cursor()

    # execute sql query
    data = []
    for i in range(1,30000):
        data.append(('xiao%s'%i, '123', '12345678910', '123@qq.com', '2020-04-10 01:22:07'),)

    stmt = "INSERT INTO users (username,password,phone,email,create_time) VALUES(%s,%s,%s,%s,%s);"

    await cursor.executemany(stmt, data)
    await conn.commit()
    # detach cursor from connection
    await cursor.close()

    # close connection
    conn.close()

loop.run_until_complete(test_example())
print('所有IO任務總耗時%.5f秒' % float(time.time() - start))
View Code

執行輸出:

所有IO任務總耗時11.96885秒

 

本文參考鏈接:

https://www.cnblogs.com/ygy1997/p/11753335.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM