由於公司要求使用peewee模塊,不能使用pymysql,而我又不是特別喜歡ORM的方式,很喜歡原生sql,所以,我決定重新封裝peewee
代碼如下:

from peewee import MySQLDatabase class base_peewee(object): def __init__(self, host=None, port=3306, user=None, password=None, db_name=None): self.db_host = host self.db_port = int(port) self.user = user self.password = str(password) self.db = db_name self.conn = None self.cursor = None def connect(self): # self.conn = MySQLDatabase( # host=self.db_host, # port=self.db_port, # user=self.user, # passwd=self.password, # database=self.db, # charset="utf8" # ) self.conn = MySQLDatabase( host="127.0.0.1", port=3306, user="root", passwd="123123qwe", database='test', charset="utf8" ) self.cursor = self.conn.cursor() class ReDefinedPeeWee(base_peewee): def __init__(self): super(ReDefinedPeeWee, self).__init__() self.connect() def commit(self): self.conn.commit() def rollback(self): self.conn.rollback() def insert_sql(self, sql, value=None, commit=None): self.cursor.execute(sql, value) if commit: self.commit() def update_sql(self, sql, value=None, commit=None): self.cursor.execute(sql, value) if commit: self.commit() def delete_sql(self, sql, value=None, commit=None): self.cursor.execute(sql, value) if commit: self.commit() def selectone_sql(self, sql, columns=None): """ :param sql: :param columns: ['id', 'name'...] 要求sql的返回數據相同 :return: """ self.cursor.execute(sql) self.conn.commit() if not columns: return self.cursor.fetchone() else: data = self.cursor.fetchone() if data and len(data) == len(columns): return dict(zip(columns, data)) else: return data def selectall_sql(self, sql, columns=None): self.cursor.execute(sql) self.conn.commit() if not columns: return self.cursor.fetchall() else: data = self.cursor.fetchall() if len(data) > 0 and len(data[0]) == len(columns): return [dict(zip(columns, i)) for i in data] else: return data def select_sql(self, sql, value=None, columns=None): self.cursor.execute(sql, value) self.conn.commit() return self.cursor.fetchall() def close(self): self.cursor.close() self.conn.close() self.conn = None self.cursor = None def main(): ret = ReDefinedPeeWee() res = ret.selectone_sql("select * from test", ) print(res) res1 = ret.selectone_sql("select * from test", ["id", 'name', "num"]) print(res1) ret.close() if __name__ == '__main__': main()
如果用peewee寫原生的方式就是這么玩

from peewee import MySQLDatabase, Model from marshmallow import Schema, fields from peewee import PrimaryKeyField, IntegerField, CharField, FloatField, DoubleField # MYSQL 配置 # db = MySQLDatabase('test', # user='root', # password='123123qwe', # host='127.0.0.1', # port=3306) db = MySQLDatabase('', user='root', password='', host='', port=37214 ) class BaseModel(Model): class Meta: database = db class TestFactor(BaseModel): id = PrimaryKeyField() type = IntegerField(verbose_name="類型") name = CharField(verbose_name="姓名") num = FloatField(verbose_name="浮點") class Meta: database = db # order_by = ('id',) db_table = 'test1' class TestFactor_(Schema): id = fields.Integer() type = fields.Integer() name = fields.String() num = fields.Float() name_level = fields.Method('get_name_level') def get_name_level(self, item): if item.type == 1: status = '正常' elif item.type == 2: status = "低危" elif item.type == 3: status = "高危" else: status = "正常" return status # 健康管理監測值存儲類 class HealthHouseKeeperMonitoringValue(BaseModel): id = PrimaryKeyField() user_id = IntegerField(verbose_name="用戶ID") type_id = IntegerField(verbose_name="類型") monitoring_value = CharField(verbose_name="監測值") report_filepath = CharField(verbose_name="文檔路徑") create_time = IntegerField(verbose_name="創建時間") update_time = IntegerField(verbose_name="更新時間") status = IntegerField() class Meta: order_by = ('id',) db_table = 'wechat_health_housekeeper_monitoringvalue' class HealthHouseKeeperMonitoringValueSerializer(Schema): id = fields.Integer() user_id = fields.Integer() type_id = fields.Integer() type_level = fields.Method("get_type_level") monitoring_value = fields.String() report_filepath = fields.String() create_time = fields.Integer() update_time = fields.Integer() status = fields.Integer() def get_type_level(self, item): if item.type_id == 1: status = '血壓' elif item.type_id == 2: status = "心率" elif item.type_id == 3: status = "低密度脂蛋白膽固醇" elif item.type_id == 4: status = "空腹血糖" elif item.type_id == 5: status = "甘油三酯" elif item.type_id == 6: status = "糖化血紅蛋白" elif item.type_id == 7: status = "總膽固醇" elif item.type_id == 8: status = "BMI" return status def test(): # get方法-單條數據 # detail = TestFactor.get() # 只查一條,沒有則報錯 # data = TestFactor_(many=False) # 展示數據 # print(data.dump(detail)) # get_or_none方法-單挑數據 # detail = TestFactor.get_or_none() # 沒有不報錯,只查詢一條 # data = TestFactor_(many=False) # 展示數據 # print(data.dump(detail)) # 多條數據 # detail = TestFactor.select() # many=False表示只有一條, exclude表示不展示某些列, only表示只展示某些列 # data = TestFactor_(many=False, exclude=[], only=()) # data = TestFactor_(many=False, exclude=[]) # print([data.dump(i) for i in detail]) detail = HealthHouseKeeperMonitoringValue.select().where( HealthHouseKeeperMonitoringValue.user_id == 180, HealthHouseKeeperMonitoringValue.status == 1, HealthHouseKeeperMonitoringValue.type_id == 1, ).order_by(HealthHouseKeeperMonitoringValue.create_time.desc()) print(detail) data_serial = HealthHouseKeeperMonitoringValueSerializer() print(len(detail)) print(data_serial.dump(detail[0])) # user_detail = RiskFactorSerializers(many=False, exclude=['create_time', 'id', 'chronic_id', 'user_id']) if __name__ == '__main__': test()
用ORM的Peewee模塊查詢返回字典模式,哈哈哈,無意發現的,啦啦啦,如果你用到了請點贊+關注,謝謝
basemodel

class CaseAdjusterResult(BaseModel): """ 理算結果(后台) """ id = PrimaryKeyField() case_id = IntegerField() assurer_id = IntegerField() case_no = CharField() adjuster_id = IntegerField() fee_adjuster_id = IntegerField() self_charge_1 = FloatField() self_charge_2 = FloatField() self_charge = FloatField() year_first_hospital_self_charge_1 = FloatField() year_first_hospital_self_charge_2 = FloatField() year_first_hospital_self_charge = FloatField() year_no_first_hospital_self_charge_1 = FloatField() year_no_first_hospital_self_charge_2 = FloatField() year_no_first_hospital_self_charge = FloatField() year_hospital_self_charge_1 = FloatField() year_hospital_self_charge_2 = FloatField() year_hospital_self_charge = FloatField() limit_charge = FloatField() injury = FloatField() special_disease_self_charge_1 = FloatField() special_disease_self_charge_2 = FloatField() special_disease_self_charge = FloatField() serious_illness_self_charge_1 = FloatField() serious_illness_self_charge_2 = FloatField() serious_illness_self_charge = FloatField() work_injury_self_charge_1 = FloatField() work_injury_self_charge_2 = FloatField() work_injury_self_charge = FloatField() special_limit_charge = FloatField() status = IntegerField() create_time = IntegerField() update_time = IntegerField() class Meta: order_by = ('id',) db_table = 'case_adjuster_result'
查詢代碼,返回的結果是字典的形式
case_no = "wechat_20200817271"
# 一條數據這么玩
# case_adjuster_result = CaseAdjusterResult.get(CaseAdjusterResult.case_no == case_no)
# data = case_adjuster_result.__dict__["__data__"] ---》就是這句,哈哈哈哈,可以直接拿到字典類型的數據
# print(data)
# 多條數據這么玩
case_adjuster_result = CaseAdjusterResult.select() --> 這個地方也可以select().get()看源碼就是這么玩的
for son_sql_obj in case_adjuster_result:
data = son_sql_obj.__dict__.get("__data__", {})
print(data)
如果你用的,請點贊 + 評論 + 關注,一件三聯,哈哈哈哈
呸,好不要臉,沒辦法,程序員的快樂也就這么點了:)
peewee 怎么模擬SQL里面的select count(1) as count from table group by的寫法
#!/usr/bin/env python # -*- coding:utf-8 -*- from hyh.model.model import XueShuPaper from peewee import fn data = XueShuPaper.select(XueShuPaper.technology_crawler_keyword, fn.COUNT(XueShuPaper.id).alias("count")).group_by(XueShuPaper.technology_crawler_keyword) aaa = [] print(data) for i in data: new_son_data = { "crawler_keyword": i.__dict__.get("__data__").get("technology_crawler_keyword"), "count": i.__dict__.get("count") } print(new_son_data)
問題,由於出現了tornado + peewee阻塞的情況
這里注意個問題,如果大批量訪問,長時間訪問,就會出現訪問沒有數據,阻塞的情況,因為一般大家tornado都是用同步的形式
那解決方案呢
先說為什么
peewee 的連接池,使用時需要顯式的關閉連接。下面先說下為什么,最后會給出推薦的使用方法,避免進坑。
為什么要顯式的關閉連接
Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.
這里引用官方文檔的提示。大致說:“超時連接不會自動關閉,只會在有新的請求時是才會關閉”。這里的request
是指‘web 框架處理的請求’,peewee 源碼片段:
根據 pool 庫中的 _connect
方法的代碼可知:每次在建立數據庫連接時,會檢查連接實例是否超時。但是需要注意一點:使用中的數據庫連接實例(_in_use dict中的數據庫連接實例) 是不會在創建數據庫連接時,檢查是否超時的。
因為這段代碼中,每次創建連接實例,都是在 _connections
(pool) 取實例,如果有的話就判斷是否超時;如果沒有的話就新建。
然而,使用中的數據庫連接並不在 _connections
中,所以每次創建數據庫連接實例時,並沒有檢測使用中的數據庫連接實例是否超時。
只有調用連接池實例的 _close
方法。執行這個方法后,才會把使用后的連接實例放回到 _connections
(pool)。
如果不顯式的關閉連接,會出現的問題
如果不調用_close
方法的話,使用后 的數據庫連接就一直不會關閉(兩個含義:回到pool中和關閉數據庫連接),這樣會造成兩個問題:
- 每次都是新建數據庫連接,因為 pool 中沒有數據庫連接實例。會導致稍微有一點並發量就會返回
Exceeded maximum connections.
錯誤 - MySQL也是有 timeout 的,如果一個連接長時間沒有請求的話,MySQL Server 就會關閉這個連接,但是,peewee的已建立(后面會解釋為什么特指已建立的)的連接實例,並不知道 MySQL Server 已經關閉了,再去通過這個連接請求數據的話,就會返回
Error 2006: “MySQL server has gone away”
錯誤,根據官方文檔
解決方案
在return的地方,進行一個判斷
if not database.is_closed(): database.close()
已經進行了代碼改進,但是還沒有進行大批量測試
參考連接:https://www.cnblogs.com/xueweihan/p/6698456.html ,這里說一聲感謝