python peewee模塊執行原生sql語句的方法


由於公司要求使用peewee模塊,不能使用pymysql,而我又不是特別喜歡ORM的方式,很喜歡原生sql,所以,我決定重新封裝peewee

 

代碼如下:

from peewee import MySQLDatabase


class base_peewee(object):
    def __init__(self, host=None, port=3306, user=None, password=None, db_name=None):
        self.db_host = host
        self.db_port = int(port)
        self.user = user
        self.password = str(password)
        self.db = db_name
        self.conn = None
        self.cursor = None

    def connect(self):
        # self.conn = MySQLDatabase(
        #     host=self.db_host,
        #     port=self.db_port,
        #     user=self.user,
        #     passwd=self.password,
        #     database=self.db,
        #     charset="utf8"
        # )
        self.conn = MySQLDatabase(
            host="127.0.0.1",
            port=3306,
            user="root",
            passwd="123123qwe",
            database='test',
            charset="utf8"
        )
        self.cursor = self.conn.cursor()


class ReDefinedPeeWee(base_peewee):

    def __init__(self):
        super(ReDefinedPeeWee, self).__init__()
        self.connect()

    def commit(self):
        self.conn.commit()

    def rollback(self):
        self.conn.rollback()

    def insert_sql(self, sql, value=None, commit=None):
        self.cursor.execute(sql, value)
        if commit:
            self.commit()

    def update_sql(self, sql, value=None, commit=None):
        self.cursor.execute(sql, value)
        if commit:
            self.commit()

    def delete_sql(self, sql, value=None, commit=None):
        self.cursor.execute(sql, value)
        if commit:
            self.commit()

    def selectone_sql(self, sql, columns=None):
        """

        :param sql:
        :param columns: ['id', 'name'...] 要求sql的返回數據相同
        :return:
        """
        self.cursor.execute(sql)
        self.conn.commit()
        if not columns:
            return self.cursor.fetchone()
        else:
            data = self.cursor.fetchone()
            if data and len(data) == len(columns):
                return dict(zip(columns, data))
            else:
                return data

    def selectall_sql(self, sql, columns=None):
        self.cursor.execute(sql)
        self.conn.commit()
        if not columns:
            return self.cursor.fetchall()
        else:
            data = self.cursor.fetchall()

            if len(data) > 0 and len(data[0]) == len(columns):
                return [dict(zip(columns, i)) for i in data]
            else:
                return data

    def select_sql(self, sql, value=None, columns=None):
        self.cursor.execute(sql, value)
        self.conn.commit()
        return self.cursor.fetchall()

    def close(self):
        self.cursor.close()
        self.conn.close()
        self.conn = None
        self.cursor = None


def main():
    ret = ReDefinedPeeWee()
    res = ret.selectone_sql("select * from test", )
    print(res)
    res1 = ret.selectone_sql("select * from test", ["id", 'name', "num"])
    print(res1)
    ret.close()


if __name__ == '__main__':
    main()
View Code

 

 

如果用peewee寫原生的方式就是這么玩

from peewee import MySQLDatabase, Model
from marshmallow import Schema, fields
from peewee import PrimaryKeyField, IntegerField, CharField, FloatField, DoubleField

# MYSQL 配置

# db = MySQLDatabase('test',
#                    user='root',
#                    password='123123qwe',
#                    host='127.0.0.1',
#                    port=3306)


db = MySQLDatabase('',
                   user='root',
                   password='',
                   host='',
                   port=37214
                   )


class BaseModel(Model):
    class Meta:
        database = db


class TestFactor(BaseModel):
    id = PrimaryKeyField()
    type = IntegerField(verbose_name="類型")
    name = CharField(verbose_name="姓名")
    num = FloatField(verbose_name="浮點")

    class Meta:
        database = db
        # order_by = ('id',)
        db_table = 'test1'


class TestFactor_(Schema):
    id = fields.Integer()
    type = fields.Integer()
    name = fields.String()
    num = fields.Float()
    name_level = fields.Method('get_name_level')

    def get_name_level(self, item):
        if item.type == 1:
            status = '正常'
        elif item.type == 2:
            status = "低危"
        elif item.type == 3:
            status = "高危"
        else:
            status = "正常"
        return status


# 健康管理監測值存儲類
class HealthHouseKeeperMonitoringValue(BaseModel):
    id = PrimaryKeyField()
    user_id = IntegerField(verbose_name="用戶ID")
    type_id = IntegerField(verbose_name="類型")
    monitoring_value = CharField(verbose_name="監測值")
    report_filepath = CharField(verbose_name="文檔路徑")
    create_time = IntegerField(verbose_name="創建時間")
    update_time = IntegerField(verbose_name="更新時間")
    status = IntegerField()

    class Meta:
        order_by = ('id',)
        db_table = 'wechat_health_housekeeper_monitoringvalue'


class HealthHouseKeeperMonitoringValueSerializer(Schema):
    id = fields.Integer()
    user_id = fields.Integer()
    type_id = fields.Integer()
    type_level = fields.Method("get_type_level")
    monitoring_value = fields.String()
    report_filepath = fields.String()
    create_time = fields.Integer()
    update_time = fields.Integer()
    status = fields.Integer()

    def get_type_level(self, item):
        if item.type_id == 1:
            status = '血壓'
        elif item.type_id == 2:
            status = "心率"
        elif item.type_id == 3:
            status = "低密度脂蛋白膽固醇"
        elif item.type_id == 4:
            status = "空腹血糖"
        elif item.type_id == 5:
            status = "甘油三酯"
        elif item.type_id == 6:
            status = "糖化血紅蛋白"
        elif item.type_id == 7:
            status = "總膽固醇"
        elif item.type_id == 8:
            status = "BMI"
        return status


def test():
    # get方法-單條數據
    # detail = TestFactor.get() # 只查一條,沒有則報錯
    # data = TestFactor_(many=False) # 展示數據
    # print(data.dump(detail))

    # get_or_none方法-單挑數據
    # detail = TestFactor.get_or_none() # 沒有不報錯,只查詢一條
    # data = TestFactor_(many=False) # 展示數據
    # print(data.dump(detail))

    # 多條數據
    # detail = TestFactor.select()
    # many=False表示只有一條, exclude表示不展示某些列, only表示只展示某些列
    # data = TestFactor_(many=False, exclude=[], only=())
    # data = TestFactor_(many=False, exclude=[])
    # print([data.dump(i) for i in detail])

    detail = HealthHouseKeeperMonitoringValue.select().where(
        HealthHouseKeeperMonitoringValue.user_id == 180,
        HealthHouseKeeperMonitoringValue.status == 1,
        HealthHouseKeeperMonitoringValue.type_id == 1,
    ).order_by(HealthHouseKeeperMonitoringValue.create_time.desc())
    print(detail)
    data_serial = HealthHouseKeeperMonitoringValueSerializer()
    print(len(detail))
    print(data_serial.dump(detail[0]))
    # user_detail = RiskFactorSerializers(many=False, exclude=['create_time', 'id', 'chronic_id', 'user_id'])


if __name__ == '__main__':
    test()
View Code

 

用ORM的Peewee模塊查詢返回字典模式,哈哈哈,無意發現的,啦啦啦,如果你用到了請點贊+關注,謝謝

basemodel

class CaseAdjusterResult(BaseModel):
    """
    理算結果(后台)
    """
    id = PrimaryKeyField()
    case_id = IntegerField()
    assurer_id = IntegerField()
    case_no = CharField()
    adjuster_id = IntegerField()
    fee_adjuster_id = IntegerField()
    self_charge_1 = FloatField()
    self_charge_2 = FloatField()
    self_charge = FloatField()
    year_first_hospital_self_charge_1 = FloatField()
    year_first_hospital_self_charge_2 = FloatField()
    year_first_hospital_self_charge = FloatField()
    year_no_first_hospital_self_charge_1 = FloatField()
    year_no_first_hospital_self_charge_2 = FloatField()
    year_no_first_hospital_self_charge = FloatField()
    year_hospital_self_charge_1 = FloatField()
    year_hospital_self_charge_2 = FloatField()
    year_hospital_self_charge = FloatField()
    limit_charge = FloatField()
    injury = FloatField()
    special_disease_self_charge_1 = FloatField()
    special_disease_self_charge_2 = FloatField()
    special_disease_self_charge = FloatField()
    serious_illness_self_charge_1 = FloatField()
    serious_illness_self_charge_2 = FloatField()
    serious_illness_self_charge = FloatField()
    work_injury_self_charge_1 = FloatField()
    work_injury_self_charge_2 = FloatField()
    work_injury_self_charge = FloatField()
    special_limit_charge = FloatField()
    status = IntegerField()
    create_time = IntegerField()
    update_time = IntegerField()

    class Meta:
        order_by = ('id',)
        db_table = 'case_adjuster_result'
View Code

查詢代碼,返回的結果是字典的形式

 

 
         
case_no = "wechat_20200817271"
# 一條數據這么玩
# case_adjuster_result = CaseAdjusterResult.get(CaseAdjusterResult.case_no == case_no)
# data = case_adjuster_result.__dict__["__data__"]  ---》就是這句,哈哈哈哈,可以直接拿到字典類型的數據
# print(data)
# 多條數據這么玩
case_adjuster_result = CaseAdjusterResult.select()    --> 這個地方也可以select().get()看源碼就是這么玩的
for son_sql_obj in case_adjuster_result:
data = son_sql_obj.__dict__.get("__data__", {})
print(data)

  
  如果你用的,請點贊 + 評論 + 關注,一件三聯,哈哈哈哈

  呸,好不要臉,沒辦法,程序員的快樂也就這么點了:)

 

 

peewee 怎么模擬SQL里面的select count(1) as count from table group by的寫法

 

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from hyh.model.model import XueShuPaper
from peewee import fn

data = XueShuPaper.select(XueShuPaper.technology_crawler_keyword, fn.COUNT(XueShuPaper.id).alias("count")).group_by(XueShuPaper.technology_crawler_keyword)
aaa = []
print(data)
for i in data:
    new_son_data = {
        "crawler_keyword": i.__dict__.get("__data__").get("technology_crawler_keyword"),
        "count": i.__dict__.get("count")
    }
    print(new_son_data)

 

 

問題,由於出現了tornado + peewee阻塞的情況

這里注意個問題,如果大批量訪問,長時間訪問,就會出現訪問沒有數據,阻塞的情況,因為一般大家tornado都是用同步的形式

那解決方案呢

先說為什么

peewee 的連接池,使用時需要顯式的關閉連接。下面先說下為什么,最后會給出推薦的使用方法,避免進坑。

為什么要顯式的關閉連接

Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.

這里引用官方文檔的提示。大致說:“超時連接不會自動關閉,只會在有新的請求時是才會關閉”。這里的request是指‘web 框架處理的請求’,peewee 源碼片段:

 

根據 pool 庫中的 _connect 方法的代碼可知:每次在建立數據庫連接時,會檢查連接實例是否超時。但是需要注意一點:使用中的數據庫連接實例(_in_use dict中的數據庫連接實例) 是不會在創建數據庫連接時,檢查是否超時的

因為這段代碼中,每次創建連接實例,都是在 _connections(pool) 取實例,如果有的話就判斷是否超時;如果沒有的話就新建。

然而,使用中的數據庫連接並不在 _connections 中,所以每次創建數據庫連接實例時,並沒有檢測使用中的數據庫連接實例是否超時。

只有調用連接池實例的 _close 方法。執行這個方法后,才會把使用后的連接實例放回到 _connections (pool)。

 

 如果不顯式的關閉連接,會出現的問題

如果不調用_close方法的話,使用后 的數據庫連接就一直不會關閉(兩個含義:回到pool中和關閉數據庫連接),這樣會造成兩個問題:

  1. 每次都是新建數據庫連接,因為 pool 中沒有數據庫連接實例。會導致稍微有一點並發量就會返回Exceeded maximum connections.錯誤
  2. MySQL也是有 timeout 的,如果一個連接長時間沒有請求的話,MySQL Server 就會關閉這個連接,但是,peewee的已建立(后面會解釋為什么特指已建立的)的連接實例,並不知道 MySQL Server 已經關閉了,再去通過這個連接請求數據的話,就會返回 Error 2006: “MySQL server has gone away”錯誤,根據官方文檔

 

 

解決方案

  在return的地方,進行一個判斷
if not database.is_closed(): database.close()

 

已經進行了代碼改進,但是還沒有進行大批量測試

參考連接:https://www.cnblogs.com/xueweihan/p/6698456.html   ,這里說一聲感謝

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM