馬上就要開始寫仿優酷系統了,在這里提前學習一下ORM。
一開始去百度搜了一下相關資料,里面的ORM框架要自己寫,所以花了半天的時間學習了sqlalchemy:
SQLAlchemy
SQLAlchemy 比 Django 自帶的 ORM 好在哪里?
python sqlalchemy 和django orm 對比
不管是說Django的ORM好,還是說Sqlalchemy好,,我認為框架並不是重點,其核心在於通過學習一門ORM框架,達到能熟練的從數據庫取出數據\保存數據(也就是數據持久化),這才是重點。
Aiomysql:
同樣道理,掌握一門異步庫也能讓自己更加首席線程的操作:
1.為什么要使用異步訪問數據庫,有什么優缺點?
在多用戶高並發的情況下,若使用同步查詢,則等待時間過長,代價很高;
使用了異步能提高效率,但在同時增加了處理難度;而且還會有“讀臟數據的風險”:
A和B要買票,同時A和B都發現只剩一張票了,這個時候兩人同時點擊購買,則會出現“讀臟數據”的情況
2.普通的線程池和Aiomysql有什么區別?
首先准確的來說,Aiomysql主要使用的是協程,其次才是多線程,多線程建立連接池,每個連接使用協程處理多並發的情況,
所以,他們的區別應該是在,普通的線程池的每一個線程沒有使用協程,而aiomysql線程池里的線程使用了協程
正式開始主題:
1.Aiomysql創建連接
class DBcontroller: __engine=None __isinstance = False def __new__(cls, *args, **kwargs): if cls.__isinstance: # 如果被實例化了 return cls.__isinstance # 返回實例化對象 print('connecting to database...') asyncio.get_event_loop().run_until_complete(DBcontroller.connect()) cls.__isinstance = object.__new__(cls) # 否則實例化 return cls.__isinstance # 返回實例化的對象 @staticmethod async def connect(): try: __engine = await create_engine(user='root', db='youku', host='127.0.0.1', password='root', minsize=1, maxsize=10, autocommit=True) if __engine: DBcontroller.__engine = __engine DBcontroller.connectStatue =True print('connect to mysql success!') else: raise ("connect to mysql error ") except: print('connect error.', exc_info=True)
2.創建Model
class User(Base,Model):
__tablename__ = 'user'
id = Column(INTEGER(11), primary_key=True)
name = Column(String(255))
password = Column(String(255))
is_locked = Column(INTEGER(11), nullable=False, server_default=text("'0'"))
is_vip = Column(INTEGER(11), nullable=False, server_default=text("'0'"))
user_type = Column(String(255))
register_time = Column(String(255))
3.創建sqlalchemy表
from sqlalchemy import Column, Integer, String, MetaData,Table metadata = sa.MetaData() user = Table('user', MetaData(), Column('id', Integer, primary_key=True), Column('name', String(50)), Column('is_locked', Integer,nullable=False,default=0), Column('is_vip', Integer,nullable=False,default=0), Column('user_type', String(12)), Column('register_time', String(12)) ) ()
4.實現查詢/更新/插入功能
class DBcontroller:
__engine=None
__isinstance = False
def __new__(cls, *args, **kwargs):
if cls.__isinstance: # 如果被實例化了
return cls.__isinstance # 返回實例化對象
print('connecting to database...')
asyncio.get_event_loop().run_until_complete(DBcontroller.connect())
cls.__isinstance = object.__new__(cls) # 否則實例化
return cls.__isinstance # 返回實例化的對象
@staticmethod
async def connect():
try:
__engine = await create_engine(user='root',
db='youku',
host='127.0.0.1',
password='root',
minsize=1,
maxsize=10,
autocommit=True)
if __engine:
DBcontroller.__engine = __engine
DBcontroller.connectStatue =True
print('connect to mysql success!')
else:
raise ("connect to mysql error ")
except:
print('connect error.', exc_info=True)
def selectTable(self,table):
async def select():
dbstb={
'user':user
}
conn = await DBcontroller.__engine.acquire()
try:
result = await conn.execute(dbstb[table].select())
res = await result.fetchall()
for row in res:
print(row)
except Exception as e :
print(e)
finally:
# DBcontroller.__engine.release(conn)
pass
asyncio.get_event_loop().run_until_complete(select())
def executeTable(self,table,attr):
async def execute():
dbstb={
'user':user
}
conn = await DBcontroller.__engine.acquire()
try:
await conn.execute(dbstb[table].insert().values(attr))
except Exception as e:
if e.args[0]== 1062:
print('主鍵已存在')
elif e.args[0]==1366:
if attr['id']=='':
print('未設置主鍵')
elif attr['is_vip']=='':
print('未設置vip類型')
elif attr['is_locked']=='':
print('未設置上鎖類型')
else:print('未知錯誤,請聯系管理員')
else:
print(f'connect failed:{e}')
finally:
DBcontroller.__engine.release(conn)
asyncio.get_event_loop().run_until_complete(execute())
def updateTable(self,table,id,attr):
async def update():
dbstb={
'user':user
}
conn = await DBcontroller.__engine.acquire()
try:
await conn.execute(user.update().where(dbstb[table].c.id==id).values(attr))
except Exception as e:
if e.args[0]== 1062:
print('主鍵已存在')
elif e.args[0]==1366:
if attr['id']=='':
print('未設置主鍵')
elif attr['is_vip']=='':
print('未設置vip類型')
elif attr['is_locked']=='':
print('未設置上鎖類型')
else:print('未知錯誤,請聯系管理員')
print(e)
else:
print(f'connect failed:{e}')
finally:
DBcontroller.__engine.release(conn)
asyncio.get_event_loop().run_until_complete(update())
結果

