peewee:精致小巧的orm,sqlalchemy的一個很好的替代品


楔子

下面我們來了解一下python中的orm:peewee和peewee_async,peewee是python的一個比較精簡的orm,源碼是一個大概七千多行的py文件。是的,peewee只有一個py文件。至於peewee_async,從名字上也能看出這是基於peewee的一個異步orm。所以介紹peewee_async之前我們需要先介紹一下peewee

下面來安裝peewee_async,直接pip install peewee_async即可,會自動安裝peewee。

peewee

我們來看看如何使用peewee,peewee是一個比較精簡的orm,目前只能適配sqlite、MySQL、PostgreSQL,至於Oracle和SQLserver則不支持。

我們這里以PostgreSQL數據庫為例,當然orm並不具備直接和數據庫通信的功能,它需要依賴於底層驅動,python連接PostgreSQL使用的模塊是psycopg2,直接pip install psycopg2_binary即可。

如果你用的MySQL,那么需要pip install pymysql

定義Model並映射成表

下面我們來看看如何使用peewee定義一個Model,並映射成數據中的表。

import peewee

# 第一個參數是我們要連接到哪個數據庫,剩余的參數就無需多說了
db = peewee.PostgresqlDatabase("postgres",
                               host="localhost",
                               port=5432,
                               user="postgres",
                               password="zgghyys123")

"""
如果是sqlite:那么使用peewee.SqliteDatabase
如果是mysql: 那么使用peewee.MySQLDatabase
"""

# 參數我們來定義Model, 首先要繼承自peewee.Model
class Girl(peewee.Model):

    pk = peewee.IntegerField(primary_key=True, verbose_name="主鍵")
    name = peewee.CharField(max_length=200, verbose_name="姓名")
    where = peewee.CharField(max_length=200, verbose_name="住址")

    class Meta:
        # 綁定上面的數據庫實例
        database = db

        # 設置表名
        table_name = "girl"

        # 設置schema, 當然對於PostgreSQL來說,不設置默認為public
        schema = "public"


if __name__ == '__main__':
    # 調用db.create_tables即可將上面的模型映射成表
    db.create_tables([Girl])

    # 除此之外,還可以調用db.is_closed()查看連接是否關閉
    print(db.is_closed())  # False

    # 也可以手動關閉連接
    db.close()
    print(db.is_closed())  # True

執行上面代碼之后會發現數據庫中多出一張名為girl的表,使用起來還是比較簡單的。

peewee中的Field

我們看到數據庫表中的字段對應peewee中的Field,那么在peewee中都有哪些Field呢?這里介紹幾個常用的,其實很多都是類似的,比如:IntegerField、BigIntegerField、SmallIntegerField這幾個明顯就是一類的。

IntegerField

針對整型字段,里面常用參數如下:

  • null=False:是否允許為空
  • index=False:是否為索引
  • unique=False:是否要求唯一
  • column_name=None:映射表中的字段名,如果為None,那么采用變量名當做字段名
  • default=None:默認值
  • primary_key=False:是否為主鍵
  • constraints=None:約束
  • verbose_name:字段注釋

這個Integer本身是沒有__init__函數的,它是繼承自Field

AutoField

如果是AutoField,那么對應字段必須為主鍵,並且自動會自增。我們上面使用IntegerField設置的主鍵並不是自增的。

peewee.AutoField()  # 創建的默認就是自增主鍵了

FloatField

和IntegerField的參數一致。

DecimalField

和IntegerField的參數一致,並且還可以指定精度,也就是數據庫里面的numberic

CharField

和IntegerField的參數一致,並且還多了一個max_length,也就是最大長度。

TextField

和CharField的參數一致,沒有長度限制。

UUIDField

繼承自Field,和IntegerField參數一樣。

DateTimeField、DateField、TimeField

對應:年月日時分秒、年月日、時分秒,也繼承自Field

TimestampField

時間戳,Unix到指定之間經過的秒數,繼承自Field

IPField

針對IP

BooleanField

針對布爾類型

ForeignKeyField

針對外鍵

常用的Field如上,至於peewee提供的其他Field,可以去源碼中查看。

主鍵和約束

關於主鍵和約束,我們知道可以在Field中設置,但是聯合主鍵呢?

class Girl(peewee.Model):

    pk = peewee.IntegerField(primary_key=True, verbose_name="主鍵")
    name = peewee.CharField(primary_key=True, max_length=200, verbose_name="姓名")
    where = peewee.CharField(max_length=200, verbose_name="住址")

如果給多個字段設置主鍵,那么它們不會變成聯合主鍵,而是會報錯:ValueError: over-determined primary key Girl.

解決辦法如下:

class Girl(peewee.Model):

    pk = peewee.IntegerField(verbose_name="主鍵")
    name = peewee.CharField(primary_key=True, max_length=200, verbose_name="姓名")
    where = peewee.CharField(max_length=200, verbose_name="住址")、
    
    class Meta:
        # 通過peewee.CompositeKey進行設置
        # 里面以字符串的形式直接傳入設置變量名即可
        # 注意:是變量名,並且以字符串的形式
        primary_key = peewee.CompositeKey("pk", "name")
        
        # 除此之外還可以設置約束, 當然約束也可以在Field中設置
        constraints = [peewee.SQL("CHECK(length(name) > 3)"),
                       peewee.SQL("CHECK(pk > 3)")]

此時pk和name就是說聯合主鍵了,並且要求name的長度大於3個字符,pk的值大於3。

關於自增還可以這么設置,比如關閉自增:User._meta.auto_increment = False

另外,我們這里創建表的時候定義了主鍵,但如果我們沒有定義主鍵的話,那么peewee會自動幫我們加上一個名為id的自增主鍵,並且我們還可以通過Girl.id進行獲取。但如果我們定義了主鍵,那么peewee就不會再幫我們自動加主鍵了。

表的增刪改查

增加記錄

下面我們來看看如何使用peewee給表增加記錄

import peewee

db = peewee.PostgresqlDatabase("postgres",
                               host="localhost",
                               port=5432,
                               user="postgres",
                               password="zgghyys123")


# 參數我們來定義Model, 首先要繼承自peewee.Model
class Girl(peewee.Model):

    pk = peewee.AutoField(verbose_name="自增主鍵")
    name = peewee.CharField(max_length=200, verbose_name="姓名")
    where = peewee.CharField(max_length=200, verbose_name="住址")

    class Meta:
        database = db
        table_name = "girl"


if __name__ == '__main__':
    db.create_tables([Girl])
    # 增加記錄有以下幾種方式
    g1 = Girl()
    g1.name = "古明地覺"
    g1.where = "東方地靈殿"

    # 或者
    g2 = Girl(name="博麗靈夢", where="博麗神社")

    # 然后一定要save,否則記錄不會進入到表中
    g1.save()
    g2.save()

此時查看數據庫,會發現數據庫的表girl中多了兩條記錄。

或者這樣插入記錄也是可以的

# 直接調用Girl.create即可
Girl.create(name="芙蘭朵露", where="紅魔館")
Girl.create(name="蕾米莉亞", where="紅魔館")

會發現數據庫中又多了兩條記錄

但問題是,我們這里的記錄是一條一條插入的,效率上不夠好,可不可以多條記錄一塊插入到數據庫呢?

# 我們可以調用insert和insert_many來插入記錄
# 這兩者使用上沒有什么區別,都可以接收一個字典插入一條記錄
# 接收多個字典組成的列表,插入多條記錄。
# 但是調用之后一定要再調用一下execute,才會進入到數據庫中

Girl.insert([{"name": "帕秋莉·諾蕾姬", "where": "紅魔館"},
             {"name": "西行寺幽幽子", "where": "白玉樓"}]).execute()
Girl.insert({"name": "八意永琳", "where": "輝夜永遠亭"}).execute()
Girl.insert_many({"name": "霧雨魔理沙", "where": "魔法森林"}).execute()
Girl.insert_many([{"name": "紅美鈴", "where": "紅魔館"}]).execute()

此外,我們還可以設置事務。

with db.transaction():
    Girl.create(pk=10, name="xx", where="xx")
    Girl.create(pk=10, name="xx", where="xx")

# 或者    
with db.atomic():
    Girl.create(pk=10, name="xx", where="xx")
    Girl.create(pk=10, name="xx", where="xx")

顯然pk重復了,因此無論哪種方式,兩條記錄最終都會插入失敗。

當然如果失敗了,我們最好要記得回滾,在sqlalchemy中你應該遇到過這么個錯誤。就是使用session操作數據庫的時候,如果失敗不會滾的話,那么這個錯誤會一直持續到你連接斷開為止。因此如果操作失敗,一定要記得回滾。

不過當我們使用with db.atomic或者with db.transaction的時候,失敗了peewee會自動幫我們進行回滾。這一點不需要我們手動做了,當然如果是我們在不使用atomic、transaction,並且需要多次操作數據庫的時候,失敗了要記得回滾。

try:
    Girl.insert([{"pk": 10, "name": "xx", "where": "xxx"},
                 {"pk": 10, "name": "xx", "where": "xxx"}]).execute()
except Exception:
    db.rollback()

orm的insert插入多條記錄的時候,整體是具備事務性質的,最終兩條記錄都插入失敗。但是,插入失敗了,一定要回滾。不過對於insert來說,也建議使用with db.atomic()或者with db.transaction()的方式。

peewee插入記錄的幾種方式我們就介紹到這里,支持的方式還是不少的。

刪除記錄

下面來看看刪除記錄,刪除記錄非常簡單。

# Girl.delete().execute()相當於刪除全部記錄
# 如果刪除指定條件的記錄的話,那么可以通過where指定
# where中怎么進行篩選,我們會在 "查詢記錄" 的時候詳細介紹
# 查詢、更新、刪除,它們的where都是一樣的
print(
    Girl.delete().where(Girl.name.in_(["紅美鈴", "八意永琳"])).execute()
)  # 2

# 上面返回2,表示成功刪除兩條記錄
# 我們說不加where表示全部刪除
print(Girl.delete().execute())  # 7

此時記錄就全沒了,我們重新創建一下吧,不然下面沒有數據演示了。

修改記錄

修改記錄也沒有什么難度,我們來看一下。

# update里面直接通過關鍵字參數的方式修改
print(Girl.update(where="東方紅魔館").where(Girl.where == "紅魔館").execute())  # 4
print(Girl.update(where="紅魔館").where(Girl.where == "東方紅魔館").execute())  # 4

# 返回4表示成功修改4條

查詢記錄

重頭戲來了,也不知道誰的頭這么重,我們用的最多的應該就是查詢了,下面來看看peewee都支持我們怎么查詢。而查詢的關鍵就在where上面,當然我們表里面也有個字段叫where,兩個沒啥關系,不要搞混了。

一種簡單的方式,調用Model的get方法,會返回滿足條件的第一條記錄。

res = Girl.get(Girl.where == "紅魔館")
# 返回的是一個Model對象,這個3是什么?
# 直接打印的話,顯示的是記錄的主鍵的值
print(res, type(res))  # 3 <Model: Girl>
# 獲取其它屬性
print(res.name, res.where)  # 芙蘭朵露 紅魔館

如果是根據主鍵獲取的話,還有如下兩種簡單的形式:

res = Girl.get_by_id(3)
print(res, type(res))  # 3 <Model: Girl>
print(res.name, res.where)  # 芙蘭朵露 紅魔館

res = Girl[3]
print(res, type(res))
print(res.name, res.where)  # 芙蘭朵露 紅魔館

# 注意:通過get_by_id獲取的話,如果記錄不存在會報錯

我們看到如果是根據主鍵獲取的話,那么可以直接通過get_by_id,或者直接通過字典的方式。至於為什么可以通過字典的方式,想都不用想,肯定是內部實現了__getitem__方法。

查看源碼的話,會發現peewee.Model繼承的父類中實現了__getitem__,底層還是調用了get_by_id

上面只是獲取單條記錄,如果是多條的話使用select。

# 如果select里面不指定字段,那么是獲取全部字段
res = Girl.select(Girl.name, Girl.where).where(Girl.pk > 5)
print(res)  # SELECT "t1"."name", "t1"."where" FROM "girl" AS "t1" WHERE ("t1"."pk" > 5)
print(type(res))  # <class 'peewee.ModelSelect'>

# 上面的res返回的是一個<class 'peewee.ModelSelect'>,上面的語句不會立即執行
# 而是一個懶執行,類似於spark里面的transform,或者python里面的迭代器
# 像get,get_by_id等方法,使用之后會立即組成sql語句然后去查詢
# 我們可以調用res.sql查看SQL語句
print(res.sql())  # ('SELECT "t1"."name", "t1"."where" FROM "girl" AS "t1" WHERE ("t1"."pk" > %s)', [5])

# 當我們調用for循環迭代的時候,才會執行,如何實現?實際上是底層實現了迭代協議
for _ in res:
    print(_, type(_), _.name, _.where)
    """
    None <Model: Girl> 西行寺幽幽子 白玉樓
    None <Model: Girl> 八意永琳 輝夜永遠亭
    None <Model: Girl> 霧雨魔理沙 魔法森林
    None <Model: Girl> 紅美鈴 紅魔館
    """
    # 返回的仍然是一個Model對象,如果打印的話默認打印的還是主鍵的值
    # 但是我們這里沒有選擇主鍵,因此打印的是None

    # 如果我們調用get的話,也可以返回第一條滿足條件的記錄
    first = res.get()
    # 這里打印None不要慌,默認顯示的主鍵的值,但是沒有選擇主鍵所以為None
    print(first)  # None
    print(first.name, first.where)  # 西行寺幽幽子 白玉樓

除了使用for循環,還可以這么做

res = Girl.select(Girl.name, Girl.where).where(Girl.pk > 5)
# 可以調用list將其全部打印出來
print(list(res))  # [<Girl: None>, <Girl: None>, <Girl: None>, <Girl: None>]

# 使用Girl.select().where()這種方式獲取的結果永遠可以當成一個列表來使用
# 因此可以通過索引獲取單個記錄
some = res[3]
print(some)  # None
print(some.name, some.where)  # 紅美鈴 紅魔館

還沒完,我們還可以得到一個字典

# 調用dicts之后得到的依舊是<class 'peewee.ModelSelect'>對象
# 打印的時候會打印一條SQL語句
res = Girl.select(Girl.name, Girl.where).where(Girl.pk > 5).dicts()
print(res)  # SELECT "t1"."name", "t1"."where" FROM "girl" AS "t1" WHERE ("t1"."pk" > 5)
print(type(res))  # <class 'peewee.ModelSelect'>

# 但是當我們調用list、或者for循環的時候,打印就是一個字典了
print(list(res))
"""
[{'name': '西行寺幽幽子', 'where': '白玉樓'}, 
{'name': '八意永琳', 'where': '輝夜永遠亭'}, 
{'name': '霧雨魔理沙', 'where': '魔法森林'}, 
{'name': '紅美鈴', 'where': '紅魔館'}]
"""
# 通過索引或者切片獲取
print(res[1: 3])  # [{'name': '八意永琳', 'where': '輝夜永遠亭'}, {'name': '霧雨魔理沙', 'where': '魔法森林'}]

或者得到一個tuple對象、或者namedtuple對象

res = Girl.select(Girl.name, Girl.where).where(Girl.pk > 5).tuples()
print(list(res))
"""
    [('西行寺幽幽子', '白玉樓'), ('八意永琳', '輝夜永遠亭'), 
    ('霧雨魔理沙', '魔法森林'), ('紅美鈴', '紅魔館')]
    """
print(res[1: 3])  # [('八意永琳', '輝夜永遠亭'), ('霧雨魔理沙', '魔法森林')]

# 或者namedtuple
res = Girl.select(Girl.name, Girl.where).where(Girl.pk > 5).namedtuples()
print(list(res))
"""
[Row(name='西行寺幽幽子', where='白玉樓'), 
Row(name='八意永琳', where='輝夜永遠亭'), 
Row(name='霧雨魔理沙', where='魔法森林'), 
Row(name='紅美鈴', where='紅魔館')]
"""

支持的結果種類還是蠻多的,下面我們來看看peewee都支持哪些where操作

  • alias:起別名

    # 起別名,當然這是在select里面的
    res = Girl.select(Girl.where.alias("WHERE")).where(Girl.where == "紅魔館")
    # 起完別名就只能用別名獲取了
    print(res[0].where, res[0].WHERE)  # None 紅魔館
    
  • cast:改變類型

    # 改變類型, 這也是在select里面, 但是類型要寫PostgreSQL的類型
    res = Girl.select(Girl.pk.cast("text"))
    print(res[0].pk, res[0].pk == "1")  # 1 True
    
  • is_null:查詢為NULL的

    # 查找name為null的
    res = Girl.select().where(Girl.name.is_null())
    print(len(list(res)))  # 0
    
    # 查找name不為null的
    res = Girl.select().where(Girl.name.is_null(False))
    print(len(list(res)))  # 9
    
  • contains:查詢包含某個字符串的

    # 查找name包含"莉"的記錄, 相當於 name like '%莉%'
    res = Girl.select().where(Girl.name.contains("莉"))
    print([_.name for _ in res])  # ['蕾米莉亞', '帕秋莉·諾蕾姬']
    
  • startswith:查詢以某個字符串開始的

    # 查找where以"紅"開頭的, 相當於 where like '紅%'
    res = Girl.select().where(Girl.where.startswith("紅"))
    print([_.where for _ in res])  # ['紅魔館', '紅魔館', '紅魔館', '紅魔館']
    
  • endswith:查詢以某個字符串結尾的

    # 查找where以"樓"結尾的, 相當於 where like '%樓'
    res = Girl.select().where(Girl.where.endswith("樓"))
    print([(_.name, _.where) for _ in res])  # [('西行寺幽幽子', '白玉樓')]
    
  • between:查詢位於兩個值之間的

    # 查找pk在3到6之間的
    res = Girl.select().where(Girl.pk.between(3, 6))
    print([(_.pk, _.name) for _ in res])
    """
    [(3, '芙蘭朵露'), (4, '蕾米莉亞'), (5, '帕秋莉·諾蕾姬'), (6, '西行寺幽幽子')]
    """
    
    
    # 以上等價於Girl.pk[slice(3, 6)],注意傳入的切片是包含結尾的
    # 當然這種方式底層也是調用的between
    res = Girl.select().where(Girl.pk[slice(3, 6)])
    print([(_.pk, _.name) for _ in res])
    """
    [(3, '芙蘭朵露'), (4, '蕾米莉亞'), (5, '帕秋莉·諾蕾姬'), (6, '西行寺幽幽子')]
    """
    
    
    # 既然可以傳入一個切片,也可以傳入普通的整型
    res = Girl.select().where(Girl.pk[3])
    # 等價於Girl.select().where(Girl.pk == 3)
    print([(_.pk, _.name) for _ in res])  # [(3, '芙蘭朵露')]
    
  • in_:查找位於指定的多個記錄之中的,反之是not_in

    res = Girl.select().where(Girl.pk.in_([1, 3, 5]))
    print([(_.pk, _.name) for _ in res])  # [(1, '古明地覺'), (3, '芙蘭朵露'), (5, '帕秋莉·諾蕾姬')]
    
    
    # 或者還可以這么寫
    res = Girl.select().where(Girl.pk << [1, 3, 5])
    print([(_.pk, _.name) for _ in res])  # [(1, '古明地覺'), (3, '芙蘭朵露'), (5, '帕秋莉·諾蕾姬')]
    
  • regexp、iregexp:正則,前者大小寫敏感,后者大小寫不敏感

    # 查找name只有四個字符的,這里的正則要遵循對應數據的正則語法
    res = Girl.select().where(Girl.name.regexp(r"^.{4}$"))
    print([(_.pk, _.name) for _ in res]) 
    """
    [(1, '古明地覺'), (2, '博麗靈夢'), (3, '芙蘭朵露'), (4, '蕾米莉亞'), (7, '八意永琳')]
    """
    

上面我們介紹了一些常見的where操作,當然也包含select。當然PostgreSQL里面還有concat、substring等等,這些使用peewee該如何實現呢?在peewee中有一個fn,通過fn來調用這些函數。

# 通過fn調用的函數要大寫
res = Girl.select(peewee.fn.CONCAT(Girl.name, "xx")).where(Girl.pk > 5)
print([_.name for _ in res])  # [None, None, None, None]

# 但是我們看到的全是None,這是什么鬼?
# 因為我們使用CONCAT之后,這個字段名就不叫name了,而是叫concat
print([_.concat for _ in res])  # ['西行寺幽幽子xx', '八意永琳xx', '霧雨魔理沙xx', '紅美鈴xx']

# 因此這種方式不是很友好,因此解決辦法之一就是起一個別名
res = Girl.select(peewee.fn.CONCAT(Girl.name, "xx").alias("name")).where(Girl.pk > 5)
print([_.name for _ in res])  # ['西行寺幽幽子xx', '八意永琳xx', '霧雨魔理沙xx', '紅美鈴xx']

# 另一個辦法就是通過字典或者元組的方式
res = Girl.select(peewee.fn.CONCAT(Girl.name, "xx")).where(Girl.pk > 5).dicts()
print(list(res))
"""
[{'concat': '西行寺幽幽子xx'}, 
{'concat': '八意永琳xx'}, 
{'concat': '霧雨魔理沙xx'}, 
{'concat': '紅美鈴xx'}]
"""

# 再比如substr
res = Girl.select(peewee.fn.SUBSTR(Girl.name, 1, 2)).where(Girl.pk > 5).tuples()
print(list(res))  # [('西行',), ('八意',), ('霧雨',), ('紅美',)]

不僅是這些函數,包括數學相關的函數,一些常用的聚合函數都是通過fn來調用,比如保留兩位小數:peewee.fn.ROUND、求次數fn.COUNT等等。

多條件篩選

res = Girl.select().where((Girl.pk > 5) & (Girl.where == "紅魔館")).tuples()
print(list(res))  # [(9, '紅美鈴', '紅魔館')]
# 只有一個滿足條件的,&代表and、|代表or、~代表not
# 記得每個條件之間使用小括號括起來,因為優先級的問題
# 我們上面的例子如果不使用小括號括起來的話,那么5會先和Girl.where進行&運算,這顯然不是我們想要的結果

returning

returning語句是專門針對insert、update、delete的,表示在完成相應操作的時候返回一個值,我們看一下。

res = Girl.update(name="古明地戀").where(Girl.where == "東方地靈殿").returning(Girl.name).execute()
# 更新之后返回更新的name
print([_.name for _ in res])

# 返回多個也可以
res = Girl.update(name="古明地戀").where(Girl.where == "東方地靈殿").returning(Girl.name, Girl.where).execute()
print([(_.name, _.where) for _ in res])  # [('古明地戀', '東方地靈殿')]


# 刪除數據也是可以的
res = Girl.delete().where(Girl.where == "東方地靈殿").returning(Girl.name, Girl.pk).execute()
print([(_.name, _.pk) for _ in res])  # [('古明地戀', 1), ('古明地戀', 3)]


# 當然插入也是如此
res = Girl.insert([{"name": "帕秋莉·諾蕾姬", "where": "紅魔館"},
                   {"name": "西行寺幽幽子", "where": "白玉樓"}]).returning(Girl.name, Girl.where).execute()

print([(_.name, _.where) for _ in res])  # [('帕秋莉·諾蕾姬', '紅魔館'), ('西行寺幽幽子', '白玉樓')]

# 插入單條數據也是如此,同樣需要使用循環
res = Girl.insert({"name": "帕秋莉·諾蕾姬", "where": "紅魔館"}
                  ).returning(Girl.name).execute()
print([_.name for _ in res])  # ['帕秋莉·諾蕾姬']


# 如果不指定returning,那么對於insert來說返回的是主鍵
res = Girl.insert({"name": "帕秋莉·諾蕾姬", "where": "紅魔館"}
                  ).execute()
# 直接打印即可
print(res)  # 22


# 如果是插入多條數據
res = Girl.insert([{"name": "帕秋莉·諾蕾姬", "where": "紅魔館"},
                  {"name": "帕秋莉·諾蕾姬", "where": "紅魔館"}]).execute()

print(list(res))  # [(27,), (28,)]

distinct、nullif、coalesce

下面來看看上面這三個函數怎么實現,不過既然是函數,就可以通過fn來調用。

# 通過fn調用的函數要大寫
res = Girl.select(peewee.fn.DISTINCT(Girl.where)).tuples()
print(list(res))
"""
[('白玉樓',), ('魔法森林',), ('博麗神社',), 
('東方地靈殿',), ('紅魔館',), ('輝夜永遠亭',)]
"""
# 我們看到實現了去重的效果
# 除此之外我們還可以這么做
res = Girl.select(Girl.where).distinct().tuples()
print(list(res))
"""
[('白玉樓',), ('魔法森林',), ('博麗神社',), 
('東方地靈殿',), ('紅魔館',), ('輝夜永遠亭',)]
"""
# 得到的結果是一樣的


# nullif的作用就是,如果兩個值一樣,那么返回null
# 不一樣返回第一個值,比如為了防止除零錯誤,就可以用 a / nullif(b, 0)
# 這樣當b為0的時候就不會報錯了,而是返回null
res = Girl.select(peewee.fn.NULLIF(Girl.where, "紅魔館"), Girl.where).tuples()
print(list(res))
"""
[('東方地靈殿', '東方地靈殿'), ('博麗神社', '博麗神社'), 
(None, '紅魔館'), (None, '紅魔館'), (None, '紅魔館'), 
('白玉樓', '白玉樓'), ('輝夜永遠亭', '輝夜永遠亭'), 
('魔法森林', '魔法森林'), (None, '紅魔館')]
"""


# coalesce的作用是,里面傳入多個值,返回一個不為空的值
# 如果都為空,那么就只能是空了
res = Girl.select(peewee.fn.coalesce(None, "紅魔館", None)).tuples()
print(list(res))
"""
[('紅魔館',), ('紅魔館',), ('紅魔館',), ('紅魔館',),
 ('紅魔館',), ('紅魔館',), ('紅魔館',), ('紅魔館',), ('紅魔館',)]
"""

group by和having

在做聚合的時候需要使用到group by和having,這兩個就一起說吧。

res = Girl.select(peewee.fn.COUNT(Girl.where), Girl.where)\
        .group_by(Girl.where).tuples()  # 如果是根據多個字段group by,那么就直接寫多個字段即可
print(list(res))
"""
[(1, '白玉樓'), (1, '魔法森林'),
 (1, '博麗神社'), (1, '東方地靈殿'),
 (4, '紅魔館'), (1, '輝夜永遠亭')]
"""

# 加上having的話
res = Girl.select(peewee.fn.COUNT(Girl.where), Girl.where) \
        .group_by(Girl.where).having(peewee.fn.COUNT(Girl.where) > 1).tuples()
print(list(res))  # [(4, '紅魔館')]
# 這里選擇Girl.where出現次數大於1的
# 如果having里面需要多個條件,那么和多條件篩選一樣,使用&、|、~

order by

res = Girl.select(peewee.fn.COUNT(Girl.where), Girl.where)\
        .group_by(Girl.where).order_by(peewee.fn.COUNT(Girl.where)).tuples()
print(list(res))
"""
[(1, '白玉樓'), (1, '魔法森林'), (1, '博麗神社'), 
(1, '東方地靈殿'), (1, '輝夜永遠亭'), (4, '紅魔館')]
"""

# 默認是升序的,如果降序呢?
res = Girl.select(peewee.fn.COUNT(Girl.where), Girl.where) \
        .group_by(Girl.where).order_by(peewee.fn.COUNT(Girl.where).desc()).tuples()
print(list(res))
"""
[(4, '紅魔館'), (1, '白玉樓'), (1, '魔法森林'), 
(1, '博麗神社'), (1, '東方地靈殿'), (1, '輝夜永遠亭')]
"""
# 要是按照多字段排序,那么直接寫上多個字段即可。
# 其中peewee.fn.COUNT(Girl.where).desc()也可以寫成 -peewee.fn.COUNT(Girl.where)
# 前面加上+號表示升序,-號表示降序

limit和offset

res = Girl.select().limit(2).offset(1).tuples()
print(list(res))  # [(2, '博麗靈夢', '博麗神社'), (3, '芙蘭朵露', '紅魔館')]

# 或者這樣寫也可以,但是按照SQL來說,上面的寫法更習慣一些
res = Girl.select().offset(1).limit(2).tuples()
print(list(res))  # [(2, '博麗靈夢', '博麗神社'), (3, '芙蘭朵露', '紅魔館')]

res = Girl.select().limit(2).offset(1).tuples()
print(list(res))  # [(2, '博麗靈夢', '博麗神社'), (3, '芙蘭朵露', '紅魔館')]

# 或者我們還可以通過paginate來實現
# paginate(a, b), 表示將數據分頁,每一頁顯示b條數據,然后獲取第a頁的數據
# 這里表示每一頁顯示3條數據,然后返回第二頁的數據。實際上這個paginate內部還是調用了limit和offset
res = Girl.select().paginate(2, 3).tuples()
print(list(res))  # [(4, '蕾米莉亞', '紅魔館'), (5, '帕秋莉·諾蕾姬', '紅魔館'), (6, '西行寺幽幽子', '白玉樓')]

所以我們通過peewee執行SQL時候,順序如下:

Girls.select().where().group_by().having().order_by().limit().offset()

實現count(*)

# 直接返回一個int
print(Girl.select().count())  # 9

原生SQL

有些時候,我們是希望執行一些原生SQL的,我舉個例子:比如我們想要查找某個字符、比如"幽"在字段name中出現的位置,在PostgreSQL中可以這么寫:position('幽' in name),那如果在peewee里面要怎么做呢?難道是peewee.fn.POSITION('幽' in Girl.name) ?這樣顯然是不行的,因此這個時候我們就需要執行一些原生的SQL了。

res = Girl.select(peewee.SQL("position('幽' in name), name")).tuples()
print(list(res))
"""
[(0, '古明地覺'), (0, '博麗靈夢'), (0, '芙蘭朵露'), 
(0, '蕾米莉亞'), (0, '帕秋莉·諾蕾姬'), (4, '西行寺幽幽子'), 
(0, '八意永琳'), (0, '霧雨魔理沙'), (0, '紅美鈴')]
"""
# 為0的話表示name中不存在'幽'這個字,顯然我們執行成功了的
# 我們看到peewee.SQL的作用就是將字符串里面的內容當成普通SQL來執行

# 不僅如此,我們還可以混合使用
res = Girl.select(peewee.SQL("position('幽' in name)"), Girl.name).tuples()
print(list(res))
"""
[(0, '古明地覺'), (0, '博麗靈夢'), (0, '芙蘭朵露'), 
(0, '蕾米莉亞'), (0, '帕秋莉·諾蕾姬'), (4, '西行寺幽幽子'), 
(0, '八意永琳'), (0, '霧雨魔理沙'), (0, '紅美鈴')]
"""

# peewee.SQL不僅可以在在select里面,還可以在其他的地方
from peewee import fn, SQL
# where是SQL的關鍵字,所以需要使用雙引號括起來, 而group by語句中可以使用給字段起的別名
res = Girl.select(fn.COUNT(SQL('"where"')), Girl.where.alias("哈哈")).group_by(SQL("哈哈")).tuples()
print(res)  # SELECT COUNT("where"), "t1"."where" AS "哈哈" FROM "girl" AS "t1" GROUP BY 哈哈
print(list(res))  
"""
[(1, '白玉樓'), (1, '魔法森林'), (1, '博麗神社'), 
(1, '東方地靈殿'), (4, '紅魔館'), (1, '輝夜永遠亭')]
"""

總的來說,peewee.SQL的作用就是將里面的內容原封不動的交給數據庫來執行。

rollup、cube、grouping sets多維度統計

先來看一下數據集。

select * from sales_data;
/*
pk	 saledate product channel amount
1	2019-01-01	桔子	淘寶	1864
2	2019-01-01	桔子	京東	1329
3	2019-01-01	桔子	店面	1736
4	2019-01-01	香蕉	淘寶	1573
5	2019-01-01	香蕉	京東	1364
6	2019-01-01	香蕉	店面	1178
7	2019-01-01	蘋果	淘寶	511
8	2019-01-01	蘋果	京東	568
9	2019-01-01	蘋果	店面	847
10	2019-01-02	桔子	淘寶	1923
11	2019-01-02	桔子	京東	775
12	2019-01-02	桔子	店面	599
13	2019-01-02	香蕉	淘寶	1612
14	2019-01-02	香蕉	京東	1057
15	2019-01-02	香蕉	店面	1580
16	2019-01-02	蘋果	淘寶	1345
17	2019-01-02	蘋果	京東	564
18	2019-01-02	蘋果	店面	1953
19	2019-01-03	桔子	淘寶	729
20	2019-01-03	桔子	京東	1758
21	2019-01-03	桔子	店面	918
22	2019-01-03	香蕉	淘寶	1879
23	2019-01-03	香蕉	京東	1142
24	2019-01-03	香蕉	店面	731
25	2019-01-03	蘋果	淘寶	1329
26	2019-01-03	蘋果	京東	1315
27	2019-01-03	蘋果	店面	1956
28	2019-01-04	桔子	淘寶	547
29	2019-01-04	桔子	京東	1462
30	2019-01-04	桔子	店面	1418
31	2019-01-04	香蕉	淘寶	1205
32	2019-01-04	香蕉	京東	1326
33	2019-01-04	香蕉	店面	746
34	2019-01-04	蘋果	淘寶	940
35	2019-01-04	蘋果	京東	898
36	2019-01-04	蘋果	店面	1610
*/

其中pk表示自增主鍵,saledate表示日期,product表示商品,channel表示銷售渠道,amount表示銷售金額。

關於rollup和cube、grouping sets的具體含義可以網上搜索,我們直接演示。

import peewee

db = peewee.PostgresqlDatabase("postgres", host="localhost", password="zgghyys123", user="postgres", port=5432)


class SalesData(peewee.Model):
    pk = peewee.AutoField()
    saledate = peewee.DateField()
    product = peewee.CharField()
    channel = peewee.CharField()
    amount = peewee.IntegerField()

    class Meta:
        database = db
        table_name = "sales_data"
        
        
from peewee import SQL, fn
from pprint import pprint
res = SalesData.select(SQL("product, channel, sum(amount)")).group_by(fn.ROLLUP(SQL("product, channel"))).tuples()
pprint(list(res))
"""
[('桔子', '店面', 4671),
 ('桔子', '京東', 5324),
 ('桔子', '淘寶', 5063),
 ('桔子', None, 15058),
 ('蘋果', '店面', 6366),
 ('蘋果', '京東', 3345),
 ('蘋果', '淘寶', 4125),
 ('蘋果', None, 13836),
 ('香蕉', '店面', 4235),
 ('香蕉', '京東', 4889),
 ('香蕉', '淘寶', 6269),
 ('香蕉', None, 15393),
 (None, None, 44287)]
"""

group by product, channel這是普通的group by語句,但如果是group by rollup(product, channel),那么除了會按照product、channel匯總之外,還會單獨按照product匯總和整體匯總,按照product匯總的時候channel就會空了,整體匯總的時候product和channel都為空。

group by cube(product, channel),如果是cube的話,那么還是會按照product、channel匯總,但同時還會單獨按照product匯總、單獨按照channel匯總、整體匯總。我們看到cube相當於比rollup多了一個按照channel匯總

from peewee import SQL, fn
from pprint import pprint
res = SalesData.select(SQL("product, channel, sum(amount)")).group_by(fn.CUBE(SQL("product, channel"))).tuples()
pprint(list(res))
"""
[('桔子', '店面', 4671),
 ('桔子', '京東', 5324),
 ('桔子', '淘寶', 5063),
 ('桔子', None, 15058),
 ('蘋果', '店面', 6366),
 ('蘋果', '京東', 3345),
 ('蘋果', '淘寶', 4125),
 ('蘋果', None, 13836),
 ('香蕉', '店面', 4235),
 ('香蕉', '京東', 4889),
 ('香蕉', '淘寶', 6269),
 ('香蕉', None, 15393),
 (None, None, 44287),
 (None, '店面', 15272),
 (None, '京東', 13558),
 (None, '淘寶', 15457)]
 """

rollup和cube都可以通過grouping sets來實現,這么說吧:

group by rollup(product, channel) 等價於 group by grouping sets( (product, channel), (product), ()  ),首先最外層的括號不用說,里面的(product, channel)表示按照product和channel進行匯總,(product)表示按照product單獨進行匯總,()表示整體進行匯總。

至於cube估計有人也想到了,group by cube(product, channel) 等價於 group by grouping sets( (product, channel), (product), (channel), () ),直接多一個(channel)即可。

group by product, channel,顯然就是group by grouping sets( (product, channel) ),因此grouping sets可以更加方便我們自定制。

from peewee import SQL, fn
from pprint import pprint

res = (
    SalesData.
    select(SQL("coalesce(product, '所有商品'), coalesce(channel, '所有渠道'), sum(amount)")).
    group_by(
        getattr(fn, "GROUPING SETS")(SQL("(product, channel), (product), (channel), ()"))
    ).tuples()
)
pprint(list(res))
"""
[('桔子', '店面', 4671),
 ('桔子', '京東', 5324),
 ('桔子', '淘寶', 5063),
 ('桔子', '所有渠道', 15058),
 ('蘋果', '店面', 6366),
 ('蘋果', '京東', 3345),
 ('蘋果', '淘寶', 4125),
 ('蘋果', '所有渠道', 13836),
 ('香蕉', '店面', 4235),
 ('香蕉', '京東', 4889),
 ('香蕉', '淘寶', 6269),
 ('香蕉', '所有渠道', 15393),
 ('所有商品', '所有渠道', 44287),
 ('所有商品', '店面', 15272),
 ('所有商品', '京東', 13558),
 ('所有商品', '淘寶', 15457)]
"""

可以仔細體會一下上面的用法,總之在數據庫中我們能直接使用的,基本上都能通過fn來直接調用。甚至中間包含了空格的grouping sets,我們也能通過使用反射的方式進行獲取。

窗口函數

窗口函數是在select語句中的,但是為什么直到現在才說呢?因為它稍微難一些,下面我們就來看看如何在peewee中實現窗口函數。事實上如果你SQL語句寫的好的話,那么直接通過peewee.SQL寫原生的SQL也是可以的,會更方便。不僅是窗口函數,當然也包括上面剛說的cube、rollup、grouping sets等等。或者再比如case when語句,事實上peewee中提供了一個函數Case來實現這一邏輯,但是我們沒說,因為覺得沒有必要,還不如直接在peewee.SQL中寫case when邏輯。

事實上你看一下peewee的Case函數的實現你就知道了,Case里面做的事情也是使用peewee.SQL來拼接case when語句,當然orm最大的作用不就是拼接SQL語句嗎。因此,有些語句,我個人還是推薦在peewee.SQL里面寫原生SQL的方式,會更方便一些。

from peewee import SQL, fn
from pprint import pprint

# 定義窗口,如果有多個窗口必須要起別名, 否則會報錯:提示窗口已存在
w1 = peewee.Window(partition_by=[SalesData.product, SalesData.channel]).alias("w1")
w2 = peewee.Window(partition_by=[SalesData.product]).alias("w2")
w3 = peewee.Window(partition_by=[SalesData.channel]).alias("w3")

res = SalesData.select(SQL("product, channel"),
                       # 通過over來指定窗口,注意:此時只是指定了窗口
                       # 但是窗口的定義是什么,當前的sum還是不知道的
                       # 比如:第一個窗口函數當前只是 sum(amount) over w1
                       # 這個w1究竟如何定義的,我們需要在下面的window中指定
                       fn.SUM(SalesData.amount).over(window=w1),
                       fn.SUM(SalesData.amount).over(window=w2),
                       fn.SUM(SalesData.amount).over(window=w3)
                       # 必須調用window,將定義的窗口傳進去
                       # 等價於 window w1 as (partition by product, channel), w2 as ..., w3 as ...
                      ).window(w1, w2, w3).tuples()
pprint(list(res))
"""
[('蘋果', '店面', 6366, 13836, 15272),
 ('蘋果', '店面', 6366, 13836, 15272),
 ('蘋果', '店面', 6366, 13836, 15272),
 ('蘋果', '店面', 6366, 13836, 15272),
 ('桔子', '店面', 4671, 15058, 15272),
 ('桔子', '店面', 4671, 15058, 15272),
 ('桔子', '店面', 4671, 15058, 15272),
 ('桔子', '店面', 4671, 15058, 15272),
 ('香蕉', '店面', 4235, 15393, 15272),
 ('香蕉', '店面', 4235, 15393, 15272),
 ('香蕉', '店面', 4235, 15393, 15272),
 ('香蕉', '店面', 4235, 15393, 15272),
 ('蘋果', '京東', 3345, 13836, 13558),
 ('桔子', '京東', 5324, 15058, 13558),
 ('桔子', '京東', 5324, 15058, 13558),
 ('桔子', '京東', 5324, 15058, 13558),
 ('桔子', '京東', 5324, 15058, 13558),
 ('蘋果', '京東', 3345, 13836, 13558),
 ('蘋果', '京東', 3345, 13836, 13558),
 ('蘋果', '京東', 3345, 13836, 13558),
 ('香蕉', '京東', 4889, 15393, 13558),
 ('香蕉', '京東', 4889, 15393, 13558),
 ('香蕉', '京東', 4889, 15393, 13558),
 ('香蕉', '京東', 4889, 15393, 13558),
 ('桔子', '淘寶', 5063, 15058, 15457),
 ('桔子', '淘寶', 5063, 15058, 15457),
 ('桔子', '淘寶', 5063, 15058, 15457),
 ('桔子', '淘寶', 5063, 15058, 15457),
 ('香蕉', '淘寶', 6269, 15393, 15457),
 ('香蕉', '淘寶', 6269, 15393, 15457),
 ('香蕉', '淘寶', 6269, 15393, 15457),
 ('香蕉', '淘寶', 6269, 15393, 15457),
 ('蘋果', '淘寶', 4125, 13836, 15457),
 ('蘋果', '淘寶', 4125, 13836, 15457),
 ('蘋果', '淘寶', 4125, 13836, 15457),
 ('蘋果', '淘寶', 4125, 13836, 15457)]
"""

如果沒有調用window的話,那么會報錯:窗口"w1"不存在,當然不僅w1,w2、w3也是不存在的,總之在select中over的窗口必須在window中傳進去。

當然Window這個類里面,還可以傳入order_by,以及窗口的起始和結束位置。

ROWS frame_start
-- 或者
ROWS BETWEEN frame_start AND frame_end

其中,ROWS 表示以行為單位計算窗口的偏移量。frame_start 用於定義窗口的起始位置,可以指定以下內容之一:

  • UNBOUNDED PRECEDING,窗口從分區的第一行開始,默認值;
  • N PRECEDING,窗口從當前行之前的第 N 行開始;
  • CURRENT ROW,窗口從當前行開始。

frame_end 用於定義窗口的結束位置,可以指定以下內容之一:

  • CURRENT ROW,窗口到當前行結束,默認值
  • N FOLLOWING,窗口到當前行之后的第 N 行結束。
  • UNBOUNDED FOLLOWING,窗口到分區的最后一行結束;

下圖演示了這些窗口選項的作用:

1

我們舉例說明:

  • rows unbounded preceding
select product, amount,
       sum(amount) over w as sum_amount
from sales_data 
where saledate = '2019-01-01'
window w as (partition by product order by amount rows unbounded preceding)
/*
桔子	1329	1329
桔子	1736	3065
桔子	1864	4929
蘋果	511	    511
蘋果	568	    1079
蘋果	847	    1926
香蕉	1178	1178
香蕉	1364	2542
香蕉	1573	4115
*/
from peewee import SQL, fn
from pprint import pprint

# 定義窗口
w = peewee.Window(partition_by=[SalesData.product], order_by=[SalesData.amount],
                  start=peewee.Window.preceding())
res = SalesData.select(SQL("product, amount"),
                       fn.SUM(SalesData.amount).over(window=w)
                      ).where(SalesData.saledate == '2019-01-01').window(w).tuples()
pprint(list(res))
"""
[('桔子', 1329, 1329),
 ('桔子', 1736, 3065),
 ('桔子', 1864, 4929),
 ('蘋果', 511, 511),
 ('蘋果', 568, 1079),
 ('蘋果', 847, 1926),
 ('香蕉', 1178, 1178),
 ('香蕉', 1364, 2542),
 ('香蕉', 1573, 4115)]
"""
  • rows n preceding
select product, amount,
       sum(amount) over w as sum_amount
from sales_data 
where saledate = '2019-01-01'
window w as (partition by product order by amount rows 2 preceding)
/*
桔子	1329	1329
桔子	1736	3065
桔子	1864	4929
蘋果	511	    511
蘋果	568	    1079
蘋果	847	    1926
香蕉	1178	1178
香蕉	1364	2542
香蕉	1573	4115
*/
from peewee import SQL, fn
from pprint import pprint

# 定義窗口
w = peewee.Window(partition_by=[SalesData.product], order_by=[SalesData.amount],
                  # Window.preceding中不傳入值就是unbounded preceding,傳入值value就是<value> preceding
                  start=peewee.Window.preceding(2))
res = SalesData.select(SQL("product, amount"),
                       fn.SUM(SalesData.amount).over(window=w)
                      ).where(SalesData.saledate == '2019-01-01').window(w).tuples()
pprint(list(res))
"""
[('桔子', 1329, 1329),
 ('桔子', 1736, 3065),
 ('桔子', 1864, 4929),
 ('蘋果', 511, 511),
 ('蘋果', 568, 1079),
 ('蘋果', 847, 1926),
 ('香蕉', 1178, 1178),
 ('香蕉', 1364, 2542),
 ('香蕉', 1573, 4115)]
"""
  • rows between 1 preceding and 1 following
select product, amount,
       round(avg(amount) over w, 2) as sum_amount
from sales_data 
where saledate = '2019-01-01'
window w as (partition by product order by amount rows between 1 preceding and 1 following
)
/*
桔子	1329	1532.5
桔子	1736	1643
桔子	1864	1800
蘋果	511	    539.5
蘋果	568	    642
蘋果	847	    707.5
香蕉	1178	1271
香蕉	1364	1371.67
香蕉	1573	1468.5
*/
from peewee import SQL, fn
from pprint import pprint

# 定義窗口
w = peewee.Window(partition_by=[SalesData.product], order_by=[SalesData.amount],
                  start=peewee.Window.preceding(1),
                  end=peewee.Window.following(1))
res = SalesData.select(SQL("product, amount"),
                       fn.ROUND(fn.AVG(SalesData.amount).over(window=w), 2)
                      ).where(SalesData.saledate == '2019-01-01').window(w).tuples()
pprint(list(res))
"""
[('桔子', 1329, Decimal('1532.50')),
 ('桔子', 1736, Decimal('1643.00')),
 ('桔子', 1864, Decimal('1800.00')),
 ('蘋果', 511, Decimal('539.50')),
 ('蘋果', 568, Decimal('642.00')),
 ('蘋果', 847, Decimal('707.50')),
 ('香蕉', 1178, Decimal('1271.00')),
 ('香蕉', 1364, Decimal('1371.67')),
 ('香蕉', 1573, Decimal('1468.50'))]
"""

所以我們看到可以在窗口中指定大小,方式為:rows frame_start或者rows between frame_start and frame_end,如果出現了frame_end那么必須要有frame_start,並且是通過between and的形式

frame_start的取值為:沒有frame_end的情況下,unbounded preceding(從窗口的第一行到當前行),n preceding(從當前行的上n行到當前行),current now(從當前行到當前行)

frame_end的取值為:current now(從frame_start到當前行),n following(從frame_start到當前行的下n行),unbounded following(從frame_start到窗口的最后一行)

此外數據庫還提供了一些排名窗口函數、取值窗口函數等等,這些只要你熟悉數據庫的語法,那么調用peewee也是很簡單的,我們舉個例子:

from peewee import SQL, fn
from pprint import pprint

# 定義窗口
w = peewee.Window(partition_by=[SQL("product")], order_by=SalesData.amount)
res = SalesData.select(SalesData.amount,
                       fn.ROW_NUMBER().over(window=w)
                      ).where(SalesData.saledate == '2019-01-01').window(w).tuples()
pprint(list(res))
"""
[(1329, 1),
 (1736, 2),
 (1864, 3),
 (511, 1),
 (568, 2),
 (847, 3),
 (1178, 1),
 (1364, 2),
 (1573, 3)]
"""

其他的函數類似,可以自己嘗試一下。

查看表和字段的信息

peewee的一些常見用法,我暫時只想到了上面那些。如果沒有介紹到的,可以通過peewee的源代碼或者官方文檔查看。總之,peewee.fn是一個很不錯的東西,數據庫里面的能直接用的基本上都可以通過fn來調用。甚至grouping sets這種,我們可以可以通過反射的形式來調用。還有一個萬金油peewee.SQL,我們可以直接在里面寫原生SQL,如果你SQL寫得好的話,根本不需要那么多花里胡哨的。

我們調用的peewee的函數,其底層做的事情就是在轉成peewee.SQL進行拼接,比如我們來看一下peewee中的Case函數。

def Case(predicate, expression_tuples, default=None):
 clauses = [SQL('CASE')]
 if predicate is not None:
     clauses.append(predicate)
 for expr, value in expression_tuples:
     clauses.extend((SQL('WHEN'), expr, SQL('THEN'), value))
 if default is not None:
     clauses.extend((SQL('ELSE'), default))
 clauses.append(SQL('END'))
 return NodeList(clauses)

所以我們看到Case這個函數就是在使用peewee.SQL進行拼接,因此我們直接通過peewee.SQL是完全沒有問題的,有些時候反而推薦這種做法。

下面我們來看看如何通過peewee查看一個表的信息。

from pprint import pprint
import peewee

db = peewee.PostgresqlDatabase("postgres",
                               host="localhost",
                               port=5432,
                               user="postgres",
                               password="zgghyys123")


class OverWatch(peewee.Model):

    pk = peewee.AutoField(verbose_name="自增主鍵")
    name = peewee.CharField(verbose_name="姓名", null=False, index=True)
    hp = peewee.IntegerField(verbose_name="血量", default=200)
    attack = peewee.CharField(verbose_name="英雄定位")
    ultimate = peewee.CharField(verbose_name="終極技能")

    class Meta:
        database = db
        table_name = "ow"
        schema = "anime"


# 1. 查詢一張表的記錄總數
print(OverWatch.select().count())  # 15

# 2. 查詢該表的所有字段名
meta = OverWatch._meta
pprint(meta.columns)
"""
{'attack': <CharField: OverWatch.attack>,
 'hp': <IntegerField: OverWatch.hp>,
 'name': <CharField: OverWatch.name>,
 'pk': <AutoField: OverWatch.pk>,
 'ultimate': <CharField: OverWatch.ultimate>}
"""

# 3. 獲取該表的主鍵
print(meta.get_primary_keys())  # (<AutoField: OverWatch.pk>,)
print([_.name for _ in meta.get_primary_keys()])  # ['pk']

# 4. 是否是聯合主鍵
print(meta.composite_key)  # False

# 5. 獲取該表的默認值
print(meta.get_default_dict())  # {'hp': 200}

# 6. 主鍵是否自增
print(meta.auto_increment)  # True

# 7. 表名和schema名
print(meta.table_name, meta.schema)  # ow anime

# 8. 獲取所有的約束
print(meta.constraints)  # None

通過peewee查看一個表的所有字段的信息。

meta = OverWatch._meta

columns = meta.columns
for col in columns:
    pprint({"字段名": columns[col].column_name,
            "是否為主鍵": columns[col].primary_key,
            "字段類型": columns[col],
            "是否允許非空": columns[col].null,
            "是否必須唯一": columns[col].unique,
            "是否是索引": columns[col].index,
            "默認值": columns[col].default,
            "約束": columns[col].constraints,
            "注釋": columns[col].verbose_name}
           )
"""
{'字段名': 'pk',
 '字段類型': <AutoField: OverWatch.pk>,
 '是否為主鍵': True,
 '是否允許非空': False,
 '是否必須唯一': False,
 '是否是索引': False,
 '注釋': '自增主鍵',
 '約束': None,
 '默認值': None}
{'字段名': 'name',
 '字段類型': <CharField: OverWatch.name>,
 '是否為主鍵': False,
 '是否允許非空': False,
 '是否必須唯一': False,
 '是否是索引': True,
 '注釋': '姓名',
 '約束': None,
 '默認值': None}
{'字段名': 'hp',
 '字段類型': <IntegerField: OverWatch.hp>,
 '是否為主鍵': False,
 '是否允許非空': False,
 '是否必須唯一': False,
 '是否是索引': False,
 '注釋': '血量',
 '約束': None,
 '默認值': 200}
{'字段名': 'attack',
 '字段類型': <CharField: OverWatch.attack>,
 '是否為主鍵': False,
 '是否允許非空': False,
 '是否必須唯一': False,
 '是否是索引': False,
 '注釋': '英雄定位',
 '約束': None,
 '默認值': None}
{'字段名': 'ultimate',
 '字段類型': <CharField: OverWatch.ultimate>,
 '是否為主鍵': False,
 '是否允許非空': False,
 '是否必須唯一': False,
 '是否是索引': False,
 '注釋': '終極技能',
 '約束': None,
 '默認值': None}
"""

我們通過db也可以獲取很多信息

from pprint import pprint
import peewee

db = peewee.PostgresqlDatabase("postgres",
                               host="localhost",
                               port=5432,
                               user="postgres",
                               password="zgghyys123")

# 獲取所有字段,傳入表和schema
pprint(db.get_columns("girl", "public"))
"""
[ColumnMetadata(name='pk', data_type='integer', null=False, primary_key=True, 
                table='girl', default="nextval('girl_pk_seq'::regclass)"),
 ColumnMetadata(name='name', data_type='character varying', null=False,
                primary_key=False, table='girl', default=None),
 ColumnMetadata(name='where', data_type='character varying', null=False, 
                primary_key=False, table='girl', default=None),
 ColumnMetadata(name='country', data_type='character varying', null=True, 
                primary_key=False, table='girl', default=None)]
"""
print(db.get_columns("girl")[0].data_type)  # integer


# 獲取表的外鍵,第二個參數schema不指定默認是public
print(db.get_foreign_keys("girl"))  # []

# 獲取表的索引
print(db.get_indexes("girl"))
"""
[IndexMetadata(name='girl_pkey', 
sql='CREATE UNIQUE INDEX girl_pkey ON public.girl USING btree (pk)', 
columns=['pk'], unique=True, table='girl')]
"""

# 獲取一個schema下的所有表,schema不指定默認是public
print(db.get_tables())
"""
['a', 'b', 'course', 'girl', 'girl_info', 'girl_score', 
'interface', 'ods_cir_df', 'ods_cir_di', 'people', 
'sales_data', 't1', 't_case', 'teacher', '全球患病人數']
"""

# 獲取一個schema下的所有視圖,schema不指定默認是public
print(db.get_views())
"""
[ViewMetadata(name='people_view', 
sql='SELECT people.pk,\n    people.id,\n    people.degree\n   FROM people')]
"""

# 查看schema下是否存在某張表,schema不指定默認是public
print(db.table_exists("girl"))  # True

修改表結構

當我們創建完一張表后,發現字段需要進行修改,這個時候怎么辦呢?我們可以使用一個叫做playhouse的模塊,這個模塊為peewee提供了很多擴展功能,並且它不需要單獨安裝,裝完peewee的時候就已經有了,我們來看一下。

import peewee
from playhouse.migrate import PostgresqlMigrator, migrate
from playhouse.db_url import connect

# 通過peewee.PostgresqlDatabase也可以,這兩者是通用的
db = connect("postgres://postgres:zgghyys123@localhost:5432/postgres")

# 傳入db,實例化一個PostgresqlMigrator對象
migrator = PostgresqlMigrator(db)

with db.transaction():
    migrate(
        # 設置schema
        migrator.set_search_path("anime"),

        # 刪除一個字段,傳入表名、字段名
        migrator.drop_column("ow", "attack"),
        # 增加一個字段,傳入表名、字段名、peewee.xxxField
        migrator.add_column("ow", "country", peewee.CharField(verbose_name="英雄的國籍", null=True)),
        # 重命名一個字段,傳入表名、字段名、新字段名
        migrator.rename_column("ow", "name", "Name"),
        # 修改字段類型
        migrator.alter_column_type("ow", "ultimate", peewee.TextField())

        # 里面有很多操作,比如:增加約束、索引,刪除約束、索引等等,可以進入源碼中查看
        # 甚至可以給表重命名
    )
# 在外層我們寫上了一個with db.transaction(): ,這是因為這些操作不是一個事務
# 執行完之后,會發現表被修改了

反射表

我們想通過orm來操作數據庫中的表的時候,往往會定義一個Model,但是數據庫里面已經存在了大量的表,我們總不能每操作一張表就定義一個Model吧,這樣也太麻煩了。於是在sqlalchemy中提供了一個反射機制,可以自動將數據庫中的表反射成sqlalchemy中Table。那么在peewee中可不可以呢?答案是可以的,只不過我們用的不是peewee,而是playhouse,當然我們完全可以把這兩個模塊當成是一家子。

import peewee
from playhouse.reflection import generate_models

# 通過playhouse.db_url.connect也可以,這兩者是通用的
db = peewee.PostgresqlDatabase("postgres",
                               host="localhost",
                               port=5432,
                               user="postgres",
                               password="zgghyys123")


models = generate_models(db)
# 得到了一個字典,分別是表名和Model組成的鍵值對
print(models)
"""
{'a': <Model: a>, 'b': <Model: b>, 'course': <Model: course>, 
'girl': <Model: girl>, 'girl_info': <Model: girl_info>, 
'girl_score': <Model: girl_score>, 'interface': <Model: interface>, 
'ods_cir_df': <Model: ods_cir_df>, 'ods_cir_di': <Model: ods_cir_di>, 
'people': <Model: people>, 'sales_data': <Model: sales_data>, 
't1': <Model: t1>, 't_case': <Model: t_case>, 'teacher': <Model: teacher>, 
'全球患病人數': <Model: 全球患病人數>}
"""

# 當然也可以指定表名,給指定的表反射成Model
# 如果想反射視圖的話,那么只需要添加一個參數include_views=True即可,默認是False
print(
    generate_models(db, table_names=["girl", "interface"])
)  # {'girl': <Model: girl>, 'interface': <Model: interface>}


# 指定schema
models = generate_models(db, schema="anime")
print(models)  # {'ow': <Model: ow>}


# 我們來操作一波
Girl = generate_models(db, table_names=["girl"])["girl"]
from pprint import pprint
pprint(list(Girl.select().dicts()))
"""
[{'name': '古明地覺', 'pk': 1, 'where': '東方地靈殿'},
 {'name': '博麗靈夢', 'pk': 2, 'where': '博麗神社'},
 {'name': '芙蘭朵露', 'pk': 3, 'where': '紅魔館'},
 {'name': '蕾米莉亞', 'pk': 4, 'where': '紅魔館'},
 {'name': '帕秋莉·諾蕾姬', 'pk': 5, 'where': '紅魔館'},
 {'name': '西行寺幽幽子', 'pk': 6, 'where': '白玉樓'},
 {'name': '八意永琳', 'pk': 7, 'where': '輝夜永遠亭'},
 {'name': '霧雨魔理沙', 'pk': 8, 'where': '魔法森林'},
 {'name': '紅美鈴', 'pk': 9, 'where': '紅魔館'}]
"""

總的來說,peewee還是很強大的,可以實現我們日常所需的功能。並且搭配playhouse,可以實現很多意想不到的功能。至少在PostgreSQL方面,目前是可以和sqlalchemy相媲美的,當然sqlalchemy還支持peewee不支持的數據庫,比如:Oracle、SqlServer,甚至是hive,並提供了更高級的功能,畢竟代碼量擺在那里,只不過那些功能我們很少使用。

另外,可能有人注意到了,我們目前說的都是單表操作。那多表之間的union呢?join呢?以及定義多張表,通過外鍵建立聯系,關聯查詢呢?答案是都沒有,這里只介紹單表,至於多表的話可以自己去了解,事實上也比較簡單,沒什么復雜的。

peewee_async

介紹完peewee,再解釋peewee_async就簡單很多了,因為peewee_async是基於peewee並進行了異步化,執行SQL的流程沒有任何變化,只不過操作需要在協程中進行執行。使用peewee_async操作PostgreSQL的話,需要pip install aiopg,操作MySQL,則需要pip install aiomysql

import asyncio
from peewee_async import PostgresqlDatabase, Manager
from playhouse.reflection import generate_models

db = PostgresqlDatabase("postgres",
                        host="localhost",
                        port=5432,
                        user="postgres",
                        password="zgghyys123")
# 得到模型
Model = generate_models(db, table_names=["girl"])["girl"]
# 然后我們需要將db傳入到Manager中得到一個async_db
async_db = Manager(db)


async def main():
    res = await async_db.execute(Model.select().dicts())
    print(list(res))


asyncio.run(main())

只需要在協程中運行,並且將原來的操作寫成await async_db.execute中即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM