使用 PEEWEE 斷斷續續的差不多已經三個年頭了,但是沒有像這次使用這么多的特性和功能,所以這次一並記錄一下,需要注意的地方和一些使用細節,之后使用起來可能會更方便。
因為是使用的 SQLAchedemy 的引擎,所以增刪改查的語法也很像。
查找方法
cls 這里默認指類對象了
查找單個會使用:
cls.get_one(cls.user_id == user_id)
查找批量可以使用:
cls.select().where(cls.user_id == user_id)
批量查找會返回一個數組,可以使用 for in 語句迭代他們。另外需要注意的一點是,如果需要指定查詢的內容可以使用:
cls.select(cls.user_id, cls.id).where(cls.user_id == user_id)
里面也可以使用一些聚合函數例如 count sum 之類的
cls.select(cls.user_id, fn.COUNT(cls.id).alias('hahah')).where(xxxxx)
添加數據方法
User.create(username='Charlie')
類似使用這種語法,我們可以構造一個key: value 的字典,最后使用
User.create(**data)
一並寫入即可。
注意,使用這種方法的返回值是有幾條結果受到了影響。如果我們像知道添加了之后的主鍵 id 可以使用:
User.insert(username='Mickey').execute()
這種插入方法會返回 主鍵值。一般我們可以認為是 id 值。
批量插入方法
批量插入提供了很多種方法:
data_source = [ {'field1': 'val1-1', 'field2': 'val1-2'}, {'field1': 'val2-1', 'field2': 'val2-2'}, # ... ] for data_dict in data_source: MyModel.create(**data_dict)
寫到一個數組中,然后遍歷一個一個插。。。親測這是最慢的,如果另外幾種行得通最好不要用。
with db.atomic(): for data_dict in data_source: MyModel.create(**data_dict)
修改剛才那種方法的提交模式,讓他們在同一個事務里面提交以此來提高速度。這次我的場景其實對插入速度要求很高,嘗試了這種方法,但是我每次插入500條雖然速度是提高了不少,但是當我寫到2000條的時候就報了一個事務的錯誤,在網上搜了一下也無果。所以對於量很大的插入,感覺這個方法也不是很好。。。或者歡迎來人指出的我使用姿勢的問題。
另外的姿勢:
# Fastest. MyModel.insert_many(data_source).execute() # Fastest using tuples and specifying the fields being inserted. fields = [MyModel.field1, MyModel.field2] data = [('val1-1', 'val1-2'), ('val2-1', 'val2-2'), ('val3-1', 'val3-2')] MyModel.insert_many(data, fields=fields).execute() # You can, of course, wrap this in a transaction as well: with db.atomic(): MyModel.insert_many(data, fields=fields).execute()
第一種方法就是一個 list 然后里面是字典,只要你能保證這個結構,就可以進行批量插入。親測速度不錯,也沒有遇到事務報錯的問題。這次使用的就是這個方法,下面第二種第三種寫法,感覺構造起來不是很友好。所以沒有使用,有興趣的朋友可以嘗試。
修改數據方法
修改數據方法提供了普通的更新和原子更新方法。
普通的更新:
>>> today = datetime.today() >>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today) >>> query.execute() # Returns the number of rows that were updated. 4
原子更新:
>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()
>>> subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id) >>> update = User.update(num_tweets=subquery) >>> update.execute()
還有一種 upsert 操作。如果指定鍵存在就執行更新操作,如果不存在執行插入操作。
下面提供了兩種寫法。
# Insert or update the user. The "last_login" value will be updated # regardless of whether the user existed previously. user_id = (User .replace(username='the-user', last_login=datetime.now()) .execute()) # This query is equivalent: user_id = (User .insert(username='the-user', last_login=datetime.now()) .on_conflict_replace() .execute())
另外 mysql 還提供了一種獨有的語法 ON DUPLICATE KEY UPDATE 可以使用以下方法實現。
class User(Model): username = TextField(unique=True) last_login = DateTimeField(null=True) login_count = IntegerField() # Insert a new user. User.create(username='huey', login_count=0) # Simulate the user logging in. The login count and timestamp will be # either created or updated correctly. now = datetime.now() rowid = (User .insert(username='huey', last_login=now, login_count=1) .on_conflict( preserve=[User.last_login], # Use the value we would have inserted. update={User.login_count: User.login_count + 1}) .execute())
上面的例子要注意 username 申明的 unique=true 如果模型上不申明可能會報錯。
刪除方法
刪除方法跟普通的 orm 似乎設計得很像。要么獲得一個 instance 然后調用 delete() 方法進行刪除,要么就是直接條件查詢直接刪除,來看例子。
>>> user = User.get(User.id == 1) >>> user.delete_instance() # Returns the number of rows deleted.
批量刪除
>>> query = Tweet.delete().where(Tweet.creation_date < one_year_ago) >>> query.execute() # Returns the number of rows deleted. 7
聯表查詢方法
query = (Tag
.select()
.join(PhotoTag)
.join(Photo)
.group_by(Tag)
.having(fn.Count(Photo.id) > 5))
其實 peewee 提供的方法就非常獨立和干凈。你可以按照自己的需求構造足夠復雜的 sql ,語句也比較清晰。需要注意的是這幾點,在聯表查詢的時候,select()函數里面需要寫上自己需要的返回值,否則聯表之后會發現竟然還是只有自己 cls 的字段,就會感覺到很疑惑。
query = User\ .select(User, UserDatum, Card7daysRecv)\ .join(UserDatum, JOIN.LEFT_OUTER, on=(User.uid == UserDatum.user_id))\ .join(Card7daysRecv, JOIN.LEFT_OUTER, on=(User.uid == Card7daysRecv.receive_uid)).order_by(User.create_time).limit(limit).offset(offset)
另外一些使用的時候需要注意的問題
在遇到復雜查詢的時候有個不可缺少的工具就是打印以下自己的 orm 究竟構造了什么語句。可以使用 query.sql() 方便的看到。
談下在 model 構造上的時候遇到的一些問題。
如果要往模型里面插數據,建議最好還是指定好 主鍵和默認值,以免讀取模型的時候報錯。需要的字段一定要在模型里面寫上,否則拿的時候會發現沒有這個字段。
class HdNewUserInfo(AliyunModel): class Meta: db_table = 'hd_new_user_info' _version = 0 user_id = CharField(64, default='', primary_key=True) phone = CharField(32, default='') phone_province = CharField(64, default='') phone_city = CharField(64, default='') phone_community_id = CharField(32, default='') name = CharField(64, default='') head_img = CharField(1024, default='') student_no = CharField(36, default='')
這里面其實還有個問題,我們在 mysql 5.6 5.7 還是會經常使用 timestamp 字段,並且使用它的默認當前時間字段。這在 peewee 里面的映射好像有點問題,如果我們使用 peewee 里面的 TimestampField 申明我們的字段,在讀區和操作比較的時候可能會出現一些問題,會被調試需要的是 float 字段。但是 如果你使用 time.time() 類似的 float 去比較也會報出別的錯誤。但是使用 DateTimeField 卻沒有這個問題,所以我選擇使用了 DateTimeField。
另外還有一個 database gone away 的問題。注意構造好自己的 db
class Off(Model): class Meta: database = db.offline_db_08_yanzhi @classmethod def get_one(cls, *query, **kwargs): try: return cls.get(*query, **kwargs) except DoesNotExist: return None
初始化自己的 db。並且構造一個可以重連的 retrydb
class RetryOperationalError(object): def execute_sql(self, sql, params=None, commit=True): try: cursor = super(RetryOperationalError, self).execute_sql( sql, params, commit) except OperationalError: if not self.is_closed(): self.close() with __exception_wrapper__: cursor = self.cursor() cursor.execute(sql, params or ()) if commit and not self.in_transaction(): self.commit() return cursor class MyRetryDB(RetryOperationalError, MySQLDatabase): pass
這次玩數據暫時就遇到這些問題,以后再來補充。
Reference:
https://github.com/coleifer/peewee/issues/114 Print SQL queries
http://docs.peewee-orm.com/en/latest/peewee/query_examples.html#retrieve-the-start-times-of-members-bookings 聯表查 example
https://stackoverflow.com/questions/15559468/why-is-peewee-including-the-id-column-into-the-mysql-select-query column into id 1 報錯解決