馬上就要開始寫仿優酷系統了,在這里提前學習一下ORM。
一開始去百度搜了一下相關資料,里面的ORM框架要自己寫,所以花了半天的時間學習了sqlalchemy:
SQLAlchemy
SQLAlchemy 比 Django 自帶的 ORM 好在哪里?
python sqlalchemy 和django orm 對比
不管是說Django的ORM好,還是說Sqlalchemy好,,我認為框架並不是重點,其核心在於通過學習一門ORM框架,達到能熟練的從數據庫取出數據\保存數據(也就是數據持久化),這才是重點。
Aiomysql:
同樣道理,掌握一門異步庫也能讓自己更加首席線程的操作:
1.為什么要使用異步訪問數據庫,有什么優缺點?
在多用戶高並發的情況下,若使用同步查詢,則等待時間過長,代價很高;
使用了異步能提高效率,但在同時增加了處理難度;而且還會有“讀臟數據的風險”:
A和B要買票,同時A和B都發現只剩一張票了,這個時候兩人同時點擊購買,則會出現“讀臟數據”的情況
2.普通的線程池和Aiomysql有什么區別?
首先准確的來說,Aiomysql主要使用的是協程,其次才是多線程,多線程建立連接池,每個連接使用協程處理多並發的情況,
所以,他們的區別應該是在,普通的線程池的每一個線程沒有使用協程,而aiomysql線程池里的線程使用了協程
正式開始主題:
1.Aiomysql創建連接

class DBcontroller: __engine=None __isinstance = False def __new__(cls, *args, **kwargs): if cls.__isinstance: # 如果被實例化了 return cls.__isinstance # 返回實例化對象 print('connecting to database...') asyncio.get_event_loop().run_until_complete(DBcontroller.connect()) cls.__isinstance = object.__new__(cls) # 否則實例化 return cls.__isinstance # 返回實例化的對象 @staticmethod async def connect(): try: __engine = await create_engine(user='root', db='youku', host='127.0.0.1', password='root', minsize=1, maxsize=10, autocommit=True) if __engine: DBcontroller.__engine = __engine DBcontroller.connectStatue =True print('connect to mysql success!') else: raise ("connect to mysql error ") except: print('connect error.', exc_info=True)
2.創建Model
class User(Base,Model): __tablename__ = 'user' id = Column(INTEGER(11), primary_key=True) name = Column(String(255)) password = Column(String(255)) is_locked = Column(INTEGER(11), nullable=False, server_default=text("'0'")) is_vip = Column(INTEGER(11), nullable=False, server_default=text("'0'")) user_type = Column(String(255)) register_time = Column(String(255))
3.創建sqlalchemy表

from sqlalchemy import Column, Integer, String, MetaData,Table metadata = sa.MetaData() user = Table('user', MetaData(), Column('id', Integer, primary_key=True), Column('name', String(50)), Column('is_locked', Integer,nullable=False,default=0), Column('is_vip', Integer,nullable=False,default=0), Column('user_type', String(12)), Column('register_time', String(12)) ) ()
4.實現查詢/更新/插入功能
class DBcontroller: __engine=None __isinstance = False def __new__(cls, *args, **kwargs): if cls.__isinstance: # 如果被實例化了 return cls.__isinstance # 返回實例化對象 print('connecting to database...') asyncio.get_event_loop().run_until_complete(DBcontroller.connect()) cls.__isinstance = object.__new__(cls) # 否則實例化 return cls.__isinstance # 返回實例化的對象 @staticmethod async def connect(): try: __engine = await create_engine(user='root', db='youku', host='127.0.0.1', password='root', minsize=1, maxsize=10, autocommit=True) if __engine: DBcontroller.__engine = __engine DBcontroller.connectStatue =True print('connect to mysql success!') else: raise ("connect to mysql error ") except: print('connect error.', exc_info=True) def selectTable(self,table): async def select(): dbstb={ 'user':user } conn = await DBcontroller.__engine.acquire() try: result = await conn.execute(dbstb[table].select()) res = await result.fetchall() for row in res: print(row) except Exception as e : print(e) finally: # DBcontroller.__engine.release(conn) pass asyncio.get_event_loop().run_until_complete(select()) def executeTable(self,table,attr): async def execute(): dbstb={ 'user':user } conn = await DBcontroller.__engine.acquire() try: await conn.execute(dbstb[table].insert().values(attr)) except Exception as e: if e.args[0]== 1062: print('主鍵已存在') elif e.args[0]==1366: if attr['id']=='': print('未設置主鍵') elif attr['is_vip']=='': print('未設置vip類型') elif attr['is_locked']=='': print('未設置上鎖類型') else:print('未知錯誤,請聯系管理員') else: print(f'connect failed:{e}') finally: DBcontroller.__engine.release(conn) asyncio.get_event_loop().run_until_complete(execute()) def updateTable(self,table,id,attr): async def update(): dbstb={ 'user':user } conn = await DBcontroller.__engine.acquire() try: await conn.execute(user.update().where(dbstb[table].c.id==id).values(attr)) except Exception as e: if e.args[0]== 1062: print('主鍵已存在') elif e.args[0]==1366: if attr['id']=='': print('未設置主鍵') elif attr['is_vip']=='': print('未設置vip類型') elif attr['is_locked']=='': print('未設置上鎖類型') else:print('未知錯誤,請聯系管理員') print(e) else: print(f'connect failed:{e}') finally: DBcontroller.__engine.release(conn) asyncio.get_event_loop().run_until_complete(update())
結果