peewee是一款ORM
详细文档:https://peewee.readthedocs.io/en/latest/index.html
from peewee import * from datetime import date database = MySQLDatabase('peewee_test', host='127.0.0.1', port=3306, user='root', password='12345678') class Person(Model): name = CharField(unique=True) birthday = DateField() class Meta: table_name = "hello" database = database # This model uses the "people.db" database. indexes = ( (('name', 'birthday'), True), ) class Pet(Model): owner = ForeignKeyField(Person, backref='pets') # name = CharField() animal_type = CharField() class Meta: database = database # this model uses the "people.db" database primary_key = False # 没有主键的模型 # database.connect() def init(): # Person.create_table(True) # database.create_tables([Person, Pet]) # 会自动解决模型间的依赖关系 # with database: with database.connection_context(): """ 在包装的代码块期间打开连接。 """ database.create_tables([Person]) database.create_tables([Pet]) # @db.connection_context()也可用作装饰器 def drop(): database.drop_tables([Person, Pet]) def create(): Person.get_or_create(name='xxx', defaults={'birthday': "2015-11-25"}) # 查找有没有叫xxx的;没有创建 # uncle_bob = Person(name='Bob', birthday=date(1960, 1, 15)) # # uncle_bob = Person(name='Bob', birthday="2019-04-11") # a = uncle_bob.save() # print(a, type(a)) # 返回修改的行数 # grandma = Person.create(name='Grandma', birthday=date(1935, 3, 1)) # print(grandma, type(grandma), grandma.name) # 7 <Model: Person> # grandma.name = 'lisi' # grandma.save() # 外键存储 # try: # with database.atomic(): # # herb_mittens_jr = Pet.create(owner=uncle_bob, name='Mittens Jr', animal_type='cat') # herb_mittens_jr = Pet.create(owner=9, name='Mittens Jr', animal_type='cat') # except IntegrityError as e: # print(e) # print(herb_mittens_jr, type(herb_mittens_jr)) # 批量创建 Person.insert_many([('a', '2019-11-21'), ('b', '2019-11-22')], [Person.name, Person.birthday]).execute() # 以下仅提供思路 # # 或者开启一个事物,再里面create;这样会一次性提交 # from peewee import chunked # data_source = [ # {'field1': 'val1-1', 'field2': 'val1-2'}, # {'field1': 'val2-1', 'field2': 'val2-2'}, # ] # with database.atomic(): # for batch in chunked(data_source, 100): # Person.insert_many(batch).execute() # with db.atomic(): # Person.bulk_create([persons,], batch_size=100) # Person.insert_from( # Person.select(Person.user, Person.message), # fields=[Person.user, Person.message] # ).execute() def delete(): # grandma = Person.get(Person.id==1) # grandma.delete_instance() # Person.delete_by_id(6) Person.delete().where(Person.id >= 5).execute() def update(): herb_mittens_jr = Pet.create(owner=9, name='Mittens Jr', animal_type='cat') herb_mittens_jr.owner = 8 herb_mittens_jr.save() Pet.update().where(Pet.id == 1) Pet.update(counter=Pet.counter + 1) # First, create 3 users with name u1, u2, u3. u1, u2, u3 = [Person.create(name='u%s' % i) for i in (1, 2, 3)] # Now we'll modify the user instances. u1.name = 'u1-x' u2.usernamename = 'u2-y' u3.username = 'u3-z' # Update all three users with a single UPDATE query. Person.bulk_update([u1, u2, u3], fields=[Person.name], batch_size=50) # 大量数据应该指定batch_size ###################### # subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id) # User.update(num_tweets=subquery).execute() # fn.AVG(Sample.value).over(partition_by=[Sample.counter]).alias('cavg') # fn.SUM(Sample.value).filter(Sample.counter != 2).over(order_by=[Sample.id]).alias('csum') def get(): # 获取一条数据 try: grandma = Person.select().where(Person.name == 'Grandma').get() # get获取不到,会抛出异常 grandma = grandma.dicts() print(Person[1]) # id print(Person.get_by_id(1)) # id print(Person.first()) print(grandma, type(grandma)) except DoesNotExist: print("不存在") # 获取一些数据 for pet in Pet.select().where(Pet.name != '').iterator(): # 不能写空的过滤条件 print(pet.owner.name, pet.name) # 在执行pet.owner.name会导致N+1次查询 # 连表查询 # for pet in Pet.select(Pet, Person).join(Person).where(Pet.animal_type == 'cat'): # for pet in Pet.select().join(Person, on=Person.pets).where(Pet.animal_type == 'cat'): # print(pet.name, pet.owner.name) # 组合式过滤 # for person in Person.select().where((Person.birthday < "2019-12-12") | (Person.birthday > "2019-01-11")): # for person in Person.select().where(Person.birthday.between("2019-01-11", "2019-12-12")): # print(person.name, person.birthday) # per = Person.select().where(Person.id.in_((1, 6))) # for item in per: # print(item.name) def cross_table(): # print(Person.select(fn.MIN(Person.birthday), fn.MAX(Person.birthday)).scalar(as_tuple=True)) # (datetime.date(2019, 4, 15), datetime.date(2019, 12, 2)) # expression = fn.Lower(fn.Substr(Person.name, 1, 1)) == 'g' """ 查找名称以大写或小写G开头的所有人员 """ # for person in Person.select().where(expression): # print(person.name) # # Pet.select().join(Person).where(Pet.animal_type == 'cat') # query = Person.select(Person, fn.COUNT(Pet.id).alias('pet_count')).join(Pet, JOIN.LEFT_OUTER).group_by( # Person).order_by(Person.name) # for person in query: # # "pet_count" becomes an attribute on the returned model instances. # print(person.name, person.pet_count, 'pets') """ fn(),可用于调用任何SQL函数。 fn.COUNT(Pet.id).alias('pet_count') 将转换为。COUNT(pet.id) AS pet_count """ # query = Person.select(Person, Pet).join(Pet, JOIN.LEFT_OUTER).order_by(Person.name, Pet.name) # for person in query: # if hasattr(person, 'pet'): # print(person.name, person.pet.name) # else: # print(person.name, 'no pets') # query = Person.select().order_by(Person.name).prefetch(Pet) query = Person.select().prefetch(Pet) # 将属于person记录的pet记录附加到person记录上 for person in query: print(person.name) for pet in person.pets: print(' *', pet.name) def order(): # for pet in Pet.select().where(Pet.owner == 9).order_by(+Pet.name): print(Pet.select().where(Pet.owner >= 1).order_by(Pet.name.desc()).count()) for pet in Pet.select().where(Pet.owner >= 1).order_by(Pet.name.desc()).limit(5): # 降序 print(pet.name) def total(): for person in Person.select(): print(person.name, person.pets.count(), 'pets') # 同样会遇到N+1问题 # print(Person._schema._create_table().query()) # print(Person._meta.database.param) # print(Person._meta.name) # print(Person._meta.sorted_fields) # print(Person._meta.database.get_sql_context()) # print(Person._meta.fields_to_index()) def paginate(): for per in Person.select().paginate(1, 10): print(per.name) paginate()
运算符
|比较方式 |含义 | |:---|:---| |== |x等于y| |< |x小于y| |<= |x小于或等于y| | \> |x大于y| |>= |x大于或等于y| |!= |x不等于y| |<< |x IN y,其中y是列表或查询| |\>> |x IS y,其中y为None / NULL| |% |x喜欢y,其中y可能包含通配符| |** |x像y,其中y可能包含通配符| |^ |异或| |~ |一元否定(例如,NOT x)| |方法 |含义 | |:---|:---| |.in_(value)| IN查找(与相同<<)。| |.not_in(value) |不在查询中。| |.is_null(is_null)| 是NULL还是IS NOT NULL。接受布尔参数。| | .contains(substr) | 通配符搜索子字符串 | |.startswith(prefix)| 搜索以开头的值prefix。| |.endswith(suffix)| 搜索以结尾的值suffix。| |.between(low, high)| 在low和之间搜索值high。| |.regexp(exp)| 正则表达式匹配(区分大小写)。| |.iregexp(exp)| 正则表达式匹配(不区分大小写)。| |.bin_and(value)| 二进制AND。| |.bin_or(value) |二进制或。| |.concat(other)|使用串联两个字符串或对象||。| |.distinct()| 标记列以进行DISTINCT选择。| |.collate(collation)| 用给定的排序规则指定列。| |.cast(type) |将列的值强制转换为给定的类型。| |操作符 |含义 |例| |:---:|:---|:---| |& |和 |(User.is_active == True) & (User.is_admin == True)| | 1 |(管) 要么| (User.is_admin)| (User.is_superuser)| |~ |不(一元否定) |~(User.username.contains('admin'))| # demo ``` User.last_login >> None User.is_null(True) ```
进阶
from playhouse.postgres_ext import PostgresqlExtDatabase, Model, HStoreField, CharField, MySQLDatabase from playhouse.pool import PooledPostgresqlExtDatabase from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE from playhouse.sqlite_ext import SqliteDatabase from peewee import DatabaseProxy, CharField, TextField, DateTimeField, BigBitField, CompositeKey import datetime import conf database_proxy = DatabaseProxy() if conf.config['DEBUG']: database = SqliteDatabase('local.db') elif conf.config['TESTING']: database = MySQLDatabase( 'peewee_test', host='127.0.0.1', port=3306, user='root', password='12345678' ) else: database = PooledPostgresqlExtDatabase( 'peewee_test', user='postgres', password='1234', host='127.0.0.1', isolation_level=ISOLATION_LEVEL_SERIALIZABLE, # 隔离级别初始化psycopg2.extensions autoconnect=False, max_connections=8, stale_timeout=300, register_hstore=True, # 使用hstore,您可以将任意键/值对与结构化的关系数据一起存储在数据库中。 ) """ postgresql://postgres:my_password@localhost:5432/my_database mysql://user:passwd@ip:port/my_db """ database_proxy.initialize(database) database_proxy.connect(reuse_if_open=True) class BaseModel(Model): class Meta: database = database_proxy # Use proxy for our DB. class User(BaseModel): username = CharField(index=True, unique=True, choices=(('1', "ok"),), help_text="", verbose_name="") body = TextField() send_date = DateTimeField(default=datetime.datetime.now) class Meta: primary_key = CompositeKey('username', 'send_date') class House(BaseModel): address = CharField() features = HStoreField() @staticmethod def request(): House.create( address='123 Main St', features={'garage': '2 cars', 'bath': '2 bath'} ) def init(): # @database_proxy.atomic() with database_proxy.atomic() as nested_txn: # 事务 # 可以使用嵌套事务 User.create_table(True) House.create_table(True) # raise Exception(123) # nested_txn.rollback() # with database_proxy.transaction() as txn: # 在事务中显式运行代码,则可以使用 transaction() # # 不建议使用嵌套事务 # User.create(username='mickey') # txn.commit() # Changes are saved and a new transaction begins. # User.create(username='huey') # txn.rollback() # with database_proxy.savepoint() as sp: # 显示的创建保存点 # User.create(username='mickey') # sp.rollback() init()
pwiz 模型生成器
pwiz是peewee附带的一个小脚本,能够对现有数据库进行自省,并生成适合与基础数据进行交互的模型代码。如果您已经有一个数据库,pwiz可以通过生成具有正确列亲和力和外键的框架代码来给您一个很好的提升。
pwiz接受以下命令行选项:
选项 | 含义 | 例 |
---|---|---|
-H | 显示帮助 | |
-e | 数据库后端 | -e MySQL |
-H | 主机连接 | -H remote.db.server |
-p | 连接端口 | -p 9001 |
-u | 数据库用户 | -u postgres |
-P | 数据库密码 | -P(将提示输入密码) |
-s | 图式 | -s公共 |
-t | 表格生成 | -t tweet,用户,关系 |
-v | 为视图生成模型 | (无参数) |
-i | 将信息元数据添加到生成的文件 | (无参数) |
-o | 表列顺序被保留 | (无参数) |
困难
from peewee import * db = MySQLDatabase() # 循环外键依赖解决方法 class User(Model): username = CharField() favorite_tweet = DeferredForeignKey('Tweet', null=True) class Tweet(Model): message = TextField() user = ForeignKeyField(User, backref='tweets') db.create_tables([User, Tweet]) User._schema.create_foreign_key(User.favorite_tweet)