pypika:SQL 語句生成器,讓你再也不用為拼接 SQL 語句而發愁


楔子

日常工作中難免會和數據庫打交道,而為了操作數據庫我們會構建相應的 SQL 語句,而 pypika 的存在可以讓我們更輕松地實現這一點。

pypika 是一個第三方庫,我們首先要安裝它,直接 pip install pypika 即可。

注:Python 還有一個第三庫叫做 pika,是專門用來連接 RabbitMQ 的,這兩個名字雖然很像,但是之間沒有任何關系。

簡單的 select

構建 select 語句的入口點是 pypika.Query,而查詢數據的話必然要有兩個關鍵信息:查詢的表和指定的字段。

from pypika import Query


sql = Query.from_("t").select("id", "name", "age")
print(sql)  # SELECT "id","name","age" FROM "t"
# 返回的是一個 QueryBuilder 對象
print(sql.__class__)  # <class 'pypika.queries.QueryBuilder'>
# 直接轉成字符串即可
print(str(sql))  # SELECT "id","name","age" FROM "t"
# 或者調用 get_sql 方法
print(sql.get_sql())  # SELECT "id","name","age" FROM "t"

如果指定了 select 但是沒有指定表名,那么會報錯:

from pypika import Query

try:
    sql = Query.select("id", "name", "age")
except Exception as e:
    print(e)  # Cannot select id, no FROM table specified.

Table、Column、Schema、Database

如果是簡單的 select 語句,就像上面的栗子那樣,那么 "表" 和 "字段" 直接通過字符串指定即可。但是對於更復雜的栗子來說,我們應該使用 pypika.Table,我們還以上面為例:

from pypika import Query, Table

table = Table("t")
sql = Query.from_(table).select(table.id, table.name, table.age)
# 得到的結果是一樣的
print(str(sql))  # SELECT "id","name","age" FROM "t"

當然對於表而言,我們也是可以指定別名的。

from pypika import Query, Table

table = Table("t").as_("t1")
sql = Query.from_(table).select(table.id, table.name, table.age)

# 一旦起了別名,那么 pypika 就會認為你可能會進行連表操作,於是在選擇字段的時候會帶上別名
print(str(sql))  # SELECT "t1"."id","t1"."name","t1"."age" FROM "t" "t1"

對於 PostgreSQL 而言,它是具有 schema 的,那么我們要如何指定呢?可能有人會這么做?

from pypika import Query, Table, Schema

table = Table("my_schema.t")
sql = Query.from_(table).select(table.id, table.name, table.age)

# 這樣是不行的,因為這表示從一張名為 my_schema.t 的表中篩選數據
# 顯然這是不符合規范的,表名不應該包含 . 這種特殊字符
print(str(sql))  # SELECT "id","name","age" FROM "my_schema.t"
# 真正的格式應該是 "my_schema"."t" 而不是 "my_schema.t"


# 所以我們需要導入 Schema 這個類
sch = Schema("my_schema")
# 對 Schema 對象直接通過屬性引用的方式即可指定表名,比如 sch.t 就表示從 my_schema 下尋找名稱為 t 的表
sql = Query.from_(sch.t).select(sch.t.id, sch.t.name, sch.t.age)
print(str(sql))  # SELECT "id","name","age" FROM "my_schema"."t"

# 如果表名包含中文或者特殊字符的話該怎么辦呢?比如有一張表叫 "<girl>"
# 顯然不可能通過 sch.<girl> 的方式引用,於是這便涉及到 Python 基礎了,我們可以使用反射的方式實現
print(
    str(Query.from_(t := getattr(sch, "<girl>")).select(t.id, t.name, t.age))
)  # SELECT "id","name","age" FROM "my_schema"."<girl>"

# 或者更簡單的做法
table = Table("<girl>", schema="my_schema")
sql = Query.from_(table).select(table.id, table.name, table.age)
print(sql)  # SELECT "id","name","age" FROM "my_schema"."<girl>"

可以指定 table、schema,那么必然逃不掉 database。

from pypika import Query, Database

db = Database("my_db")
# 從 my_db 這個數據庫下尋找名字為 my_schema 的 schema,在 my_schema 下尋找名字為 t 的表
table = db.my_schema.t
sql = Query.from_(table).select(table.id)
print(sql)  # SELECT "id" FROM "my_db"."my_schema"."t"

在查詢到結果集之后,我們還可以進行排序:

from pypika import Query, Order

sql = Query.from_("t").select("id", "name").orderby("id", order=Order.desc)
print(sql)  # SELECT "id","name" FROM "t" ORDER BY "id" DESC

# 如果是多個字段的話
sql = Query.from_("t").select("id", "name").orderby("age", "id")
print(sql)  # SELECT "id","name" FROM "t" ORDER BY "age","id"

sql = Query.from_("t").select("id", "name").orderby("age", "id", order=Order.desc)
print(sql)  # SELECT "id","name" FROM "t" ORDER BY "age" DESC,"id" DESC

# 如果是一個字段升序、一個字段降序怎么辦?答案是調用兩次 orderby 即可
sql = Query.from_("t").select("id", "name").orderby("age", order=Order.desc).orderby("id")
print(sql)  # SELECT "id","name" FROM "t" ORDER BY "age" DESC,"id"

四則運算

執行 select 查詢的時候,字段是可以進行運算的。

from pypika import Query, Field, Table

table = Table("t")
print(
    Query.from_(table).select(table.id, table.age + 1)
)  # SELECT "id","age"+1 FROM "t"

# 我們看一下 table.id 是什么類型
print(table.id.__class__)  # <class 'pypika.terms.Field'>

# 所以我們也可以直接根據 Field 進行構建
print(
    Query.from_(table).select(Field("id"), Field("age") + 1, Field("first") + Field("last"))
)  # SELECT "id","age"+1,"first"+"last" FROM "t"

# 補充一下:對於 Database 而言,進行屬性查找會得到 Schema
db = Database("my_db")
print(db.my_schema.__class__)  # <class 'pypika.queries.Schema'>
# 對於 Schema 而言,進行屬性查找會得到 Table
print(db.my_schema.t.__class__)  # <class 'pypika.queries.Table'>
# 對於 Table 而言,進行屬性查找會得到 Field
print(db.my_schema.t.id.__class__)  # <class 'pypika.terms.Field'>

當然我們也可以進行更加復雜的運算,以及給字段起別名:

from pypika import Query, Field, Table

table = Table("t")

sql = Query.from_(table).select(Field("id").as_("ID"),
                                ((Field("count") + 2000) * Field("price")).as_("amount"))
# 字段起別名可以使用 as、也可以不使用
print(sql)  # SELECT "id" "ID",("count"+2000)*"price" "amount" FROM "t"

可以看到還是比較強大的,特別是會自動給你加上引號,這樣可以防止關鍵字沖突。

where 語句

我們在獲取數據的時候,很少會全部獲取,絕大多數情況只會獲取指定的數據,這個時候就需要使用 where 語句。

from pypika import Query, Field, Table

girl = Table("girl")

# 注意:where 里面不可以寫 "age > 18"
sql = Query.from_(girl).select("*").where(Field("age") >= 18)
print(sql)  # SELECT * FROM "girl" WHERE "age">=18

# 如果是多個 where 條件怎么辦呢?答案是調用多次 where 即可
# 就像我們之前使用的 orderby 一樣,其實除了 orderby 之外,像 select、where、groupby都是可以多次調用的
sql = (Query.from_(girl).select("id").select(girl.name).select(Field("age"))
       .where(Field("age") >= 18).where(girl.age <= 30).where(Field("length") < 165))
print(sql)  # SELECT "id","name","age" FROM "girl" WHERE "age">=18 AND "age"<=30 AND "length"<165

還有 in 和 between 也是支持的,舉個例子:

from pypika import Query, Field, Table, Database

girl = Table("girl")

sql = Query.from_(girl).select("*").where(Field("age").between(18, 30))
print(sql)  # SELECT * FROM "girl" WHERE "age" BETWEEN 18 AND 30

# between 還可以使用切片的形式,並且切片除了數字之外、時間也是可以的,因為數據庫是支持對時間的 between 語法的
sql = Query.from_(girl).select("*").where(Field("age")[18: 30]).where(Field("length").isin([155, 156, 157]))
print(sql)  # SELECT * FROM "girl" WHERE "age" BETWEEN 18 AND 30 AND "length" IN (155,156,157)

當然還有其它的方法,比如:contains、isnull、notnull、notin、not_like 等等,可以自己試一下。

另外可能有人覺得每一個條件都要調用一次 where 太麻煩了,而且默認都是 AND,如果是 OR 的話怎么辦呢?於是我們就引出了 &、|、^ 三個符號。

AND

from pypika import Query, Field, Table

girl = Table("girl")

sql = Query.from_(girl).select("*").where((Field("age") > 18) & Field("length") < 160)
print(sql)  # SELECT * FROM "girl" WHERE "age">18 AND "length"<160

OR

from pypika import Query, Field, Table

girl = Table("girl")

sql = Query.from_(girl).select("*").where((Field("age") > 18) | Field("length") < 160)
print(sql)  # SELECT * FROM "girl" WHERE "age">18 OR "length"<160

XOR

from pypika import Query, Field, Table

girl = Table("girl")

sql = Query.from_(girl).select("*").where((Field("age") > 18) ^ Field("length") < 160)
print(sql)  # SELECT * FROM "girl" WHERE "age">18 XOR "length"<160

對於 AND 和 OR,pypika 提供了專門的類來實現鏈式的 AND、OR 表達式。

from pypika import Query, Field, Table, Criterion

girl = Table("girl")

sql = Query.from_(girl).select("*").where(
    Criterion.all([Field("id") > 10,
                   Field("age") > 18,
                   Field("length").isin([155, 156, 157])])
)
print(sql)  # SELECT * FROM "girl" WHERE "id">10 AND "age">18 AND "length" IN (155,156,157)


sql = Query.from_(girl).select("*").where(
    Criterion.any([Field("id").between(10, 50),
                   Field("age") > 18,
                   Field("length").isin([155, 156, 157])])
)
print(sql)  # SELECT * FROM "girl" WHERE "id" BETWEEN 10 AND 50 OR "age">18 OR "length" IN (155,156,157)

分組和聚合

然后是 group by,我們看看如何實現:

from pypika import Query, Field, Table, functions as fn

girl = Table("girl")

# fn 是一個模塊,里面包含了很多的類,每個類對應一個聚合函數
# 但是注意:往類里面(比如下面的 fn.Count)傳入字段的時候,不能直接傳遞字符串,需要傳遞一個 Field 對象
sql = Query.from_(girl).select("age", fn.Count(Field("id"))).where(
    Field("age")[18: 30] & (Field("length") < 160)
).groupby(Field("age"))
print(sql)  # SELECT "age",COUNT("id") FROM "girl" WHERE "age" BETWEEN 18 AND 30 AND "length"<160 GROUP BY "age"

可以看到拼接的 SQL 語句是正確的,如果我們傳遞的是 fn.Count("id") 的話,那么拼接之后得到的就是 Count('id')。pypika 給你變成了單引號,顯然這是不合法的,因為在數據庫中單引號表示字符串。如果傳遞字符串,那么只能傳遞 "*",會得到 Count(*),這是個特例。當然有人會說這是 PostgreSQL 的語法吧,MySQL 應該是反引號才對,沒錯,后面的話我們會說如何適配數據庫。因為數據庫的種類不同,語法也會稍有不同,而目前我們沒有任何信息表明我們使用的到底是哪一種數據庫。

除了 fn.Count 表示計算次數之外,還有很多其它的函數,比如:fn.Sum 表示計算總和;fn.Avg 表示計算平均值等等。除此之外還有很多其它的函數,比如取絕對值、對數、類型轉換等等,在 SQL 里面有的都可以去里面找。關於 fn 里面的功能后面會提及,我們當前的重點是 groupby,當 groupby 之后我們就可以接 having 了。

from pypika import Query, Field, Table, functions as fn

girl = Table("girl")

sql = Query.from_(girl).select("age", fn.Count(Field("id"))).where(
    Field("age")[18: 30] & (Field("length") < 160)
).groupby(Field("age")).having(fn.Count(Field("id")) > 30)
print(sql)  
# SELECT "age",COUNT("id") FROM "girl" WHERE "age" BETWEEN 18 AND 30 AND "length"<160 GROUP BY "age" HAVING COUNT("id")>30

兩表 join

多表 join 的話可以直接使用 Query.join 來實現, 並且后面需要跟 using 或 on 字句。

from pypika import Query, Table


table1 = Table("t1")
table2 = Table("t2")

sql = Query.from_(table1).select(table2.age, table1.name).left_join(table2).using("id")
print(sql)  # SELECT "t2"."age","t1"."name" FROM "t1" LEFT JOIN "t2" USING ("id")

# 此時查詢字段的時候就不能只寫字段名了,因為涉及到了兩張表
# 假設我們指定的是 "age",那么結果默認是 "t1"."age",但我們不一定就是要 t1 里面的 age
# 假設我們指定的是 Field("age"),那么結果得到的就是 "age",如果兩張表里面都要 age 字段,那么數據庫是無法區分我們到底選擇哪張表里的 age 字段的
# 可能有人覺得 Field("t2.age") 不就行了,答案是不行的,原因跟我們之前說的一樣,該結果得到的是 "t2.age",而結果應該是 "t2"."age"

# 所以我們需要通過 table2.age 的方式,但是這樣也隱藏一個問題,其實這個問題在之前就應該說的
# 我們說通過 "Table對象.字段" 的方式可以篩選指定的字段,但是 Table對象有沒有自己的方法呢?
print(table1.as_)  # <bound method builder.<locals>._copy of Table('t1')>

# 顯然 as_ 就是一個方法,用於起別名,我們之前用過
# 但是問題來了,如果數據庫表中就有一個字段叫做 as_ 該咋辦
sql = Query.from_(table1).select(table2.as_).left_join(table2).using("id")
print(sql)  # SELECT <bound method builder.<locals>._copy of Table('t2')> FROM "t1" LEFT JOIN "t2" USING ("id")

# 我們看到結果不是我們想要的,所以還有最后一個做法
sql = Query.from_(table1).select(table2.field("age")).left_join(table2).using("id")
print(sql)  # SELECT "t2"."age" FROM "t1" LEFT JOIN "t2" USING ("id")

# 因此我們看到選擇字段的方式還是很多的,總結一下:
"""

+---------------------+----------------------------------------------+
|         方式        |                     缺點                     |
+---------------------+----------------------------------------------+
|        "字段"       |               不能是多表 join                |
|    Field("字段")    |               不能是多表 join                |
|      table.字段     | 可以多表 join,但是不能和 Table 內部方法重名 |
| table.field("字段") |              沒有限制,終極選擇              |
+---------------------+----------------------------------------------+

"""

join 的幾種方式

  • left_join
  • right_join
  • inner_join
  • outer_join
  • cross_join
  • hash_join

以上在內部都調用了 join 方法,看部分源碼:

    def inner_join(self, item: Union[Table, "QueryBuilder", AliasedQuery]) -> "Joiner":
        return self.join(item, JoinType.inner)

    def left_join(self, item: Union[Table, "QueryBuilder", AliasedQuery]) -> "Joiner":
        return self.join(item, JoinType.left)

    def right_join(self, item: Union[Table, "QueryBuilder", AliasedQuery]) -> "Joiner":
        return self.join(item, JoinType.right)

    def outer_join(self, item: Union[Table, "QueryBuilder", AliasedQuery]) -> "Joiner":
        return self.join(item, JoinType.outer)

    def cross_join(self, item: Union[Table, "QueryBuilder", AliasedQuery]) -> "Joiner":
        return self.join(item, JoinType.cross)

    def hash_join(self, item: Union[Table, "QueryBuilder", AliasedQuery]) -> "Joiner":
        return self.join(item, JoinType.hash)

使用 on 進行連接

當兩張表要連接的字段的名字相同、並且是等值連接,那么可以使用 using。但是這種情況還是不多的,最常見的情況是:兩個名字不同的字段進行等值連接,比如一張表的 uid 等於另一張表的 tid 等等。

from pypika import Query, Table


table1 = Table("t1")
table2 = Table("t2")

sql = Query.from_(table1).select(table2.age, table1.name).left_join(table2)\
    .on(table1.field("uid") == table2.field("tid"))
print(sql)  # SELECT "t2"."age","t1"."name" FROM "t1" LEFT JOIN "t2" ON "t1"."uid"="t2"."tid"

當然我們可以在 join 之前使用 where,先把不必要的數據過濾掉。

from pypika import Query, Table


table1 = Table("t1")
table2 = Table("t2")

sql = Query.from_(table1).select(table2.age, table1.name).left_join(table2).using("id")\
    .where(table1.age > 18)
print(sql)  # SELECT "t2"."age","t1"."name" FROM "t1" LEFT JOIN "t2" USING ("id") WHERE "t1"."age">18

嵌套子查詢

再來看看嵌套子查詢:

from pypika import Query, Table, functions as fn


table1 = Table("t1")
table2 = Table("t2")

sub_query = Query.from_(table1).select(fn.Avg(table2.age).as_("avg")).left_join(table2).using("id")\
    .where(table1.age > 18)
print(sub_query)  # SELECT AVG("t2"."age") "avg" FROM "t1" LEFT JOIN "t2" USING ("id") WHERE "t1"."age">18

# 子查詢完全可以當成一張表來操作
sql = Query.from_(table1).select("age", "name").where(table1.field("age") > Query.from_(sub_query).select("avg"))
print(sql)
"""
SELECT "age", "name"
FROM "t1"
WHERE "age" > (SELECT "sq0"."avg"
               FROM (SELECT AVG("t2"."age") "avg"
                     FROM "t1"
                              LEFT JOIN "t2" USING ("id")
                     WHERE "t1"."age" > 18) "sq0")
"""

集合運算

兩個結果集之間是可以合並的,比如 union 和 union all,至於 union distinct 它是 union 的同義詞,所以 pypika 沒有設置專門的函數。union 雖然可以用來合並多個結果集,但前提是它們要有相同的列。

from pypika import Query, Table

table1 = Table("t1")
table2 = Table("t2")

sql1 = Query.from_(table1).select("name", "age", "salary")
sql2 = Query.from_(table2).select("name", "age", "salary")

print(sql1.union(sql2))  # (SELECT "name","age","salary" FROM "t1") UNION (SELECT "name","age","salary" FROM "t2")
print(sql2.union(sql1))  # (SELECT "name","age","salary" FROM "t2") UNION (SELECT "name","age","salary" FROM "t1")

# union 可以使用 + 代替
print(str(sql1 + sql2) == str(sql1.union(sql2)))  # True
print(str(sql2 + sql1) == str(sql2.union(sql1)))  # True

# union_all 可以使用 * 代替
print(sql1.union_all(sql2))  # (SELECT "name","age","salary" FROM "t1") UNION ALL (SELECT "name","age","salary" FROM "t2")
print(sql2.union_all(sql1))  # (SELECT "name","age","salary" FROM "t2") UNION ALL (SELECT "name","age","salary" FROM "t1")
print(str(sql1 * sql2) == str(sql1.union_all(sql2)))  # True
print(str(sql2 * sql1) == str(sql2.union_all(sql1)))  # True

此外還有交集、差集、對稱差集。

from pypika import Query, Table

table1 = Table("t1")
table2 = Table("t2")

sql1 = Query.from_(table1).select("name", "age", "salary")
sql2 = Query.from_(table2).select("name", "age", "salary")

# 交集,沒有提供專門的操作符
print(sql1.intersect(sql2))  # (SELECT "name","age","salary" FROM "t1") INTERSECT (SELECT "name","age","salary" FROM "t2")
# 差集,可以使用減號替代
print(sql1.minus(sql2))  # (SELECT "name","age","salary" FROM "t1") MINUS (SELECT "name","age","salary" FROM "t2")
# 對稱差集,沒有提供專門的操作符
print(sql1.except_of(sql2))  # (SELECT "name","age","salary" FROM "t1") EXCEPT (SELECT "name","age","salary" FROM "t2")

Date, Time, and Intervals

非常簡單,直接看栗子即可:

from pypika import Query, Table, functions as fn, Interval

fruits = Table('fruits')

sql = Query.from_(fruits) \
    .select(fruits.id, fruits.name) \
    .where(fruits.harvest_date + Interval(months=1) < fn.Now())
print(sql)  # SELECT "id","name" FROM "fruits" WHERE "harvest_date"+INTERVAL '1 MONTH'<NOW()

多值比較

SQL 有一個非常有用的特性,假設一張表中有 year、month 這兩個字段,然后我想找出 year、month 組合起來之后大於 2020 年 7 月的記錄。比如:year = 2021、month = 2 這條記錄就是合法的,因為 year 是大於 2020 的;year = 2020、month = 8 也是合法的。

顯然這個時候就有些不好搞了,我們無法通過 year > 2020 and month > 7 這種形式,但是數據庫提供了多值比較:

select * from t where (year, month) > (2020, 7)

是不是很像元組呢?這樣會先比較 year,如果滿足 year > 2020、直接成立,year < 2020、直接不成立,后面就不用比了;如果 year = 2020,那么再比較 month。

from pypika import Query, Table, Tuple

table = Table("t")

sql = Query.from_(table).select(table.salary).where(Tuple(table.year, table.month) >= (2020, 7))
print(sql)  # SELECT "salary" FROM "t" WHERE ("year","month")>=(2020,7)

對於 in 字句也是同樣的道理:

from pypika import Query, Table, Tuple

table = Table("t")

sql = Query.from_(table).select(table.salary).where(
    Tuple(table.year, table.month
          ).isin([(2020, 7), (2020, 8), (2020, 9)]))
print(sql)  # SELECT "salary" FROM "t" WHERE ("year","month") IN ((2020,7),(2020,8),(2020,9))

字符串函數

字符串操作也是非常常用的,下面我們來看一下:

from pypika import Query, Table

table = Table("t")

sql = Query.from_(table).select(
    "name", "age", "salary"
).where(
    table.field("name").like("古明地%") &
    table.field("place").ilike("ja%")  # ilike 不區分大小寫
)
print(sql)  # SELECT "name","age","salary" FROM "t" WHERE "name" LIKE '古明地%' AND "place" ILIKE 'ja%'

正則也是可以的:

from pypika import Query, Table

table = Table("t")

sql = Query.from_(table).select(
    "name", "age", "salary"
).where(
    table.field("xxx").regex(r"^[abc][a-zA-Z]+&")
)
print(sql)  # SELECT "name","age","salary" FROM "t" WHERE "xxx" REGEX '^[abc][a-zA-Z]+&'

還有字符串之間的 concat、大小寫轉換等等:

from pypika import Query, Table, functions as fn

table = Table("t")

sql = Query.from_(table).select(
    fn.Concat(table.field("name"), "-", table.field("age")),
    fn.Upper(table.field("name")),
    fn.Lower(table.field("name")),
    fn.Length(table.field("name"))
)
print(sql)  # SELECT CONCAT("name",'-',"age"),UPPER("name"),LOWER("name"),LENGTH("name") FROM "t"

自定義函數

盡管 pypika 的 fn 模塊支持大部分的 SQL 函數,但還是有少部分沒有覆蓋到,這個時候我們自定義函數的實現,舉個例子:

from pypika import Table, Query, CustomFunction

table = Table("t")
# 我們自定義實現數據庫的 LENGTH 函數,該函數可以接收一個參數來查看長度
my_length = CustomFunction("LENGTH", ["col"])
# 我們自定義實現數據庫的 POWER 函數,該函數可以接收兩個參數來計算冪
my_power = CustomFunction("POWER", ["col1", "col2"])

sql = Query.from_(table).select(my_length(table.field("name")))
print(sql)  # SELECT LENGTH("name") FROM "t"

sql = Query.from_(table).select(my_power(table.field("num"), table.field("base")))
print(sql)  # SELECT POWER("num","base") FROM "t"
sql = Query.from_(table).select(my_power(table.field("num"), 3))
print(sql)  # SELECT POWER("num",3) FROM "t"

case when

然后看看 case when,SQL 層面上的就不說了,我們只看怎么用 pypika 實現。

from pypika import Table, Query, Case

table = Table("t")

sql = Query.from_(table).select(
    table.name,
    Case().when(table.age < 18, "未成年").when(table.age < 30, "成年")
        .when(table.age < 50, "中年").else_("老年").as_("age")
)
print(sql)
"""
SELECT "name",
CASE WHEN "age"<18 THEN '未成年' 
     WHEN "age"<30 THEN '成年' 
     WHEN "age"<50 THEN '中年' 
     ELSE '老年' END "age" 
FROM "t"
"""

with 語句

with 語句就是給子查詢指定一個名字,然后在其它地方可以直接使用該名字,就像訪問一張已存在的表一樣。

from pypika import Table, Query, AliasedQuery

table = Table("t")

sub_query = Query.from_(table).select("*")
sql = Query.with_(sub_query, "alias").from_(AliasedQuery("alias")).select("*")
print(sql)  # WITH alias AS (SELECT * FROM "t") SELECT * FROM alias

distinct

如果我們相對結果集進行去重的話,要怎么做呢?

from pypika import Query, Table

table = Table("t")
print(
    # 只需要在 select 之前調用一次 distinct 即可
    Query.from_(table).distinct().select(table.id, table.age)
)  # SELECT DISTINCT "id","age" FROM "t"

limit 與 offset

這兩個就沒有什么好說的了,直接看語法吧。

from pypika import Table, Query, functions as fn, Order

table = Table("t")
sql = Query.from_(table).select(fn.Count("id").as_("count"), "age", "length")\
    .where(table.field("age") > 18).groupby("age", "length").having(fn.Count("id") > 10)\
    .orderby("count", order=Order.desc).orderby("age", order=Order.asc).limit(10).offset(5)
print(sql)
"""
SELECT COUNT('id') "count", "age", "length"
FROM "t"
WHERE "age" > 18
GROUP BY "age", "length"
HAVING COUNT('id') > 10
ORDER BY "count" DESC, "age" ASC
LIMIT 10 OFFSET 5
"""

我們將多個字句合並了起來,算是整體總結一下吧。

插入數據

介紹完了查詢數據,下面看看插入數據要怎么實現。

from pypika import Table, Query

table = Table("t")
# 查詢是 Query.from_,插入數據是 Query.into
sql = Query.into(table).insert(1, "古明地覺", 16, "東方地靈殿")
print(sql)  # INSERT INTO "t" VALUES (1,'古明地覺',16,'東方地靈殿')

以上就是簡單的插入數據,但是有需要注意的地方,比如:None 值的處理。

from pypika import Table, Query

table = Table("t")

sql = Query.into(table).insert(1, "古明地覺", None, "東方地靈殿")
# 自動幫你換成了 NULL,如果是我們自己手動拼接的話,那么還需要額外處理一下
# 而 pypika 內部已經幫你做好了,如果不把 None 換成 NULL,交給數據庫執行是會報錯的
print(sql)  # INSERT INTO "t" VALUES (1,'古明地覺',NULL,'東方地靈殿')

# 假設第二個字段在數據庫中是一個 json
sql = Query.into(table).insert(1, {"name": "古明地覺", "age": 16, "where": None})
print(sql)  # INSERT INTO "t" VALUES (1,{'name': '古明地覺', 'age': 16, 'where': None})
# 我們看到這不是我們期望的結果,我們應該手動將值先變成 json 才行

import json
sql = Query.into(table).insert(1, json.dumps({"name": "古明地覺", "age": 16, "where": None}, ensure_ascii=False))
print(sql)  # INSERT INTO "t" VALUES (1,'{"name": "古明地覺", "age": 16, "where": null}')
# 以上就沒有問題了

上面是插入一條數據,如果是插入多條呢?有兩種方式:

from pypika import Table, Query

table = Table("t")

sql = Query.into(table).insert(1, "古明地覺", 16, "東方地靈殿")\
    .insert(2, "古明地戀", 15, "東方地靈殿")
print(sql)  # INSERT INTO "t" VALUES (1,'古明地覺',16,'東方地靈殿'),(2,'古明地戀',15,'東方地靈殿')

# 或者
sql = Query.into(table).insert((1, "古明地覺", 16, "東方地靈殿"), (2, "古明地戀", 15, "東方地靈殿"))
print(sql)  # INSERT INTO "t" VALUES (1,'古明地覺',16,'東方地靈殿'),(2,'古明地戀',15,'東方地靈殿')

指定字段插入

很多時候我們可以只指定部分字段的值:

from pypika import Table, Query, Field

table = Table("t")

sql = Query.into(table).columns(
    "id", table.field("name"), table.age, Field("place")
).insert(1, "古明地覺", 16, "東方地靈殿")
print(str(sql))  # INSERT INTO "t" ("id","name","age","place") VALUES (1,'古明地覺',16,'東方地靈殿')

將一張表的記錄插入到另一張表中

如果要將一張表的部分記錄插入到另一張表中要怎么辦呢?

from pypika import Table, Query, Field

table1 = Table("t1")
table2 = Table("t2")

sql = Query.into(table1).columns(
    "id", "name", "age").from_(table2).select("id", "name", "age").where(Field("age") > 18)
print(sql)  # INSERT INTO "t1" ("id","name","age") SELECT "id","name","age" FROM "t2" WHERE "age">18

將兩個表 join 之后的數據插入到新表中也是可以的:

from pypika import Table, Query, Field

table1 = Table("t1")
table2 = Table("t2")
table3 = Table("t3")

sql = Query.into(table1).columns("id", "name", "age").from_(
    table2).left_join(table3).on(table2.id == table3.id).select(table2.id, table2.name, table3.age)
print(sql)
"""
INSERT INTO "t1" ("id","name","age") 
SELECT "t2"."id","t2"."name","t3"."age" FROM "t2" LEFT JOIN "t3" ON "t2"."id"="t3"."id"
"""

更新數據

然后是更新數據:

from pypika import Table, Query

table = Table("t")
# 更新是 update
sql = Query.update(table).set(table.name, "古明地戀")
print(sql)  # UPDATE "t" SET "name"='古明地戀'
sql = Query.update(table).set(table.name, "古明地戀").where(table.id == 1)
print(sql)  # UPDATE "t" SET "name"='古明地戀' WHERE "id"=1

sql = Query.update(table).set(table.name, "古明地戀").set(table.age, 16)
print(sql)  # UPDATE "t" SET "name"='古明地戀',"age"=16

用另一張表的數據更新當前也是一種比較常見的操作,比如 t1 有 id、name 兩個字段,t2 有 id、name 兩個字段。如果 t1 的 id 在 t2 中存在,那么就用 t2 的 name 更新掉 t1 的 name。

from pypika import Table, Query

table1 = Table("t1")
table2 = Table("t2")

sql = Query.update(table1).join(table2).on(
    table1.uid == table2.tid
).set(table1.name, table2.name).where(table1.uid > 10)
print(sql)  # UPDATE "t1" JOIN "t2" ON "t1"."uid"="t2"."tid" SET "name"="t2"."name" WHERE "t1"."uid">10

最后,更新字段還有另一種方式:

from pypika import Table

table1 = Table("t1")
table2 = Table("t2")

# Query.update(table1) 也可以寫成 table1.update(),其它不變
sql = table1.update().join(table2).on(
    table1.uid == table2.tid
).set(table1.name, table2.name).where(table1.uid > 10)
# 可以看到打印的結果是一樣的
print(sql)  # UPDATE "t1" JOIN "t2" ON "t1"."uid"="t2"."tid" SET "name"="t2"."name" WHERE "t1"."uid">10

占位符

相當於先構建好相應的 SQL 語句,然后驅動在執行的時候會對數據進行替換,舉個栗子:

from pypika import Table, Query, Parameter

table = Table("t")
sql = Query.into(table).columns(
    "id", "name", "age").insert(Parameter(":1"), Parameter(":2"), Parameter(":3"))
print(sql)  # INSERT INTO "t" ("id","name","age") VALUES (:1,:2,:3)

# 我們看到 VALUES 后面只是相應的占位符,具體的值是多少需要驅動執行的時候傳遞
# 但是問題來了,不同的驅動的占位符是不同的。有的是 %s,有的是 $1、$2、$3 這種
# 所以需要根據驅動調整占位符,但是 SQLAlchemy 提供了一種通用的方式
from sqlalchemy import text, create_engine
sql = Query.into(table).columns(
    "id", "name", "age").insert(Parameter(":id"), Parameter(":name"), Parameter(":age"))
# 傳遞到 text 中
sql = text(str(sql))
print(sql)  # INSERT INTO "t" ("id","name","age") VALUES (:id,:name,:age)
engine = create_engine("")
engine.execute(sql, id=123, name="古明地覺", age=16)

一些高級用法

我們之前說不同數據庫的 SQL 語法會有略微不同,最大的一個不同就是包裹字段所用的符號,MySQL 用的是反引號、PostgreSQL 用的是雙引號。而 pypika 不知道要為你適配什么數據庫,所以默認用的是雙引號,如果想適配 MySQL 的話,那么我們應該告訴 pypika,我們要適配 MySQL。

至於做法也很簡單,我們之前不是用 Query 這個類進行的增刪改查嗎?現在不用它了。

from pypika import Table, MySQLQuery, PostgreSQLQuery

table = Table("t")
print(
    MySQLQuery.from_(table).select(table.id, table.age)
)  # SELECT `id`,`age` FROM `t`

print(
    PostgreSQLQuery.from_(table).select(table.id, table.age)
)  # SELECT "id","age" FROM "t"

我們看到此時就實現了數據庫的適配,除了 MySQL 和 PostgreSQL 之外,pypika 還可以適配其它的數據庫:

  • MSSQLQuery: 微軟的 SqlServer
  • OracleQuery: 甲骨文的 Oracle
  • VerticaQuery: 列式數據庫 Vertica

當然還支持窗口函數:

from pypika import Table, Query, analytics as an

table = Table("t")
sql = Query.from_(table).select(
    an.Sum("amount").over("month").as_("amount_sum"),
    an.Avg("amount").over("month").as_("amount_avg")
)
print(sql)
"""
SELECT SUM('amount') OVER (PARTITION BY month) "amount_sum",
       AVG('amount') OVER (PARTITION BY month) "amount_avg"
FROM "t"
"""

更詳細內容可以去 https://pypika.readthedocs.io/en/latest/3_advanced.html#window-frames 進行查看。

總的來說,pypika 還是很方便的,如果需要手動拼接 SQL 的話,不妨試試 pypika。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM