peewee常用功能速查
peewee 簡介
Peewee是一種簡單而小的ORM。它有很少的(但富有表現力的)概念,使它易於學習和直觀的使用。
常見orm數據庫框架
- Django ORM
- peewee
- SQLAlchemy
Django ORM
優點
:
易用,學習曲線短
和Django緊密集合,用Django時使用約定俗成的方法去操作數據庫
缺點
:
QuerySet速度不給力,會逼我用Mysqldb來操作原生sql語句。
Peewee
優點
:
Django式的API,使其易用
輕量實現,很容易和任意web框架集成
缺點
:
不支持自動化 schema 遷移
不能像Django那樣,使線上的mysql表結構生成結構化的模型。
SQLAlchemy
優點
:
巨牛逼的API,使得代碼有健壯性和適應性
靈活的設計,使得能輕松寫復雜查詢
缺點
:
工作單元概念不常見
重量級 API,導致長學習曲線
peewee 簡單demo
import datetime
from peewee import *
db = MySQLDatabase(
"test", host="127.0.0.1", port=3306, user="root", passwd="123456"
)
db.connect()
class BaseModel(Model):
class Meta:
database = db
class Person(BaseModel):
name = CharField()
age = IntegerField()
height = IntegerField()
sex = BooleanField(default='male')
if __name__ == "__main__":
Person.create_table()
# 創建
Person.create(name='tom', age=30, height=177)
# 查詢
res = Person.select().where(Person.name=='tom')
print(res)
print(res[0])
print(res[0].name)
print(res[0].age)
print(res[0].height)
print(res[0].sex)
>>>>
SELECT `t1`.`id`, `t1`.`name`, `t1`.`age`, `t1`.`High`, `t1`.`sex` FROM `person` AS `t1` WHERE (`t1`.`name` = 'ljk')
1
tom
30
177
True
Model 和 Field 關系
在ORM對象關系數據庫中 Model是一個類,映射到數據庫表中就是一個表。Filed是字段,映射到表中就是字段。model實例就是數據庫中的一條記錄。在peewee中Model和Field的關系如下:
Thing | 對應關系 |
---|---|
Model 類 | 表 |
Field 實例 | 表中字段 |
Model 實例 | 表中數據 |
數據庫連接和model類定義的典型使用
import datetime
from peewee import *
db = SqliteDatabase('my_app.db')
class BaseModel(Model):
class Meta:
database = db
class User(BaseModel):
username = CharField(unique=True)
class Tweet(BaseModel):
user = ForeignKeyField(User, backref='tweets')
message = TextField()
created_date = DateTimeField(default=datetime.datetime.now)
is_published = BooleanField(default=True)
- 創建一個數據庫實例
db = SqliteDatabase('my_app.db')
- 創建一個基礎model類
class BaseModel(Model):
class Meta:
database = db
定義一個用於建立數據庫連接的基模類是一種推薦的做法,因為將不必為后續表指定數據庫。
3.定義一個普通 model 類
class User(BaseModel):
username = CharField(unique=True)
模型定義使用的是其他流行的orm(如SQLAlchemy或Django)中看到的聲明式風格。因為User繼承了BaseModel 類,所以User類可以繼承數據庫連接。
User已經明確定義了一個具有唯一約束的用戶名列。因為我們沒有指定主鍵,peewee 會自動添加一個自增整數主鍵字段,名為 id。沒有指定主鍵的表peewee會自動創建一個名字為id的自增主鍵。
Model 模型
為了不污染model的命名空間,model的配置放在特殊的元屬性類中。這是從Django的框架中借鑒過來的。
contacts_db = SqliteDatabase('contacts.db')
class Person(Model):
name = CharField()
class Meta:
database = contacts_db
在簡單model示例中,你會注意到,我們創建了一個定義數據庫的BaseModel,然后擴展了它。這是定義數據庫和創建模型的首選方法。
你可以通過ModelClass._meta
來使用:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: type object 'Person' has no attribute 'Meta'
>>> Person._meta
<peewee.modeloptions object="" at="" 0x7f51a2f03790="">
ModelOptions
實現了幾個查看model metadata的方法:
{'id': <peewee.autofield object="" at="" 0x7f51a2e92750="">,
'name': <peewee.charfield object="" at="" 0x7f51a2f0a510="">}
>>> Person._meta.primary_key
<peewee.autofield object="" at="" 0x7f51a2e92750="">
>>> Person._meta.database
<peewee.sqlitedatabase object="" at="" 0x7f519bff6dd0="">
Model 在ORM數據中就是一張表,那么表的屬性可以有如下選項。它們是被定義在Meta中元數據。
Option | Meaning | 是否可繼承? |
---|---|---|
database | 指定表創建依附的數據庫 | yes |
table_name | 表名 | no |
table_function | 生成表名的函數 | yes |
indexes | 多行索引 | yes |
primary_key | 主鍵 | yes |
constraints | 表約束的列表 | yes |
schema | 模型的數據庫架構 | yes |
only_save_dirty | 調用model.save()時,僅保存臟字段,指定字段? | yes |
options | 創建表擴展的選項字典 | yes |
table_settings | 在右括號后設置字符串的列表 | yes |
temporary | 指示臨時表 | yes |
legacy_table_names | 使用舊表名生成(默認情況下啟用) | yes |
depends_on | 指示此表依賴於另一個表進行創建 | no |
without_rowid | 指示表不應具有rowid(僅限SQLite) | no |
strict_tables | 指示嚴格的數據類型(僅限SQLite,3.37+) | yes |
Filed 字段
Field類是用來將Model屬性映射到數據庫列。每個字段類型都有一個相應的SQL存儲類,將python數據類型轉化為基本的存儲類型。
當創建Model類時,fields被定義成類的屬性。它看起來和django的數據庫框架很類似。
class User(Model):
username = CharField()
join_date = DateTimeField()
about_me = TextField()
在上面的例子中,因為沒有field有主鍵屬性primary_key=True,所以會創建一個名字是id的自增主鍵。
peewee中可用的字段包括:
字段類型 | Sqlite | Postgresql | MySQL |
---|---|---|---|
AutoField | integer | serial | integer |
BigAutoField | integer | bigserial | bigint |
IntegerField | integer | integer | integer |
BigIntegerField | integer | bigint | bigint |
SmallIntegerField | integer | smallint | smallint |
IdentityField | not supported | int identity | not supported |
FloatField | real | real | real |
DoubleField | real | double precision | double precision |
DecimalField | decimal | numeric | numeric |
CharField | varchar | varchar | varchar |
FixedCharField | char | char | char |
TextField | text | text | text |
BlobField | blob | bytea | blob |
BitField | integer | bigint | bigint |
BigBitField | blob | bytea | blob |
UUIDField | text | uuid | varchar(40) |
BinaryUUIDField | blob | bytea | varbinary(16) |
DateTimeField | datetime | timestamp | datetime |
DateField | date | date | date |
TimeField | time | time | time |
TimestampField | integer | integer | integer |
IPField | integer | bigint | bigint |
BooleanField | integer | boolean | bool |
BareField | untyped | not supported | not supported |
ForeignKeyField | integer | integer | integer |
字段初始化參數
所有字段類型接受的參數及其默認值
- null = False 允許空值
- index = False 創建索引
- unique = False 創建唯一索引
- column_name = None 顯式指定數據庫中的列名
- default = None 默認值,可以使任意值或可調用對象
- primary_key = False 指明主鍵
- constraints = None 約束條件
- sequence = None 序列名字(如果數據庫支持)
- collation = None 排序字段
- unindexed = False 虛表上的字段不應該被索引
- choices = None 兩種可選項:value display
- help_text = None 幫助說明字段。表示此字段的任何有用文本的字符串
- verbose_name = None 表示此字段的用戶友好名稱的字符串
- index_type = None 索引類型
字段特有參數
在一些字段中有些自己特有的參數,如下:
字段類型 | 特有參數 |
---|---|
CharField | max_length |
FixedCharField | max_length |
DateTimeField | formats |
DateField | formats |
TimeField | formats |
TimestampField | resolution, utc |
DecimalField | max_digits, decimal_places, auto_round, rounding |
ForeignKeyField | model, field, backref, on_delete, on_update, deferrable lazy_load |
BareField | adapt |
字段默認參數
peewee可以為每一個字段提供默認值,比如給intergerField 默認值0而不是NULL。你可以申明字段時指定默認值:
class Message(Model):
context = TextField()
read_count = IntegerField(default=0)
在某些情況下,默認值是動態的會更有意義。一個可能的場景就是當前時間。Peewee 允許您在這些情況下指定一個函數,該函數的返回值將在創建對象時使用。注意,使用時只提供了函數,並不需要實際調用它。
class Message(Model):
context = TextField()
timestamp = DateTimeField(default=datetime.datetime.now)
如果你正在使用一個接受可變類型(list, dict等)的字段,並想提供一個默認值。將默認值包裝在一個簡單的函數中是個好主意,這樣,多個模型實例就不會共享對同一底層對象的引用。
def house_defaults():
return {'beds': 0, 'baths': 0}
class House(Model):
number = TextField()
street = TextField()
attributes = JSONField(default=house_defaults)
索引
peewee可以通過單列索引和多列索引。可選地包括UNIQUE約束。Peewee還支持對模型和字段的用戶定義約束。
單列索引
單列索引使用字段初始化參數定義。下面的示例在用戶名字段上添加一個惟一索引,在電子郵件字段上添加一個普通索引
class User(Model):
username = CharField(unique=True)
email = CharField(index=True)
在列上添加用戶定義的約束。你可以使用constraints參數。例如,您可能希望指定一個默認值,或者添加一個CHECK約束
class Product(Model):
name = CharField(unique=True)
price = DecimalField(constraints=[Check('price < 10000')])
created = DateTimeField(
constraints=[SQL("DEFAULT (datetime('now'))")])
多列索引
可以使用嵌套元組將多列索引定義為元屬性。每個表的索引是一個2元組,第一部分是索引字段名稱的元組,可以有多個字段,第二部分是一個布爾值,指示索引是否應該唯一。
class Transaction(Model):
from_acct = CharField()
to_acct = CharField()
amount = DecimalField()
date = DateTimeField()
class Meta:
indexes = (
# create a unique on from/to/date
(('from_acct', 'to_acct', 'date'), True),
# create a non-unique on from/to
(('from_acct', 'to_acct'), False),
)
記住,如果索引元組只包含一項,則添加末尾逗號
基本操作 增刪改查
peewee中關於增刪改查的基本操作方法如下:
增
:
create():最常用創建,返回創建實例
save():第一次執行的save是插入,第二次是修改
insert: 插入數據,不創建數據庫實例。返回id
insert_many: 批量插入
bulk_create:批量插入,類似於insert_many。可指定單次插入的數量
batch_commit: 自動添加了一個事務,然后一條條的插入
insert_from: 從另一個表中查詢的數據作為插入的數據
刪除
:
delete().where().execute()
delete_instance() 直接執行刪除了,不用調用execute() 方法
修改
:
save(): 第一次執行的save是插入,第二次是修改
update() 用於多字段更新
查詢
:
Model.get(): 檢索與給定查詢匹配的單個實例。報 Model.DoesNotExist 異常。如果有多條記錄滿足條件,則返回第一條
get_or_none() :與get使用方法相同。區別是找不到結果時不會報錯
get_by_id() :通過主鍵查找,是一種快捷方式
Model['id_num']: 和上面的get_by_id一樣是通過主鍵查找。
get_or_create(): 首先查詢,如果查不到將創建一個新的記錄
select() 查詢多條數據
創建
單條插入
你可以用Model.create()
創建一個新的實例。這個方法接收關鍵字參數,參數要和表定義的字段一致。返回值是新的實例
>>> User.create(username='Charlie')
<__main__.User object at 0x2529350>
批量插入
有幾種方法可以快速加載大量數據,缺乏經驗的做法是在循環中調用Model.create來創建
data_source = [
{'field1': 'val1-1', 'field2': 'val1-2'},
{'field1': 'val2-1', 'field2': 'val2-2'},
# ...
]
for data_dict in data_source:
MyModel.create(**data_dict)
上面的方法比較慢的原因有幾個:
- 如果沒有在事務中裝飾循環,那么每個對create()的調用都發生在它自己的事務中。這將會非常緩慢
- 必須生成每個InsertQuery並將其解析為SQL
- 需要原生SQL語句傳入到數據庫中解析
- 檢索最后一個insert id,這在某些情況下會導致執行額外的查詢
可以通過一個簡單的裝飾:atomic
來大幅度提高速度
# This is much faster.
with db.atomic():
for data_dict in data_source:
MyModel.create(**data_dict)
上面的代碼仍然沒有解決2、3、4這三點。我們可以通過 insert_many
帶來一個大的速度提升。這個方法接收多列元組或字典,然后在一次SQL語句中插入多行數據。
data_source = [
{'field1': 'val1-1', 'field2': 'val1-2'},
{'field1': 'val2-1', 'field2': 'val2-2'},
# ...
]
# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()
insert_many()
方法還接收多行元組,同時需要提供一個對應的字段。
# We can INSERT tuples as well...
data = [('val1-1', 'val1-2'),
('val2-1', 'val2-2'),
('val3-1', 'val3-2')]
# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()
在裝飾中批量插入是一個好的方法。
# You can, of course, wrap this in a transaction as well:
with db.atomic():
MyModel.insert_many(data, fields=fields).execute()
插入大量數據
在大量數據的插入場景下,根據數據源中的行數,您可能需要將其分解為多個塊。SQLite通常有999或32766的限制
您可以編寫一個循環來將數據批處理成塊(在這種情況下,強烈建議您使用事務)
# Insert rows 100 at a time.
with db.atomic():
for idx in range(0, len(data_source), 100):
MyModel.insert_many(data_source[idx:idx+100]).execute()
peewwee提供了一個chunked函數幫助你高效的將普通可迭代對象拆分成為可批處理對象。
from peewee import chunked
# Insert rows 100 at a time.
with db.atomic():
for batch in chunked(data_source, 100):
MyModel.insert_many(batch).execute()
Model.bulk_create()
的行為有點像insert_many(),但是可以用來插入沒有保存的數據庫實例,並且可以指定每次插入的數量。如一共插入345,如果指定了一次插入100條記錄,那么就是4次插入,3 * 100 + 1 * 45
什么叫沒有保存的數據庫實例呢?就是類似於User(username='kk')
,創建的數據庫實例。
# Read list of usernames from a file, for example.
with open('user_list.txt') as fh:
# Create a list of unsaved User instances.
users = [User(username=line.strip()) for line in fh.readlines()]
# Wrap the operation in a transaction and batch INSERT the users
# 100 at a time.
with db.atomic():
User.bulk_create(users, batch_size=100)
bulk_update()
和bulk_create
類似,可以用來插入沒有保存的數據庫實例,自動添加了一個事務,然后一條條的插入
# List of row data to insert.
row_data = [{'username': 'u1'}, {'username': 'u2'}, ...]
# Assume there are 789 items in row_data. The following code will result in
# 8 total transactions (7x100 rows + 1x89 rows).
for row in db.batch_commit(row_data, 100):
User.create(**row)
從另一個表批量裝載
Model.insert_from()
如果要批量插入的數據存儲在另一個表中,還可以創建源為SELECT查詢的INSERT查詢。
res = (TweetArchive
.insert_from(
Tweet.select(Tweet.user, Tweet.message),
fields=[TweetArchive.user, TweetArchive.message])
.execute())
刪除
要刪除單個模型實例,可以使用model.delete_instance()快捷方式。delete_instance()將刪除給定的模型實例,並且可以選擇遞歸地刪除任何依賴對象(通過指定recursive=True)。
刪除一個記錄:Model.delete_instance()
刪除任意記錄:Model.delete()
更新
save()
:單個更新
一旦模型實例有了主鍵,隨后對save()的任何調用都將導致一個UPDATE而不是另一個INSERT。模型的主鍵不會改變
>>> user.save() # save() returns the number of rows modified.
1
>>> user.id
1
>>> user.save()
>>> user.id
1
>>> huey.save()
1
>>> huey.id
2
update
:批量更新
接受關鍵字參數,其中鍵對應於模型的字段名稱
>>> today = datetime.today()
>>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today)
>>> query.execute() # Returns the number of rows that were updated.
4
查詢
單條記錄查詢
你可以通過Model.get()方法查詢到給條件的數據。如果是通過主鍵查找,也可以用一個快捷方法 Model.get_by_id()。
此方法是使用給定查詢調用Model.select()的快捷方式,但將結果集限制為一行。需要注意的是使用get()方法,如果沒有找到匹配的數據會拋出錯誤:DoesNotExist
get
>>> User.get(User.id == 1)
<__main__.User object at 0x25294d0>
>>> User.get_by_id(1) # Same as above.
<__main__.User object at 0x252df10>
>>> User[1] # Also same as above.
<__main__.User object at 0x252dd10>
>>> User.get(User.id == 1).username
u'Charlie'
>>> User.get(User.username == 'Charlie')
<__main__.User object at 0x2529410>
>>> User.get(User.username == 'nobody')
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."username" = ?
PARAMS: ['nobody']
單條記錄查詢方法:
- Model.get()
- Model.get_by_id()
- Model.get_or_none() - if no matching row is found, return None.
- Model.select()
- SelectBase.get()
- SelectBase.first() - return first record of result-set or None.
查詢或創建
Model.get_or_create() 它首先嘗試檢索匹配的行。如果失敗,將創建一個新行。
通常,可以依賴唯一約束或主鍵來防止創建重復對象。例如,假設我們希望使用示例用戶模型實現注冊新用戶帳戶。用戶模型對用戶名字段有唯一的約束,因此我們將依賴數據庫的完整性保證,以確保不會出現重復的用戶名:
try:
with db.atomic():
return User.create(username=username)
except peewee.IntegrityError:
# `username` is a unique column, so this username already exists,
# making it safe to call .get().
return User.get(User.username == username)
上面的例子首先嘗試創建,然后回退到查詢,依靠數據庫來強制執行唯一約束。
如果您希望首先嘗試檢索記錄,可以使用get_or_create()。該函數返回一個2元組,其中包含實例和一個布爾值,該值指示對象是否被創建。
user, created = User.get_or_create(username=username)
person, created = Person.get_or_create(
first_name=first_name,
last_name=last_name,
defaults={'dob': dob, 'favorite_color': 'green'})
查詢多行記錄
可以通過Model.select()獲取多行數據。peewee允許你迭代這些數據,同時也可以索引和切片。
>>> query = User.select()
>>> [user.username for user in query]
['Charlie', 'Huey', 'Peewee']
>>> query[1]
<__main__.User at 0x7f83e80f5550>
>>> query[1].username
'Huey'
>>> query[:2]
[<__main__.User at 0x7f83e80f53a8>, <__main__.User at 0x7f83e80f5550>]
select()是很智能的,在查詢一次的前提下可以多次迭代,切片,下標取值等。
在緩存結果時,同一查詢的后續迭代不會命中數據庫。要禁用此行為(以減少內存使用),請在迭代時調用Select.iterator()。
除了返回模型實例外,Select查詢還可以返回字典、元組和命名元組。根據您的用例,您可能會發現將行作為字典使用更容易
>>> query = User.select().dicts()
>>> for row in query:
... print(row)
{'id': 1, 'username': 'Charlie'}
{'id': 2, 'username': 'Huey'}
{'id': 3, 'username': 'Peewee'}
iterator()
:不緩存查詢結果
默認情況下,peewee將緩存迭代Select查詢時返回的行。這是一種優化,允許多次迭代以及索引和切片,而不會導致額外的查詢。但是,當您計划在大量行上進行迭代時,這種緩存可能會有問題。
為了減少內存的消耗,使用iterator()方法。這個方法允許返回結果不緩存數據。使用更少的內存。
stats = Stat.select()
# Our imaginary serializer class
serializer = CSVSerializer()
# Loop over all the stats and serialize.
for stat in stats.iterator():
serializer.serialize_object(stat)
對於簡單的查詢,您可以通過將行作為字典返回來進一步提高速度。namedtuples或元組。以下方法可用於任何Select查詢,以更改結果行類型。
dicts()
namedtuples()
tuples()
objects
: 將多個查詢表放在一個實例中
當對包含多個表中的列的大量行進行迭代時,peewee將為返回的每一行構建查詢模型。對於復雜查詢,此操作可能很慢。例如,如果我們選擇一個tweet列表以及tweet作者的用戶名和頭像,Peewee必須為每一行創建兩個對象(tweet和用戶)。除了上述行類型之外,還有第四個方法objects(),它將作為模型實例返回行,但不會分解模型查詢。
query = (Tweet
.select(Tweet, User) # Select tweet and user data.
.join(User))
# Note that the user columns are stored in a separate User instance
# accessible at tweet.user:
for tweet in query:
print(tweet.user.username, tweet.content)
# Using ".objects()" will not create the tweet.user object and assigns all
# user attributes to the tweet instance:
for tweet in query.objects():
print(tweet.username, tweet.content)
為了獲得最佳性能,您可以執行查詢,然后使用底層數據庫游標對結果進行迭代。
Database.execute()。接受查詢對象,執行查詢,並返回DB-API 2.0游標對象。光標將返回原始行元組:
query = Tweet.select(Tweet.content, User.username).join(User)
cursor = database.execute(query)
for (content, username) in cursor:
print(username, '->', content)
事務
數據庫事務
(Transaction)是一種機制,包含了一組數據庫操作命令
事務把所有的命令作為一個整體一起向系統提交或撤銷操作請求,即這一組數據庫命令要么都執行,要么都不執行,因此事務是一個不可分割的工作邏輯單元。
事務具有 4 個特性,即原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability),這 4 個特性通常簡稱為 ACID。
peewee事務
Peewee實現事務的方法是Database.atomic()
方法,非常簡單
當事務執行成功之后,它會自動commit(),不需要我們手動調。當事務的代碼塊中拋出異常時,它會自動調用rollback(),將數據庫狀態恢復到操作之前,保證要么命令全部執行,要么全部不執行。
Peewee中實現事務有兩種使用方式,一種是將atomic當做Context manager使用,另外一種將atomic當修飾器使用。
Context manager
with db.atomic():
for data_dict in data_source:
MyModel.create(**data_dict)
裝飾器
@db.atomic()
def insert_data()
for data_dict in data_source:
MyModel.create(**data_dict)
事務其他特性:
- 除了自動commit()和rollback()之外,也可以手動調用commit()和rollback()方法
- 事務支持嵌套使用
- 在一個事務中對數據庫操作能夠有效減少事務的耗時,增加操作效率
過濾
您可以使用普通的python操作符過濾特定的記錄。
>>> user = User.get(User.username == 'Charlie')
>>> for tweet in Tweet.select().where(Tweet.user == user, Tweet.is_published == True):
... print(tweet.user.username, '->', tweet.message)
...
Charlie -> hello world
Charlie -> this is fun
>>> for tweet in Tweet.select().where(Tweet.created_date < datetime.datetime(2011, 1, 1)):
... print(tweet.message, tweet.created_date)
...
Really old tweet 2010-01-01 00:00:00
...
print(tweet.message)
hello world
this is fun
look at this picture of my food
記錄分類
給返回的數據排序,可以使用order_by
1.普通使用
>>> for t in Tweet.select().order_by(Tweet.created_date):
... print(t.pub_date)
2.倒序排列
可以使用desc或者-
號
Tweet.select().order_by(Tweet.created_date.desc())
Tweet.select().order_by(-Tweet.created_date) # Note the "-" prefix.
3.正序排列
User.select().order_by(+User.username)
4.高級使用
對計算值進行排序時,可以包括必要的SQL表達式,也可以引用指定給該值的別名。
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username))
您可以使用select子句中使用的相同計數表達式進行訂購。在下面的示例中,我們按tweet ID的COUNT()降序排序:
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(fn.COUNT(Tweet.id).desc()))
或者,可以在select子句中引用指定給計算值的別名。這種方法的優點是易於閱讀。請注意,我們不是直接引用命名別名,而是使用SQL幫助程序對其進行包裝:
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(SQL('num_tweets').desc()))
同樣,也可以使用如上
ntweets = fn.COUNT(Tweet.id)
query = (User
.select(User.username, ntweets.alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(ntweets.desc())
計數
可以使用count來計算返回數量
>>> Tweet.select().count()
100
>>> Tweet.select().where(Tweet.id > 50).count()
50
分頁
paginate()
方法可以很簡單的獲取一個分頁的數據。paginate有兩個參數:page_number 和 items_per_page。第一個參數是取回數據的頁數;第二個參數是每一頁多少元素。這兩個參數加起來才能完成分頁
>>> for tweet in Tweet.select().order_by(Tweet.id).paginate(2, 10):
... print(tweet.message)
...
tweet 10
tweet 11
tweet 12
tweet 13
tweet 14
tweet 15
tweet 16
tweet 17
tweet 18
tweet 19
分頁的功能也可以用limit()
和offset()
來實現
Tweet.select().order_by(Tweet.id).offset(10).limit(10)
offset(10) 跳過10個記錄
limit(10) 取10個記錄
聚合查詢
聚合查詢:對查詢出來的結果進一步處理,包括統計,分組,求最大值,求平均值等。
聚合常用的函數:
COUNT:計算表中的記錄數(行數)
SUM:計算表中數值列中數據的合計值
AVG:計算表中數值列中數據的平均值
MAX:求出表中任意列中數據的最大值
MIN:求出表中任意列中數據的最小值
用於匯總的函數稱為聚合函數或者聚集函數。所謂聚合,就是將多行匯總為一行。實際上,所有的聚合函數都是這樣,輸入多行輸出一行。
聚合函數的使用:
mysql> select * from person;
+----+------+-----+------+-----+
| id | name | age | High | sex |
+----+------+-----+------+-----+
| 1 | ljk | 30 | 177 | 1 |
| 2 | aghj | 23 | 168 | 1 |
+----+------+-----+------+-----+
2 rows in set (0.00 sec)
************************************
* 聚合函數 *
************************************
mysql> select count(*) from person;
+----------+
| count(*) |
+----------+
| 2 |
+----------+
1 row in set (0.00 sec)
----------------------------------------
mysql> select sum(age) from person;
+----------+
| sum(age) |
+----------+
| 53 |
+----------+
1 row in set (0.00 sec)
----------------------------------------
mysql> select avg(high) from person;
+-----------+
| avg(high) |
+-----------+
| 172.5000 |
+-----------+
1 row in set (0.00 sec)
----------------------------------------
mysql> select max(high) from person;
+-----------+
| max(high) |
+-----------+
| 177 |
+-----------+
1 row in set (0.00 sec)
mysql> select * from person;
+----+------+-----+------+-----+
| id | name | age | High | sex |
+----+------+-----+------+-----+
| 1 | ljk | 30 | 177 | 1 |
| 2 | aghj | 23 | 168 | 1 |
| 3 | 0 | 22 | 165 | 0 |
+----+------+-----+------+-----+
3 rows in set (0.00 sec)
mysql> select avg(High) from person group by sex;
+-----------+
| avg(High) |
+-----------+
| 172.5000 |
| 165.0000 |
+-----------+
2 rows in set (0.00 sec)
# 使用having對分組的數據篩選
mysql> select avg(High) as high from person group by sex having high > 170;
+----------+
| high |
+----------+
| 172.5000 |
+----------+
1 row in set (0.00 sec)
where
:分組之前篩選數據
where 子句的作用是在對查詢結果進行分組前,將不符合where條件的行去掉,即在分組之前過濾數據,where條件中不能包含聚組函數,使用where條件過濾出特定的行。
having
: 對分組之后篩選分組的數據
having 子句的作用是篩選滿足條件的組,即在分組之后過濾數據,條件中經常包含聚組函數,使用having 條件過濾出特定的組,也可以使用多個分組標准進行分組。
總結一下過濾的順序
on->join->where->group by->having
分組
查詢用戶以及每個人擁有的tweet賬號數量。這里使用了group_by,將結果根據User表分類。
query = (User
.select(User, fn.Count(Tweet.id).alias('count'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User))
假設有如下數據庫,一個多對多的關系。
class Photo(Model):
image = CharField()
class Tag(Model):
name = CharField()
class PhotoTag(Model):
photo = ForeignKeyField(Photo)
tag = ForeignKeyField(Tag)
查詢Tag記錄,按照Tag分組,篩選出每組Tag里Photo數量超過5個的記錄。
query = (Tag
.select()
.join(PhotoTag)
.join(Photo)
.group_by(Tag)
.having(fn.Count(Photo.id) > 5))
HAVING 子句可以讓我們篩選分組后的各組數據。
HAVING,它與 GROUP BY 配合使用,為聚合操作指定條件。
WHERE 子句只能指定行的條件,而不能指定組的條件。所以當數據分組之后就需要 HAVING 對分組的數據篩選。
具體區別:
- where 用在group_by前,having用在group_by之后。
- 聚合函數(avg、sum、max、min、count),不能作為條件放在where之后,但可以放在having之后
Scalar
對查詢出來的數據做處理
可以通過調用Query.scalar()來檢索標量值。例如
>>> Employee.select(
... fn.Min(Employee.salary), fn.Max(Employee.salary)
... ).scalar(as_tuple=True)
(30000, 50000)
您可以通過傳遞來檢索多個標量值
>>> Employee.select(
... fn.Min(Employee.salary), fn.Max(Employee.salary)
... ).scalar(as_tuple=True)
(30000, 50000)
窗口
窗口函數是指對作為SELECT查詢一部分處理的數據滑動窗口進行操作的聚合函數。窗口功能可以執行以下操作:
對結果集的子集執行聚合。
計算一個運行總數。
排名結果。
將行值與前面(或后面!)行中的值進行比較。
peewee支持SQL窗口函數,可以通過調用Function.over()並傳入分區或排序參數來創建這些函數。
復雜篩選
peewee支持以下類型的比較
查詢中支持的篩選運算符
Comparison | Meaning |
---|---|
== | x equals y |
< | x is less than y |
<= | x is less than or equal to y |
> | x is greater than y |
>= | x is greater than or equal to y |
!= | x is not equal to y |
<< | x IN y, where y is a list or query |
>> | x IS y, where y is None/NULL |
% | x LIKE y where y may contain wildcards |
** | x ILIKE y where y may contain wildcards |
^ | x XOR y |
~ | Unary negation (e.g., NOT x) |
篩選方法
因為用完了要重寫的操作符,所以有一些額外的查詢操作可以作為方法使用
Method | Meaning |
---|---|
.in_(value) | 查詢在范圍內 |
.not_in(value) | 查詢不在范圍內 |
.is_null(is_null) | 為空或不為空。接受布爾參數 |
.contains(substr) | 通配符搜索子字符串 |
.startswith(prefix) | 查詢以prefix開頭的數據 |
.endswith(suffix) | 查詢以prefix結尾的數據 |
.between(low, high) | 查詢在low和high中間的值 |
.regexp(exp) | 正則表達式匹配匹配的數據,貪婪模式 |
.iregexp(exp) | 正則表達式匹配匹配的數據,非貪婪模式 |
.bin_and(value) | 二進制加 |
.bin_or(value) | 二進制或 |
.concat(other) | Concatenate two strings or objects using ||. |
.distinct() | 標記重復的數據 |
.collate(collation) | 指定具有給定排序規則的列 |
.cast(type) | 將列的值強制轉換為給定類型 |
聯合查詢邏輯操作
使用邏輯操作的聯合查詢
Operator | Meaning | Example |
---|---|---|
& | AND | (User.is_active == True) & (User.is_admin == True) |
| | OR | (User.is_admin) | (User.is_superuser) |
~ | NOT (unary negation) | ~(User.username.contains('admin')) |
# Find the user whose username is "charlie".
User.select().where(User.username == 'charlie')
# Find the users whose username is in [charlie, huey, mickey]
User.select().where(User.username.in_(['charlie', 'huey', 'mickey']))
Employee.select().where(Employee.salary.between(50000, 60000))
Employee.select().where(Employee.name.startswith('C'))
Blog.select().where(Blog.title.contains(search_string))
請注意,實際的比較用括號括起來。 Python 的運算符優先級要求將比較括在括號中。
# Find any users who are active administrations.
User.select().where(
(User.is_admin == True) &
(User.is_active == True))
可能你嘗試使用python語法中的in and or 和not操作,但是在查詢中是不生效的。所有的操作返回都是一個布爾值。
建議如下:
- 用
.in_()
和.not_in()
替換 in和not in
- 用&替換and
- 用|替換or
- 用~替換not
- 用.is_null()替換 is None 或 == None
SQL 方法
SQL方法,如like
,sum
等,可以通過fn
來表達
從peewee中導入fn:from peewee import fn
query = (User
.select(User, fn.COUNT(Tweet.id).alias('tweet_count'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User)
.order_by(fn.COUNT(Tweet.id).desc()))
for user in query:
print('%s -- %s tweets' % (user.username, user.tweet_count))
fn可以表達任何SQL方法,它的參數可以是字段,值,子查詢甚至嵌套函數
基礎使用
- fn.AVG() 返回指定列的平均值,NULL值不包括在計算中。
- fn.SUM() 返回指定列的數目,NULL值不包括在計算中。
- fn.MIN() 返回指定列的最小值,NULL值不包括在計算中。
- fn.MAX() 返回指定列的最大值,NULL值不包括在計算中。
- fn.DATE() 返回指定日期時間格式列的日期格式
- fn.DECIMAL(10, 2) ===> decimal(10,2)中的“2”表示小數部分的位數
進階使用
- fn.to_char() 返回指定列格式化后的字符串 e.g.: fn.to_char(18.88, '99.999') ===> 18.888; fn.to_char(model.field, '')。
- fn.char_length(str) 返回字符串字符數
- fn.array_agg() 接受一組值並返回一個數組。
- fn.array_agg(model.name).order_by(model.id.asc()) # array_agg(name order by id asc)
- fn.rank().over(partition_by=[field1, field2, or aggregation_field1], order_by=[fn.SUM(Booking.slots).desc()]) 實現rank() over(partition by filed order by filed)分區功能。
- fn.length() 返回指定列的長度。也可應用於order_by。e.g.: .order_by(fn.length(model.field).asc())。
- fn.CONCAT() 返回合並的字符串(CONCAT一定要大寫,小寫的concat用法不一樣)。fn.CONCAT(model.id, '-', model.name) ===> '188-張三'
SQL helper
有時,您可能想在sql中傳一些任意的sql語句。您可以使用特殊的SQL類來實現這一點
# We'll query the user table and annotate it with a count of tweets for
# the given user
query = (User
.select(User, fn.Count(Tweet.id).alias('ct'))
.join(Tweet)
.group_by(User))
# Now we will order by the count, which was aliased to "ct"
query = query.order_by(SQL('ct'))
# You could, of course, also write this as:
query = query.order_by(fn.COUNT(Tweet.id))
使用peewee執行手工SQL語句有兩種方法
- Database.execute_sql() 用於執行任何類型的查詢
- RawQuery 執行SELECT查詢並返回模型實例
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data)
query.execute_sql()
安全和SQL注入
默認情況下,peewee將參數化查詢,因此用戶傳入的任何參數都將被轉義。
請確保將任何用戶定義的數據作為查詢參數傳入,而不是作為實際SQL查詢的一部分傳入:
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s' % (user_data,))
# Good. `user_data` will be treated as a parameter to the query.
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data)
# Bad! DO NOT DO THIS!
query = MyModel.select().where(SQL('Some SQL expression %s' % user_data))
# Good. `user_data` will be treated as a parameter.
query = MyModel.select().where(SQL('Some SQL expression %s', user_data))
MySQL和Postgresql使用“%s”表示參數。另一方面,SQLite使用“?”。請確保使用適合數據庫的字符。還可以通過檢查Database.param來查找此參數。
小結
個人水平問題翻譯並不是很准確,由於方法太多使用也未給出適當例子。后面有時間挑增刪改查等功能寫詳細操作。
</peewee.sqlitedatabase></peewee.autofield></peewee.charfield></peewee.autofield></peewee.modeloptions>