aiomysql+sqlalchemy實現ORM框架【實現仿優酷系統添加用戶】


  馬上就要開始寫仿優酷系統了,在這里提前學習一下ORM。

  一開始去百度搜了一下相關資料,里面的ORM框架要自己寫,所以花了半天的時間學習了sqlalchemy:

SQLAlchemy

  SQLAlchemy 比 Django 自帶的 ORM 好在哪里?

   python sqlalchemy 和django orm 對比

  不管是說Django的ORM好,還是說Sqlalchemy好,,我認為框架並不是重點,其核心在於通過學習一門ORM框架,達到能熟練的從數據庫取出數據\保存數據(也就是數據持久化),這才是重點。

Aiomysql:

  同樣道理,掌握一門異步庫也能讓自己更加首席線程的操作:

    1.為什么要使用異步訪問數據庫,有什么優缺點?

      在多用戶高並發的情況下,若使用同步查詢,則等待時間過長,代價很高;

      使用了異步能提高效率,但在同時增加了處理難度;而且還會有“讀臟數據的風險”:

      A和B要買票,同時A和B都發現只剩一張票了,這個時候兩人同時點擊購買,則會出現“讀臟數據”的情況

    2.普通的線程池和Aiomysql有什么區別?

      首先准確的來說,Aiomysql主要使用的是協程,其次才是多線程,多線程建立連接池,每個連接使用協程處理多並發的情況,

      所以,他們的區別應該是在,普通的線程池的每一個線程沒有使用協程,而aiomysql線程池里的線程使用了協程

正式開始主題:

  1.Aiomysql創建連接

class DBcontroller:
    __engine=None
    __isinstance = False
    def __new__(cls, *args, **kwargs):
        if cls.__isinstance:  # 如果被實例化了
            return cls.__isinstance  # 返回實例化對象
        print('connecting to database...')
        asyncio.get_event_loop().run_until_complete(DBcontroller.connect())
        cls.__isinstance = object.__new__(cls)  # 否則實例化
        return cls.__isinstance  # 返回實例化的對象

    @staticmethod
    async def connect():
        try:
            __engine = await create_engine(user='root',
                                              db='youku',
                                              host='127.0.0.1',
                                              password='root',
                                              minsize=1,
                                              maxsize=10,
                                              autocommit=True)
            if __engine:
                DBcontroller.__engine = __engine
                DBcontroller.connectStatue =True
                print('connect to mysql success!')
            else:
                raise ("connect to mysql error ")
        except:
            print('connect error.', exc_info=True)
View Code

  2.創建Model

class User(Base,Model):
    __tablename__ = 'user'

    id = Column(INTEGER(11), primary_key=True)
    name = Column(String(255))
    password = Column(String(255))
    is_locked = Column(INTEGER(11), nullable=False, server_default=text("'0'"))
    is_vip = Column(INTEGER(11), nullable=False, server_default=text("'0'"))
    user_type = Column(String(255))
    register_time = Column(String(255))

  

 3.創建sqlalchemy表

from sqlalchemy import Column, Integer, String, MetaData,Table
metadata = sa.MetaData()

user = Table('user', MetaData(),
            Column('id', Integer, primary_key=True),
            Column('name', String(50)),
            Column('is_locked', Integer,nullable=False,default=0),
            Column('is_vip', Integer,nullable=False,default=0),
            Column('user_type', String(12)),
            Column('register_time', String(12))
             )
()
View Code

  4.實現查詢/更新/插入功能

class DBcontroller:
    __engine=None
    __isinstance = False
    def __new__(cls, *args, **kwargs):
        if cls.__isinstance:  # 如果被實例化了
            return cls.__isinstance  # 返回實例化對象
        print('connecting to database...')
        asyncio.get_event_loop().run_until_complete(DBcontroller.connect())
        cls.__isinstance = object.__new__(cls)  # 否則實例化
        return cls.__isinstance  # 返回實例化的對象

    @staticmethod
    async def connect():
        try:
            __engine = await create_engine(user='root',
                                              db='youku',
                                              host='127.0.0.1',
                                              password='root',
                                              minsize=1,
                                              maxsize=10,
                                              autocommit=True)
            if __engine:
                DBcontroller.__engine = __engine
                DBcontroller.connectStatue =True
                print('connect to mysql success!')
            else:
                raise ("connect to mysql error ")
        except:
            print('connect error.', exc_info=True)

    def selectTable(self,table):
        async def select():
            dbstb={
                'user':user
            }
            conn = await DBcontroller.__engine.acquire()

            try:
                result = await conn.execute(dbstb[table].select())
                res = await result.fetchall()
                for row in res:
                    print(row)
            except Exception as e :
                print(e)
            finally:
                # DBcontroller.__engine.release(conn)
                pass
        asyncio.get_event_loop().run_until_complete(select())

    def executeTable(self,table,attr):
        async def execute():
            dbstb={
                'user':user
            }
            conn = await DBcontroller.__engine.acquire()

            try:
                await conn.execute(dbstb[table].insert().values(attr))

            except Exception as e:
                if e.args[0]== 1062:
                    print('主鍵已存在')
                elif e.args[0]==1366:
                    if attr['id']=='':
                        print('未設置主鍵')
                    elif attr['is_vip']=='':
                        print('未設置vip類型')
                    elif attr['is_locked']=='':
                        print('未設置上鎖類型')
                    else:print('未知錯誤,請聯系管理員')
                else:
                    print(f'connect failed:{e}')
            finally:
                DBcontroller.__engine.release(conn)
        asyncio.get_event_loop().run_until_complete(execute())

    def updateTable(self,table,id,attr):
        async def update():
            dbstb={
                'user':user
            }
            conn = await DBcontroller.__engine.acquire()

            try:
                await conn.execute(user.update().where(dbstb[table].c.id==id).values(attr))

            except Exception as e:
                if e.args[0]== 1062:
                    print('主鍵已存在')
                elif e.args[0]==1366:
                    if attr['id']=='':
                        print('未設置主鍵')
                    elif attr['is_vip']=='':
                        print('未設置vip類型')
                    elif attr['is_locked']=='':
                        print('未設置上鎖類型')
                    else:print('未知錯誤,請聯系管理員')
                    print(e)
                else:
                    print(f'connect failed:{e}')
            finally:
                DBcontroller.__engine.release(conn)
        asyncio.get_event_loop().run_until_complete(update())

  

結果

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM