Python - 模塊 peewee


peewee是一款ORM
詳細文檔:https://peewee.readthedocs.io/en/latest/index.html

 

from peewee import *
from datetime import date

database = MySQLDatabase('peewee_test', host='127.0.0.1', port=3306, user='root', password='12345678')


class Person(Model):
    name = CharField(unique=True)
    birthday = DateField()

    class Meta:
        table_name = "hello"
        database = database  # This model uses the "people.db" database.
        indexes = (
            (('name', 'birthday'), True),
        )


class Pet(Model):
    owner = ForeignKeyField(Person, backref='pets')  #
    name = CharField()
    animal_type = CharField()

    class Meta:
        database = database  # this model uses the "people.db" database
        primary_key = False  # 沒有主鍵的模型


# database.connect()
def init():
    # Person.create_table(True)
    # database.create_tables([Person, Pet]) # 會自動解決模型間的依賴關系
    # with database:
    with database.connection_context():
        """
        在包裝的代碼塊期間打開連接。
        """
        database.create_tables([Person])
        database.create_tables([Pet])

    # @db.connection_context()也可用作裝飾器


def drop():
    database.drop_tables([Person, Pet])


def create():
    Person.get_or_create(name='xxx', defaults={'birthday': "2015-11-25"})  # 查找有沒有叫xxx的;沒有創建
    # uncle_bob = Person(name='Bob', birthday=date(1960, 1, 15))
    # # uncle_bob = Person(name='Bob', birthday="2019-04-11")
    # a = uncle_bob.save()
    # print(a, type(a))  # 返回修改的行數
    # grandma = Person.create(name='Grandma', birthday=date(1935, 3, 1))
    # print(grandma, type(grandma), grandma.name)  # 7 <Model: Person>
    # grandma.name = 'lisi'
    # grandma.save()
    # 外鍵存儲

    # try:
    #     with database.atomic():
    #         # herb_mittens_jr = Pet.create(owner=uncle_bob, name='Mittens Jr', animal_type='cat')
    #         herb_mittens_jr = Pet.create(owner=9, name='Mittens Jr', animal_type='cat')
    # except IntegrityError as e:
    #     print(e)

    # print(herb_mittens_jr, type(herb_mittens_jr))
    # 批量創建
    Person.insert_many([('a', '2019-11-21'), ('b', '2019-11-22')], [Person.name, Person.birthday]).execute()
    # 以下僅提供思路
    # # 或者開啟一個事物,再里面create;這樣會一次性提交
    # from peewee import chunked
    # data_source = [
    #     {'field1': 'val1-1', 'field2': 'val1-2'},
    #     {'field1': 'val2-1', 'field2': 'val2-2'},
    # ]
    # with database.atomic():
    #     for batch in chunked(data_source, 100):
    #         Person.insert_many(batch).execute()
    # with db.atomic():
    #     Person.bulk_create([persons,], batch_size=100)

    # Person.insert_from(
    #     Person.select(Person.user, Person.message),
    #     fields=[Person.user, Person.message]
    # ).execute()


def delete():
    # grandma = Person.get(Person.id==1)
    # grandma.delete_instance()
    # Person.delete_by_id(6)

    Person.delete().where(Person.id >= 5).execute()


def update():
    herb_mittens_jr = Pet.create(owner=9, name='Mittens Jr', animal_type='cat')
    herb_mittens_jr.owner = 8
    herb_mittens_jr.save()
    Pet.update().where(Pet.id == 1)
    Pet.update(counter=Pet.counter + 1)
    # First, create 3 users with name u1, u2, u3.
    u1, u2, u3 = [Person.create(name='u%s' % i) for i in (1, 2, 3)]

    # Now we'll modify the user instances.
    u1.name = 'u1-x'
    u2.usernamename = 'u2-y'
    u3.username = 'u3-z'

    # Update all three users with a single UPDATE query.
    Person.bulk_update([u1, u2, u3], fields=[Person.name], batch_size=50)  # 大量數據應該指定batch_size

    ######################
    # subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id)
    # User.update(num_tweets=subquery).execute()
    # fn.AVG(Sample.value).over(partition_by=[Sample.counter]).alias('cavg')
    # fn.SUM(Sample.value).filter(Sample.counter != 2).over(order_by=[Sample.id]).alias('csum')


def get():
    # 獲取一條數據
    try:
        grandma = Person.select().where(Person.name == 'Grandma').get()  # get獲取不到,會拋出異常
        grandma = grandma.dicts()
        print(Person[1])  # id
        print(Person.get_by_id(1))  # id
        print(Person.first())
        print(grandma, type(grandma))
    except DoesNotExist:
        print("不存在")
    # 獲取一些數據
    for pet in Pet.select().where(Pet.name != '').iterator():  # 不能寫空的過濾條件
        print(pet.owner.name, pet.name)  # 在執行pet.owner.name會導致N+1次查詢
    # 連表查詢
    # for pet in Pet.select(Pet, Person).join(Person).where(Pet.animal_type == 'cat'):
    # for pet in Pet.select().join(Person, on=Person.pets).where(Pet.animal_type == 'cat'):
    #     print(pet.name, pet.owner.name)
    # 組合式過濾
    # for person in Person.select().where((Person.birthday < "2019-12-12") | (Person.birthday > "2019-01-11")):
    # for person in Person.select().where(Person.birthday.between("2019-01-11", "2019-12-12")):
    #     print(person.name, person.birthday)
    # per = Person.select().where(Person.id.in_((1, 6)))
    # for item in per:
    #     print(item.name)


def cross_table():
    # print(Person.select(fn.MIN(Person.birthday), fn.MAX(Person.birthday)).scalar(as_tuple=True))
    # (datetime.date(2019, 4, 15), datetime.date(2019, 12, 2))

    # expression = fn.Lower(fn.Substr(Person.name, 1, 1)) == 'g'
    """
    查找名稱以大寫或小寫G開頭的所有人員
    """
    # for person in Person.select().where(expression):
    #     print(person.name)

    # # Pet.select().join(Person).where(Pet.animal_type == 'cat')
    # query = Person.select(Person, fn.COUNT(Pet.id).alias('pet_count')).join(Pet, JOIN.LEFT_OUTER).group_by(
    #     Person).order_by(Person.name)
    # for person in query:
    #     # "pet_count" becomes an attribute on the returned model instances.
    #     print(person.name, person.pet_count, 'pets')
    """
        fn(),可用於調用任何SQL函數。
        fn.COUNT(Pet.id).alias('pet_count') 將轉換為。COUNT(pet.id) AS pet_count
    """
    # query = Person.select(Person, Pet).join(Pet, JOIN.LEFT_OUTER).order_by(Person.name, Pet.name)
    # for person in query:
    #     if hasattr(person, 'pet'):
    #         print(person.name, person.pet.name)
    #     else:
    #         print(person.name, 'no pets')
    # query = Person.select().order_by(Person.name).prefetch(Pet)
    query = Person.select().prefetch(Pet)
    # 將屬於person記錄的pet記錄附加到person記錄上
    for person in query:
        print(person.name)
        for pet in person.pets:
            print('  *', pet.name)


def order():
    # for pet in Pet.select().where(Pet.owner == 9).order_by(+Pet.name):
    print(Pet.select().where(Pet.owner >= 1).order_by(Pet.name.desc()).count())
    for pet in Pet.select().where(Pet.owner >= 1).order_by(Pet.name.desc()).limit(5):  # 降序
        print(pet.name)


def total():
    for person in Person.select():
        print(person.name, person.pets.count(), 'pets')  # 同樣會遇到N+1問題


# print(Person._schema._create_table().query())
# print(Person._meta.database.param)
# print(Person._meta.name)
# print(Person._meta.sorted_fields)
# print(Person._meta.database.get_sql_context())
# print(Person._meta.fields_to_index())
def paginate():
    for per in Person.select().paginate(1, 10):
        print(per.name)


paginate()

 

運算符

|比較方式    |含義    |
|:---|:---|
|==    |x等於y|
|<    |x小於y|
|<=    |x小於或等於y|
| \>    |x大於y| 
|>=    |x大於或等於y|
|!=    |x不等於y|
|<<    |x IN y,其中y是列表或查詢|
|\>>    |x IS y,其中y為None / NULL|
|%    |x喜歡y,其中y可能包含通配符|
|**    |x像y,其中y可能包含通配符|
|^    |異或|
|~    |一元否定(例如,NOT x)|


|方法    |含義    |
|:---|:---|
|.in_(value)|    IN查找(與相同<<)。|
|.not_in(value)    |不在查詢中。|
|.is_null(is_null)|    是NULL還是IS NOT NULL。接受布爾參數。|
| .contains(substr)    | 通配符搜索子字符串 |
|.startswith(prefix)|    搜索以開頭的值prefix。|
|.endswith(suffix)|    搜索以結尾的值suffix。|
|.between(low, high)|    在low和之間搜索值high。|
|.regexp(exp)|    正則表達式匹配(區分大小寫)。|
|.iregexp(exp)|    正則表達式匹配(不區分大小寫)。|
|.bin_and(value)|    二進制AND。|
|.bin_or(value)    |二進制或。|
|.concat(other)|使用串聯兩個字符串或對象||。|
|.distinct()|    標記列以進行DISTINCT選擇。|
|.collate(collation)|    用給定的排序規則指定列。|
|.cast(type)    |將列的值強制轉換為給定的類型。|



|操作符    |含義    |例|
|:---:|:---|:---|
|&    |和    |(User.is_active == True) & (User.is_admin == True)|
| 1 |(管)    要么|    (User.is_admin)| (User.is_superuser)|
|~    |不(一元否定)    |~(User.username.contains('admin'))|



# demo
```
  User.last_login >> None
  User.is_null(True)

```  
  

進階

from playhouse.postgres_ext import PostgresqlExtDatabase, Model, HStoreField, CharField, MySQLDatabase
from playhouse.pool import PooledPostgresqlExtDatabase
from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
from playhouse.sqlite_ext import SqliteDatabase

from peewee import DatabaseProxy, CharField, TextField, DateTimeField, BigBitField, CompositeKey
import datetime
import conf

database_proxy = DatabaseProxy()

if conf.config['DEBUG']:
    database = SqliteDatabase('local.db')
elif conf.config['TESTING']:
    database = MySQLDatabase(
        'peewee_test',
        host='127.0.0.1',
        port=3306,
        user='root',
        password='12345678'
    )
else:
    database = PooledPostgresqlExtDatabase(
        'peewee_test',
        user='postgres',
        password='1234',
        host='127.0.0.1',
        isolation_level=ISOLATION_LEVEL_SERIALIZABLE,  # 隔離級別初始化psycopg2.extensions
        autoconnect=False,
        max_connections=8,
        stale_timeout=300,
        register_hstore=True,  # 使用hstore,您可以將任意鍵/值對與結構化的關系數據一起存儲在數據庫中。
    )

"""
postgresql://postgres:my_password@localhost:5432/my_database
mysql://user:passwd@ip:port/my_db
"""
database_proxy.initialize(database)
database_proxy.connect(reuse_if_open=True)


class BaseModel(Model):
    class Meta:
        database = database_proxy  # Use proxy for our DB.


class User(BaseModel):
    username = CharField(index=True, unique=True, choices=(('1', "ok"),), help_text="", verbose_name="")
    body = TextField()
    send_date = DateTimeField(default=datetime.datetime.now)

    class Meta:
        primary_key = CompositeKey('username', 'send_date')


class House(BaseModel):
    address = CharField()
    features = HStoreField()

    @staticmethod
    def request():
        House.create(
            address='123 Main St',
            features={'garage': '2 cars', 'bath': '2 bath'}
        )


def init():
    # @database_proxy.atomic()
    with database_proxy.atomic() as nested_txn:  # 事務
        # 可以使用嵌套事務
        User.create_table(True)
        House.create_table(True)
        # raise Exception(123)
        # nested_txn.rollback()
    # with database_proxy.transaction() as txn:  # 在事務中顯式運行代碼,則可以使用 transaction()
    #     # 不建議使用嵌套事務
    #     User.create(username='mickey')
    #     txn.commit()  # Changes are saved and a new transaction begins.
    #     User.create(username='huey')
    #     txn.rollback()
    #     with database_proxy.savepoint() as sp:  # 顯示的創建保存點
    #         User.create(username='mickey')
    #         sp.rollback()


init()

 

 

 

 

 

 

 

pwiz 模型生成器

pwiz是peewee附帶的一個小腳本,能夠對現有數據庫進行自省,並生成適合與基礎數據進行交互的模型代碼。如果您已經有一個數據庫,pwiz可以通過生成具有正確列親和力和外鍵的框架代碼來給您一個很好的提升。

 

pwiz接受以下命令行選項:

選項 含義
-H 顯示幫助  
-e 數據庫后端 -e MySQL
-H 主機連接 -H remote.db.server
-p 連接端口 -p 9001
-u 數據庫用戶 -u postgres
-P 數據庫密碼 -P(將提示輸入密碼)
-s 圖式 -s公共
-t 表格生成 -t tweet,用戶,關系
-v 為視圖生成模型 (無參數)
-i 將信息元數據添加到生成的文件 (無參數)
-o 表列順序被保留 (無參數)

 

 

 

 

困難

from peewee import *

db = MySQLDatabase()


# 循環外鍵依賴解決方法
class User(Model):
    username = CharField()
    favorite_tweet = DeferredForeignKey('Tweet', null=True)


class Tweet(Model):
    message = TextField()
    user = ForeignKeyField(User, backref='tweets')


db.create_tables([User, Tweet])
User._schema.create_foreign_key(User.favorite_tweet)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM