Python之MySQL數據庫連接驅動aiomysql的使用


  在上一篇博文介紹了MySQL數據庫取得pymysql的使用,參考:https://www.cnblogs.com/minseo/p/15597428.html

  本文介紹異步MySQL異步驅動aiomysql的使用

  1,安裝異步模塊

  如果沒有模塊則先使用pip安裝模塊

pip3 install asyncio
pip3 install aiomysql

  2,創建MySQL數據庫連接池

  和同步方式不一樣的是使用異步不能直接創建數據庫連接conn,需要先創建一個數據庫連接池對象__pool通過這個數據庫連接池對象來創建數據庫連接

  數據庫配置信息和介紹pymysql同步使用的數據庫是一樣的

import asyncio,aiomysql,time
# 數據庫配置dict
db_config = {
    'host': 'localhost',
    'user': 'www-data',
    'password': 'www-data',
    'db': 'awesome'
}

# 創建數據庫連接池協程函數
async def create_pool(**kw):
    global __pool
    __pool = await aiomysql.create_pool(
        host=kw.get('host', 'localhost'),
        port=kw.get('port', 3306),
        user=kw['user'],
        password=kw['password'],
        db=kw['db']
    )

loop=asyncio.get_event_loop()
loop.run_until_complete(create_pool(**db_config))
# 在事件循環中運行了協程函數則生成了全局變量__pool是一個連接池對象 <aiomysql.pool.Pool object at 0x00000244AD1724C8>
print(__pool)
# <aiomysql.pool.Pool object at 0x00000244AD1724C8>

  3,創建執行sql語句的協程函數

  因為是異步模塊,只能在事件循環中通過await關鍵字調用,使用需要創建執行sql語句的協程函數

  在協程函數內使用全局上一步創建的連接池對象來創建連接conn和浮標對象cur,通過浮標對象來執行sql語句,執行方法和pymysql模塊的執行方法是一樣的

cursor.execute(sql,args)
sql # 需要執行的sql語句例如'select * from table_name'
args # 替換sql語句的格式化字符串,即sql語句可以使用%s代表一個字符串,然后在args中使用對應的變量或參數替換,args為一個list或元組,即是一個有序的序列需要和sql中的%s一一對應
# 例如sql='select * from table_name where id=%s'  args=['12345']
# 相當於使用args中的參數替換sql中的%s
# select * from table_name where id='12345'

  下面分別創建兩個協程函數select execute一個用來執行搜索操作,一個用來執行insert,update,delete等修改操作

# 執行select函數
async def select(sql,args,size=None):
    with await __pool as conn:
        cur = await conn.cursor(aiomysql.DictCursor)
        await cur.execute(sql.replace('?','?s'),args or ())
        if size:
            rs = await cur.fetchmany(size)
        else:
            rs = await cur.fetchall()
        await cur.close()
        return rs


# 執行insert update delete函數
async def execute(sql,args):
    with await __pool as conn:
        try:
            cur = await conn.cursor()
            await cur.execute(sql.replace('?','%s'),args)
            affetced = cur.rowcount
            await conn.commit()
            await cur.close()
        except BaseException as e:
            raise
        return affetced

  4,實踐執行sql語句

  實踐執行sql語句前我們首先在本機創建一個數據庫和對應的表用於測試

  數據庫對應的主機,用戶名,密碼,庫名,表名如下

host: localhost
user: www-data
password: www-data
db:awesome
table_name: users

  創建表名的sql語句如下,需要在數據庫中創建好對應的表

CREATE TABLE `users` (
  `id` varchar(50) NOT NULL,
  `email` varchar(50) NOT NULL,
  `passwd` varchar(50) NOT NULL,
  `admin` tinyint(1) NOT NULL,
  `name` varchar(50) NOT NULL,
  `image` varchar(500) NOT NULL,
  `created_at` double NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_email` (`email`),
  KEY `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

  創建好的表對應的結構如下

mysql> desc users;
+------------+--------------+------+-----+---------+-------+
| Field      | Type         | Null | Key | Default | Extra |
+------------+--------------+------+-----+---------+-------+
| id         | varchar(50)  | NO   | PRI | NULL    |       |
| email      | varchar(50)  | NO   | UNI | NULL    |       |
| passwd     | varchar(50)  | NO   |     | NULL    |       |
| admin      | tinyint(1)   | NO   |     | NULL    |       |
| name       | varchar(50)  | NO   |     | NULL    |       |
| image      | varchar(500) | NO   |     | NULL    |       |
| created_at | double       | NO   | MUL | NULL    |       |
+------------+--------------+------+-----+---------+-------+
7 rows in set (2.68 sec)

  ①執行insert操作

# insert start
import time
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111']
async def insert():
    await execute(sql,args)
loop.run_until_complete(insert())
# insert end

  執行方式和pymysql沒有區別,不同的是需要在事件循環中使用關鍵字await調用

  執行完畢在MySQL中查看插入的數據

mysql> select * from users;
+--------+-------------+----------+-------+------+-------------+------------------+
| id     | email       | passwd   | admin | name | image       | created_at       |
+--------+-------------+----------+-------+------+-------------+------------------+
| 111111 | test@qq.com | password |     1 | test | about:blank | 1637738541.48629 |
+--------+-------------+----------+-------+------+-------------+------------------+
1 row in set (0.00 sec)

  ②執行update操作

  直接在loop事件循環中執行execute協程函數也可以

# update start
import time
sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?'
args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111']
loop.run_until_complete(execute(sql,args))
# update end

  執行以后把email和name都修改了

  ③執行delete操作

# delete start
sql = 'delete from `users` where `id`=?'
args = ['111111'] 
loop.run_until_complete(execute(sql,args))
# delete end

  同樣根據關鍵字id指定的值刪除了這條數據

  ④執行selete操作

  在執行select操作前我們保證數據庫里面至少有一條數據

# select start
sql = 'select * from users'
args = []
rs = loop.run_until_complete(select(sql,args))
print(rs)
# select end

  這里直接執行搜索的協程函數select根據函數的定義返回的是所有結果的list,元素是查詢結果的字典

  輸出為

[{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637739212.74493}]

  如果結果有多個則使用list的下標取出

  

  補充

  同步模塊pymysql和異步模塊aiomysql執行速度對比

  假如我們需要往數據庫插入20000條數據,我們分別使用同步模式和異步模式

  首先刪除數據庫所有測試數據

delete from users;

  同步的代碼

  d:/learn-python3/學習腳本/pymysql/use_pymysql.py

import pymysql
db_config = {
    'host': 'localhost',
    'user': 'www-data',
    'password': 'www-data',
    'db': 'awesome'
}
# 創建連接,相當於把字典內的鍵值對傳遞
# 相當於執行pymysql.connect(host='localhost',user='www-data',password='www-data',db='awesome')
conn = pymysql.connect(**db_config)
# 創建游標
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'select * from users'
args = []
# 執行查詢返回結果數量
# 執行查詢
rs=cursor.execute(sql,args)
# 獲取查詢結果
# 獲取查詢的第一條結果,返回一個dict,dict元素是查詢對應的鍵值對
# 如果查詢結果有多條則執行一次,游標移動到下一條數據,在執行一次又返回一條數據
# print(cursor.fetchone())
# print(cursor.fetchone())
# print(cursor.fetchall())
# print(cursor.fetchmany())
# {'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}
# 獲取查詢的所有結果,返回一個list,list元素是dict,dict元素是查詢對應的鍵值對
# print(cursor.fetchall())
# [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}]
# 獲取查詢的前幾條結果,返回一個dict,dict元素是查詢對應的鍵值對
# print(cursor.fetchmany(1))
# [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}]
# 執行修改操作
import time
# # insert start
sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
args = ['test1@qq.com','password',1,'test','about:blank',time.time(),'1111121']
# 使用replace 把'?'替換成'%s'
# rs = cursor.execute(sql.replace('?','%s'),args)
# print(cursor.rowcount)
# conn.commit()
# print(rs)
# insert end

# update start
# sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?'
# args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111'] 
# print(cursor.execute(sql.replace('?','%s'),args))
# conn.commit()
# update end

# delete start
# sql = 'delete from `users` where `id`=?'
# args = ['111111'] 
# print(cursor.execute(sql.replace('?','%s'),args))
# conn.commit()
# delete end


# 寫成函數調用,函數內部使用了數據庫連接對象conn
# 可以先定義成全局變量global
def select(sql,args,size=None):
    
    cursor =  conn.cursor(pymysql.cursors.DictCursor)
    cursor.execute(sql.replace('?','%s'),args or ())
    if size:
        rs = cursor.fetchmany(size)
    else:
        rs = cursor.fetchall()
    cursor.close
    # logging.info('rows returned: %s' % len(rs))
    return rs
 
def execute(sql,args):
     
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(sql.replace('?','%s'),args)
        # rowcount方法把影響函數返回
        rs = cursor.rowcount
        cursor.close()
        conn.commit()
    except:
        raise
    return rs

start_time = time.time()
for n in range(20000):
    sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
    email = 'test%s@qq.com' %n
    args = [email,'password',1,'test','about:blank',time.time(),n] 
    execute(sql,args)
end_time = time.time()
# 打印開始和結束時間的差
print(end_time - start_time)

  我們使用一個循環20000次往數據庫插入數據

  執行,插入數據比較多需要等待一段時間輸出

D:\learn-python3\函數式編程>C:/Python37/python.exe d:/learn-python3/學習腳本/pymysql/use_pymysql.py
77.46903562545776

  可以在數據庫查詢到這20000條數據,而且這個表的字段created_at存儲了創建這條數據的時間戳,我們可以看到,id越往后的時間戳越往后,說明數據是同步按順序一一插入的

  我們按照字段created_at排序查詢

 

 

  下面我們刪除所有數據使用異步方式插入

  異步的代碼如下

  d:/learn-python3/學習腳本/aiomysql/use_aiomysql.py

import asyncio,aiomysql,time
# 數據庫配置dict
db_config = {
    'host': 'localhost',
    'user': 'www-data',
    'password': 'www-data',
    'db': 'awesome'
}

# 創建數據庫連接池協程函數
async def create_pool(**kw):
    global __pool
    __pool = await aiomysql.create_pool(
        host=kw.get('host', 'localhost'),
        port=kw.get('port', 3306),
        user=kw['user'],
        password=kw['password'],
        db=kw['db']
    )

loop=asyncio.get_event_loop()
loop.run_until_complete(create_pool(**db_config))
# 在事件循環中運行了協程函數則生成了全局變量__pool是一個連接池對象 <aiomysql.pool.Pool object at 0x00000244AD1724C8>
print(__pool)
# <aiomysql.pool.Pool object at 0x00000244AD1724C8>

# 執行select函數
async def select(sql,args,size=None):
    with await __pool as conn:
        cur = await conn.cursor(aiomysql.DictCursor)
        await cur.execute(sql.replace('?','?s'),args or ())
        if size:
            rs = await cur.fetchmany(size)
        else:
            rs = await cur.fetchall()
        await cur.close()
        return rs


# 執行insert update delete函數
async def execute(sql,args):
    with await __pool as conn:
        try:
            cur = await conn.cursor()
            await cur.execute(sql.replace('?','%s'),args)
            affetced = cur.rowcount
            await conn.commit()
            await cur.close()
        except BaseException as e:
            raise
        return affetced

# insert start
# import time
# sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
# args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111']
# async def insert():
#     await execute(sql,args)
# loop.run_until_complete(insert())
# insert end

# update start
# import time
# sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?'
# args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111']
# loop.run_until_complete(execute(sql,args))

# update end

# delete start
# sql = 'delete from `users` where `id`=?'
# args = ['111111'] 
# loop.run_until_complete(execute(sql,args))
# delete end

# select start
# sql = 'select * from users'
# args = []
# rs = loop.run_until_complete(select(sql,args))
# print(rs)
# select end

async def insert1():
     for n in range(10000):
        sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
        email = 'test%s@qq.com' %n
        args = [email,'password',1,'test','about:blank',time.time(),n] 
        await execute(sql,args)

async def insert2():
     for n in range(10001,20001):
        sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'
        email = 'test%s@qq.com' %n
        args = [email,'password',1,'test','about:blank',time.time(),n] 
        await execute(sql,args)

async def main():
    # 需要組合成一個事件才會異步執行即在執行insert1的時候同步執行insert2
    await asyncio.gather(insert1(),insert2())

start_time = time.time()
loop.run_until_complete(main())
end_time = time.time()
print(end_time - start_time)

  這里我們定義了兩個協程函數,分別用來執行前10000個數據和后10000個數據的插入,在main()把這兩個協程函數組合成一個事件循環

  等待一段時間后執行輸出如下,忽略這個warning,可以看到執行時間明顯比同步時間短

d:/learn-python3/學習腳本/aiomysql/use_aiomysql.py:42: DeprecationWarning: with await pool as conn deprecated, useasync with pool.acquire() as conn instead
  with await __pool as conn:
39.794615507125854

  我們去數據庫查詢一下數據也可以看到id從0開始和id從10001開始幾乎是同時插入的

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM