測試平台系列(79) 編寫Redis配置功能(下)


大家好~我是米洛

我正在從0到1打造一個開源的接口測試平台, 也在編寫一套與之對應的完整教程,希望大家多多支持。

歡迎關注我的公眾號米洛的測開日記,獲取最新文章教程!

回顧

上一節我們提出了優化Dao邏輯的想法,那今天就試着來兌現之,並運用到Redis配置管理的開發中去。

初步構思list方法

我們在dao/init.py新建類: Mapper,以后所有的dao類都繼承自它。

想想list需要什么,一般需要,字段參數是like還是等於這3個重要的信息。

明白這個以后,我們的偽代碼就好編寫了:

# 1. 獲取session
async with async_session() as session:
  condition = [model.deleted_at == 0]
  # 2. 根據參數里的字段,字段值構造查詢條件
  DatabaseHelper.where(字段,字段值,condition)
  # 3. 查詢出結果,后續與原來的方式一致,就不寫了

可以看到,這邊除了需要上面的3個信息以外,還需要try去寫入日志,也就是說需要我們平時經常創建的log成員變量,還需要model這個類,否則你不知道改的是什么表。

思考

我們平時的參數,都是key-value的形式,一旦value不為空,那么說明我們要根據這個條件來查詢。

而model和log,我們可以通過類的裝飾器,由子類傳遞給父類(這里會用到setattr和getattr)

class Mapper(object):
    log = None
    model = None

    @classmethod
    async def list_data(cls, **kwargs):
        try:
            async with async_session() as session:
                # 構造查詢條件,默認是數據未被刪除
                condition = [getattr(cls.model, "deleted_at") == 0]
                # 遍歷參數,當參數不為None的時候傳遞
                for k, v in kwargs.items():
                    # 判斷是否是like的情況 TODO: 這里沒支持in查詢
                    like = v is not None and len(v) > 2 and v.startswith("%") and v.endswith("%")
                    # 如果是like模式,則使用Model.字段.like 否則用 Model.字段 等於
                    DatabaseHelper.where(v, getattr(cls.model, k).like(v) if like else getattr(cls.model, k) == v,
                                         condition)
                result = await session.execute(select(cls.model).where(*condition))
                return result.scalars().all()
        except Exception as e:
            # 這邊調用cls本身的log參數,寫入日志+拋出異常
            cls.log.error(f"獲取{cls.model}列表失敗, error: {e}")
            raise Exception(f"獲取數據失敗")

注釋寫的比較詳細,由於現在字段是個變量,所以我們不能用model.字段來取值,所以取而代之的是getattr(model, 字段)。不熟悉的朋友可以去搜索下getattr

這樣,一個粗略的list方法就寫好了,但是這個是不帶分頁的,所以我們還需要補充一個分頁的模式,其實也很簡單。

    @classmethod
    async def list_data_with_pagination(cls, page, size, **kwargs):
        try:
            async with async_session() as session:
                condition = [getattr(cls.model, "deleted_at") == 0]
                for k, v in kwargs.items():
                    like = v is not None and len(v) > 2 and v.startswith("%") and v.endswith("%")
                    sql = DatabaseHelper.where(v, getattr(cls.model, k).like(v) if like else getattr(cls.model, k) == v,
                                               condition)
                return await DatabaseHelper.pagination(page, size, session, sql)
        except Exception as e:
            cls.log.error(f"獲取{cls.model}列表失敗, error: {e}")
            raise Exception(f"獲取數據失敗")

基本上長的差不多,只是最后返回那里,調用了pagination相關方法。

看看dao裝飾器

這也是為什么要用classmethod而不是staticmethod的原因

我們很壞,把這個裝飾器套到子類上,並把model和log傳給父類。畢竟方法都是在調用父類的方法,父類無法直接拿到子類的數據

用的話,就是把model和log傳入dao參數

這樣就避免了在調用list方法的時候,還需要傳入model和log的尷尬情況。

完善其他方法

list搞定以后,其他的還會遠嗎?但還真的好像還有點問題,因為我們一般會有一些重名判斷,但沒關系,我們可以編寫一個query方法。

    @classmethod
    def query_wrapper(cls, **kwargs):
        condition = [getattr(cls.model, "deleted_at") == 0]
        # 遍歷參數,當參數不為None的時候傳遞
        for k, v in kwargs.items():
            # 判斷是否是like的情況 TODO: 這里沒支持in查詢
            like = v is not None and len(v) > 2 and v.startswith("%") and v.endswith("%")
            # 如果是like模式,則使用Model.字段.like 否則用 Model.字段 等於
            DatabaseHelper.where(v, getattr(cls.model, k).like(v) if like else getattr(cls.model, k) == v,
                                 condition)
        return select(cls.model).where(*condition)

    @classmethod
    async def query_record(cls, **kwargs):
        try:
            async with async_session() as session:
                sql = cls.query_wrapper(**kwargs)
                result = await session.execute(sql)
                return result.scalars().first()
        except Exception as e:
            cls.log.error(f"查詢{cls.model}失敗, error: {e}")
            raise Exception(f"查詢數據失敗")

由於查詢方法太過於通用,所以抽成了query_wrapper方法。

  • 正常編寫insert接口
    @classmethod
    async def insert_record(cls, model):
        try:
            async with async_session() as session:
                async with session.begin():
                    session.add(model)
                    await session.flush()
                    session.expunge(model)
                    return model
        except Exception as e:
            cls.log.error(f"添加{cls.model}記錄失敗, error: {e}")
            raise Exception(f"添加記錄失敗")

這邊返回了model,如果不需要也可以不用,但咱還是給返回。

  • insert接口層

和以前一樣,先查,如果沒有再關掉。

但這樣會產生2個session,開->關->開->關

但也解耦了查詢和插入2個操作。

  • 編寫刪除和修改方法
    @classmethod
    async def update_record_by_id(cls, user, model, not_null=False):
        try:
            async with async_session() as session:
                async with session.begin():
                    query = cls.query_wrapper(id=model.id)
                    result = await session.execute(query)
                    original = result.scalars().first()
                    if original is None:
                        raise Exception("記錄不存在")
                    DatabaseHelper.update_model(original, model, user, not_null)
                    await session.flush()
                    session.expunge(original)
                    return original
        except Exception as e:
            cls.log.error(f"更新{cls.model}記錄失敗, error: {e}")
            raise Exception(f"更新記錄失敗")

    @classmethod
    async def delete_record_by_id(cls, user, id):
        """
        邏輯刪除
        :param user:
        :param id:
        :return:
        """
        try:
            async with async_session() as session:
                async with session.begin():
                    query = cls.query_wrapper(id=id)
                    result = await session.execute(query)
                    original = result.scalars().first()
                    if original is None:
                        raise Exception("記錄不存在")
                    DatabaseHelper.delete_model(original, user)
        except Exception as e:
            cls.log.error(f"刪除{cls.model}記錄失敗, error: {e}")
            raise Exception(f"刪除記錄失敗")

    @classmethod
    async def delete_by_id(cls, id):
        """
        物理刪除
        :param id:
        :return:
        """
        try:
            async with async_session() as session:
                async with session.begin():
                    query = cls.query_wrapper(id=id)
                    result = await session.execute(query)
                    original = result.scalars().first()
                    if original is None:
                        raise Exception("記錄不存在")
                    session.delete(original)
        except Exception as e:
            cls.log.error(f"邏輯刪除{cls.model}記錄失敗, error: {e}")
            raise Exception(f"刪除記錄失敗")

刪除這邊支持了物理刪除和邏輯刪除,當然我們一般是用軟刪除

完善其他接口

可以看到刪除和修改和以前是差不多的(也是內部進行數據是否存在判斷),這些步驟做完。redis的管理工作就可以順利進行了,接着我們需要為之編寫頁面咯!

由於是很基礎的表格頁面,所以我們不贅述。下一節我們會利用配置的redis連接編寫redisManager,管理我們的連接數據,為之后在線執行redis以及將它作為前置條件打下基礎。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM