一、ORM 框架簡介
對象-關系映射(Object/Relation Mapping,簡稱ORM),是隨着面向對象的軟件開發方法發展而產生的。面向對象的開發方法是當今企業級應用開發環境中的主流開發方法,關系數據庫是企業級應用環境中永久存放數據的主流數據存儲系統。對象和關系數據是業務實體的兩種表現形式,業務實體在內存中表現為對象,在數據庫中表現為關系數據。內存中的對象之間存在關聯和繼承關系,而在數據庫中,關系數據無法直接表達多對多關聯和繼承關系。因此,對象-關系映射(ORM)系統一般以中間件的形式存在,主要實現程序對象到關系數據庫數據的映射。
二、SQLAlchemy介紹
1 安裝環境
pip install SQLAlchemy
2 安裝mysql
yum install mysql-server mysql
service mysqld restart
sysctmctl restart mysql.service
3 創建數據庫
create database sqlalchemy;
4 授權
GRANT ALL PRIVILEGES ON *.* TO 'fxq'@'%' IDENTIFIED BY ‘123456’;
5 初始化連接
from sqlalchemy import create_engine
engine = create_engine('mysql://fxq:123456@192.168.100.101/sqlalchemy', echo=True)
engine.table_names() # 返回數據庫中所有表的名字
- echo參數為True時,會顯示每條執行的SQL語句,可以關閉。
- create_engine()返回一個Engine的實例,它通過數據庫語法處理細節的核心接口,數據庫語法將會被解釋成python的類方法。
create_engine()
的第一個參數是 url- 例如:
sql://fxq:123456@192.168.100.101/sqlalchemy
- mysql: 指定是哪個數據庫連接。實際上由
dialaect[+driver]
組成,在默認情況下由名字判斷driver - xq: 用戶名
- 123456: fxq用戶對應的密碼
- 192.168.100.101: 數據庫的ip
- sqlalchemy: 數據庫需要連接庫的名字
- 例如:
create_engine()
函數和連接池相關的參數有:-pool_recycle
, 默認為 -1, 推薦設置為 7200, 即如果 connection 空閑了 7200 秒,自動重新獲取,以防止 connection 被 db server 關閉。-pool_size=5
, 連接數大小,默認為 5,正式環境該數值太小,需根據實際情況調大- -max_overflow=10, 超出 pool_size 后可允許的最大連接數,默認為 10, 這 10 個連接在使用過后,不放在 pool 中,而是被真正關閉的。
-pool_timeout=30
, 獲取連接的超時閾值,默認為 30 秒
6 創建表格
6.1 通過 sql語句 來創建表格
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
sql = '''create table student(
id int not null primary key,
name varchar(50),
age int,
address varchar(100));
'''
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
conn = engine.connect()
conn.execute(sql)
engine.connect() #表示獲取到數據庫連接。類似我們在MySQLdb中游標course的作用。
6.2 通過ORM方式創建表格
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
metadata = MetaData(engine)
student = Table('student', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50), ),
Column('age', Integer),
Column('address', String(10)),
)
metadata.create_all(engine)
- MetaData類主要用於保存表結構,連接字符串等數據,是一個多表共享的對象
- metadata = MetaData(engine) # 綁定一個數據源的metadata
- metadata.create_all(engine) # 是來創建表,這個操作是安全的操作,會先判斷表是否存在。
6.3 Column類
Column不僅僅用在Table類中使用,也用在模型類中使用。
Column.__init__(self, name, type_, *args, **kwargs)
store = Table('store', ModelBase.metadata,
Column('id', Integer, Sequence("yhk_store_id_seq"), primary_key=True),
Column('name', String, nullable=False, doc='商店名稱'),
Column('business_id', Integer, ForeignKey('business.id'), doc='所屬企業'),
Column('createtime', DateTime, default=datetime.datetime.now),
Column('address', String, doc='商店地址'),
Column('servicetel', String, doc='服務電話')
)
- name 列名
- type 類型,更多類型見 sqlalchemy.types
- *args 包括Constraint(約束), ForeignKey(外鍵), ColumnDefault(默認), Sequenceobjects(序列)
- key 列名的別名,默認None
- 下面是可變參數 **kwargs
- primary_key 如果為True,則是主鍵
- nullable 是否可為Null,默認是True
- default 默認值,默認是None
- index 是否是索引,默認是True
- unique 是否唯一鍵,默認是False
- onupdate 指定一個更新時候的值,這個操作是定義在SQLAlchemy中,不是在數據庫里的,當更新一條數據時設置,大部分用於updateTime這類字段
- autoincrement 設置為整型自動增長,只有沒有默認值,並且是Integer類型,默認是True
- quote 如果列明是關鍵字,則強制轉義,默認False
6.4 Table類
Table.__init__(self, name, metadata, *args, **kwargs)
store = Table('store', ModelBase.metadata,
Column('id', Integer, Sequence("yhk_store_id_seq"), primary_key=True),
Column('name', String, nullable=False, doc='商店名稱'),
Column('business_id', Integer, ForeignKey('business.id'), doc='所屬企業'),
Column('createtime', DateTime, default=datetime.datetime.now),
Column('address', String, doc='商店地址'),
Column('servicetel', String, doc='服務電話')
)
- name 表名
- metadata 共享的元數據
- *args Column 是列定義,詳見下一節Column部分
- 下面是可變參數 **kwargs 各項定義
- schema: 此表的結構名稱,默認None
- autoload: 自動從現有表中讀入表結構,默認False
- autoload_with: 從其他engine讀取結構,默認None
- include_columns: 如果autoload設置為True,則此數組中的列將被引用,沒有寫的列明將被忽略,None表示所有都列明都引用,默認為None;即引用所有列
- mustexist 如果為True,表示這個表必須在其他的python應用中定義,必須是metadata的一部分,默認False
- useexisting 如果為True,表示這個表必須被其他應用定義過,將忽略結構定義,默認False
- owner 表所有者,用於Orcal,默認None
- quote 設置為True,如果表明是SQL關鍵字,將強制轉義,默認False
- quote_schema 設置為True,如果列明是SQL關鍵字,將強制轉義,默認False
- mysql_engine mysql專用,可以設置'InnoDB'或'MyISAM'
6.5 模型類
模型類的主要作用是在python環境中使用一個類模擬/映射sql中表的結構
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base() # 生成模型類的基類
class User(Base): # 模型類必須通過繼承基類來獲得metadata
__tablename__ = 'users' # 聲明需要映射的表名
id = Column(Integer,primary_key=True)
name = Column(String(20),nullable=False)
addresses = relationship('Address')
class Address(Base):
__tablename__ = 'address'
id = Column(Integer,primary_key=True)
address = Column(String(20),nullable=False)
user_id = Column(Integer,ForeignKey('users.id'))
# 請注意,設置外鍵的時候用的是表名.字段名。
# 其實在表和模型類的抉擇中,只要參數是字符串,往往是表名;如果是對象則是模型類對象。
user = relationship('User')
- 模型類可以通過遷移來創建數據庫中的表結構,但是sqlalchemy不提供這樣的功能。可以查詢相關的工具。
- 只有session具備基於模型類操作數據的能力,是有core的connection無法操作模型類。
三、 通過 connection 操作 數據
connection的主要目的是建立與數據庫的會話,它維護你加載和關聯的所有數據庫對象。它是數據庫查詢(Query)的一個入口。它會將我們通過ORM寫好的sql expression
翻譯成sql語句,並傳入數據庫執行。
Query對象返回的結果是一組同一映射(Identity Map)對象(或者集合)。事實上,集合中的一個對象,對應於數據庫表中的一行(即一條記錄)。所謂同一映射,是指每個對象有一個唯一的ID。如果兩個對象(的引用)ID相同,則認為它們對應的是相同的對象。
1 connection
from sqlalchemy import create_engine
engine = create_engine("URL")
connection = engine.connect()
# URL的結構:dialaect[+driver]://user:password@host:port/dbname
# dialaect:數據庫的名稱,比如mysql,oracle,postgresql……
# e.g."mysql://scott:tiger@localhost/test
2 Transaction
使用事務和回滾機制防止數據庫錯誤帶來意外損失
from sqlalchemy import create_engine
engine = create_engine("URL")
connection = engine.connect()
trans = connection.begin()
ins = table1.select()
try:
r1 = connection.execute(ins)
connection.execute(table1.insert(), col1=7, col2='this is some data')
trans.commit()
except:
trans.rollback()
raise
當然,還有更加優雅的寫法
with engine.begin() as connection:
r1 = connection.execute(ins1)
r2 = connection.execute(ins2)
2 Reflection
讀取數據庫,構建SQLAlchemy表格對象
from sqlalchemy import MetaData, Table
from sqlalchemy import create_engine
engine = create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = MetaData()
# MetaData元數據:存儲數據庫信息(比如各種表格)的目錄
census = Table('census',metadata,autoload=True,autoload_with=engine)
# 第一個參數傳入數據庫的名稱
# 第二個參數autoload,默認為False,此時可以手動定義和添加column對象,若參數設定為True,則自動從數據庫中導出column對象,導出的對象可能會替換我們設定的column對象
print(repr(census)) #使用repr()功能來預覽表格的細節,可以查到列名、列數據的類型
print(metadata.tables) #以字典的形式返回metadata中的所有表格
print(metadata.tables['census']) #等價於repr(census)
print(census.columns.keys()) #返回列名組成的列表
自行建立數據庫中不存在的表,在第二章的6.2中已經提到過,過程如下:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
metadata = MetaData()
student = Table('student', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50), ),
Column('age', Integer),
Column('address', String(10)),
)
metadata.create_all(engine)
3 基本的 SQL querying 結構
- 如果使用 sql語句 查詢
from seqalchemy import create_engine
engine = create_engine('sqlite:///census.sqlite')
connection = engine.connect()
stmt = 'SELECT * FROM people'
result_proxy = connection.execute(stmt)
results = result_proxy.fetchall() #獲取全部查詢結果
first_row = results[0] #輸出第一行
print(first_row)
>> ('Illinois','M',0,89600,95012)
print(first_row.keys()) #輸出該行數據對應的列名
>>['state','sex','age','pop2000','pop2008']
print(first_row.state) #輸出具體某一列的數值
>>'Illinois'
- 如果使用SQLAlchemy expression 查詢:
- 創建
engine
- 創建
connection
- 創建
metadata
reflection table
- 選擇
query
的方法 execute
&fetchall()
execute
的結果是ResultProxy
,是一個查詢結果對象fetchall()
的結果是ResultSet
,是真實的數據值
- 創建
from sqlalchemy import create_engine, MetaData, Table, select
engine = create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = MetaData()
census = Table('census',metadata,autoload=True,autoload_with=engine)
stmt = select([census]) #select的參數是個list,這里僅包含了census一個元素
results = connection.execute(stmt).fetchall() # 結果為ResultSet
print(results[0][0]) #讀取第一行第一列的數據
print(results[0]['column name'])
- 結合上面說的事務,一條最基本的查詢語句結構如下:
metadata = MetaData()
census = Table('census',metadata,autoload=True,autoload_with=engine)
connection = engine.connect()
ins = census.select()
with engine.begin() as connection:
row = connection.execute(stmt).fetchall()
print(row[0][0])
print(row[0]['column name'])
4 Filtering, Ordering and Grouping
Filter
- 使用
where
方法來進行條件過濾
stmt = select([census])
stmt = stmt.where(census.columns.state == 'California')
results = connection.execute(stmt).fetchall()
for result in results:
print(result.state, result.age)
- 復雜的條件判斷(以下全部為columns方法)
in_()
:匹配列表中的值like()
:匹配通配符的部分值between()
:檢查返回值是否在提供的區間內
stmt = selcet([census]) # 查詢出行對象
stmt = stmt.where(census.columns.state.startwith('New'))
for result in connection.execute(stmt): #SQLAlchemy的特性,可以直接使ResultProxy作為循環的目標
print(result.state, result.pop2000)
-
連詞的使用:
and_(),not_(),or_()
注意使用前需要導入算子,
and_()
和or_()
方法也可以使用|
和&
算子來實現,記得不同條件要用括號括起來
from sqlalchemy import or_
stmt = select([census])
stmt = stmt.where(
or_(census.columns.state == 'California', census.columns.state == 'New York')
)
#寫法二:
#stmt = stmt.where(
# (census.columns.state == 'California') | (census.columns.state == 'New York')
#)
for result in connection.execute(stmt):
print(result.state, result.sex)
e.g.2
from sqlalchemy import in_
stmt = select([census])
stmt = stmt.where(census.columns.state.in_(states)) #查找名字在列表states中的州
e.g.3
from sqlalchemy import and_ , or_
stmt = select([census])
stmt = stmt.where(
and_(census.columns.state == 'New York',
or_(census.columns.age == 21,
census.columns.age == 37)
)
) #查找紐約州年紀為21歲或37歲的數據
- 簡單排序:
order_by()
語句,默認按升序排序,對於字母,即按字母表順序排序
stmt = select([census.columns.state])
stmt = stmt.order_by(census.columns.state)
#降序排序
from sqlalchemy import desc
stmt = stmt.order_by(desc(census.columns.state))
- 復雜排序:在
order_by()
語句中傳入多列,用逗號隔開,按傳入列的先后順序排序
from sqlalchemy import desc
stmt = select([census.columns.state, census.columns.age])
#州按升序排序,年齡按降序排序
stmt = stmt.order_by(census.columns.state, desc(census.columns.age))
results = connection.execute(stmt).fetchall()
print(results)
>>
[('Alabama', 85), ('Alabama', 85), ('Alabama', 84), ('Alabama', 84), ('Alabama', 83), ('Alabama', 83), ('Alabama', 82), ('Alabama', 82), ('Alabama', 81), ('Alabama', 81), ('Alabama', 80), ('Alabama', 80), ('Alabama', 79), ('Alabama', 79), ('Alabama', 78), ('Alabama', 78), ('Alabama', 77), ('Alabama', 77), ('Alabama', 76), ('Alabama', 76)]
Counting, Summing and Grouping
- 聚合函數的功能集成在
func
模塊中,注意不要直接import sum()……
,會與Python的內置函數沖突
from sqlalchemy import func
stmt = select([func.sum(census.columns.pop2008)]) #求和
results = connection.execute(stmt).scalar() # 注意scalar()只返回單一值
print(results)
>>302876613
- Group by
stmt = select([census.columns.sex, func.sum(census.columns.pop2008)]) # 這里顯示了兩個值
stmt = stmt.group_by(census.columns.sex)
results = connection.execute(stmt).fetchall()
print(results)
>>
[('F',153959198),('M',148917415)]
- SQLAlchemy 在 ResultSet 中自動為 functions 生成列名
- 列名通常為:
func_#
,比如count_1
;這樣會導致操作困難 - 使用
label()
函數來更改列名
- 列名通常為:
print(results[0].keys())
>> ['sex',u'sum_1']
#更改生成列的列名
stmt = select([census.columns.sex, func.sum(census.columns.pop2008).label('pop2008_sum')])
stmt = stmt.group_by(census.columns.sex)
results = connection.execute(stmt).fetchall()
print(results[0],keys())
>> ['sex','pop2008_sum']
#多列聚合類似於多類排序,按照傳入列的順序進行聚合
#求出不同性別下,各年齡段在2008年的總人口數
stmt = select([census.columns.sex,census.columns.age,
func.sum(census.columns.pop2008)])
stmt = stmt.group_by(census.columns.sex, census.columns.age)
results = connection.execute(stmt).fetchall()
print(results)
>>
[('F',0,2105442),('F',1,2087705),('F',2,2037280)……]
distinct()
方法:按列中不同的值分類
#求出一共統計了多少個州的數據
stmt = select([func.count(census.columns.state.distinct())])
distinct_state_count = connection.execute(stmt).scalar()
print(distinct_state_count)
>>51
#打印出統計的各個州的名稱
stmt = select([census.columns.state.distinct()])
different_state = connection.execute(stmt).fetchall()
print(different_state)
>>
[('Illinois',), ('New Jersey',), ('District of Columbia',), ('North Dakota',), ('Florida',), ('Maryland',), ('Idaho',), ('Massachusetts',), ('Oregon',), ('Nevada',), ('Michigan',), ('Wisconsin',), ('Missouri',), ('Washington',), ('North Carolina',), ('Arizona',), ('Arkansas',), ('Colorado',), ……]
#復雜聚合
from sqlalchemy import func
stmt = select([census.columns.state,func.count(census.columns.age)])
stmt = stmt.group_by(census.columns.state)
results = connection.execute(stmt).fetchall() #返回結果是list
print(results)
print(results[0].keys())
>>
[('Alabama', 172), ('Alaska', 172), ('Arizona', 172), ('Arkansas', 172), ('California', 172), ('Colorado', 172), ('Connecticut', 172), ('Delaware', 172), ('District of Columbia', 172), ('Florida', 172), ('Georgia', 172), ('Hawaii', 172), ('Idaho', 172), ('Illinois', 172), ('Indiana', 172), ('Iowa', 172), ('Kansas', 172), ('Kentucky', 172), ('Louisiana', 172), ('Maine', 172), ('Maryland', 172), ('Massachusetts', 172), ('Michigan', 172), ('Minnesota', 172), ('Mississippi', 172), ('Missouri', 172), ('Montana', 172), ……]
['state', 'count_1']
- 可以先將
func
函數的表達式寫出並賦給一個變量,同時完成新增列的命名
#求出每個州2008年的總人數
from sqlalchemy import func
pop2008_sum = func.sum(census.columns.pop2008).label('population')
stmt = select([census.columns.state,pop2008_sum])
stmt = group_by(census.columns.state)
results = connection.execute(stmt).fetchall()
print(results)
>>
[('Alabama', 4649367), ('Alaska', 664546), ('Arizona', 6480767), ('Arkansas', 2848432), ('California', 36609002), ('Colorado', 4912947), ('Connecticut', 3493783), ('Delaware', 869221), ('District of Columbia', 588910), ('Florida', 18257662), ('Georgia', 9622508), ('Hawaii', 1250676), ('Idaho', 1518914), ('Illinois', 12867077), ('Indiana', 6373299), ('Iowa', 3000490), ('Kansas', 2782245), ('Kentucky', 4254964), ('Louisiana', 4395797), ('Maine', 1312972), ('Maryland', 5604174), ('Massachusetts', 6492024), ('Michigan', 9998854), ……]
- ResultsProxy 可以直接與pandas交互轉換成DataFrame
import pandas as pd
df = pd.DataFrame(results)
df.columns = results[0].keys() #列名提取作為DataFrame的列
Advanced SQLAlchemy Queries
- 數值計算
#計算2000年到2008年之間人數最多的前5個年齡段
from sqlalchemy import desc
stmt = select([census.columns.age,
(census.columns.pop2008-census.columns.pop2000)
.label('pop_change')])
stmt = stmt.group_by(census.columns.age)
stmt = stmt.group_by(desc('pop_change'))
stmt = stmt.limit(5) #僅返回前5名
results = connection.execute(stmt).fetchall()
print(results)
- Case Statement
- 接受條件的列表來進行匹配,最終返回一個滿足條件匹配的列對象
- 條件匹配最終以else子句結束,用來處理那些不匹配條件的情況
from sqlalchemy import case,func
#求紐約2008年的人口數
stmt = select([
func.sum(
case([
(census.columns.state == 'New York',census.columns.pop2008),
else_=0 #如果數據來自紐約,則返回其2008年人口數用以求和,否則返回0
])
)
])
results = connection.execute(stmt).fetchall()
print(results)
>> [(19465159,)]
- Cast Statement
- 用來進行數據類型的轉換
- 整型轉為浮點型方便進行除法運算
- 字符串轉為日期和時間
- 參數接受列對象或者是表達式,以及目標數據類型
- 用來進行數據類型的轉換
#求出居住在紐約的總人口占比
from sqlalchemy import case, cast, Float
stmt = select([
(func.sum(
case([(census.columns.state == 'New York',
censeus.columns.pop2008)],
else_=0)) #紐約的總人口數
/cast(func.sum(census.columns.pop2008),Float)*100 #除以2008年的總人口數 *100%
).label('ny_percent')
])
results = connection.execute(stmt).fetchall()
print(results)
>> [(Decimal('6.4267619765'),)]
#為了方便閱讀也可以分開寫
NY_pop2008 = func.sum(
case([
(census.columns.state == 'New York',census.columns.pop2008)
],else_=0)
) #求紐約的人口數
total_pop2008 = cast(func.sum(census.columns.pop2008),Float) #求總的人口數
stmt = select([NY_pop2008/total_pop2008*100])
percent = connection.execute(stmt).scalar()
print(percent)
5 SQL Relationships
- 對於已經定義好表間關系的表格,使用SQLAlchemy自動結合兩張表
stmt = select([census.columns.pop2008,state_fact.columns.abbreviation])
results = connection.execute(stmt).fetchall()
- 對於沒有預定義表間關系的表格,
join
接受一個表格以及額外的表達式來解釋兩張表的關系- 給
join
子句傳入一個布爾表達式來解釋兩張表是怎樣關聯的 - only join rows from each table that can be related between the two columns
- 不允許在數據類型不同的列間建立關系
- 給
join
子句緊跟着select()
子句且在任意where()
,order_by
或group_by()
子句之前- 當我們需要創建一個請求,不再從每個列表中選擇需要的列,而是同時使用兩個表時,使用
select_from
語句來實現,join
子句插入其中 - e.g.1
stmt = select([func.sum(census.columns.pop2000)])
stmt = stmt.select_from(census.join(state_fact))
stmt = stmt.where(state_fact.columns.circuit_court == '10')
result = connection.execute(stmt).scalar()
- e.g.2
stmt = select([func.sum(census.columns.pop2000)])
stmt = stmt.select_from(census.join(state_fact,
census.columns.state == state_fact.colums.name))
stmt = stmt.where(state_fact.columns.census_division_name == 'East South Central')
result = connection.execute(stmt).scalar()
- 使用分級表
- 分級表(hierarchical tables):
- Contain a relationship with themselves
- 通常用來儲存:組織圖(organizational charts),地理資料(geographic data),網絡(networks)和關系圖(relationship graphs)等
alias()
方法用來對同一張表創建兩個不同的名字,即提供了一種通過不同名稱來訪問同一張表的方法
- 分級表(hierarchical tables):
managers = employees.alias()
stmt = select([manager.columns.name.label('manager'),
employees.colums.name.label('employee')])
stmt = stmt.select_from(employees.join(managers,
managers.columns.id == employees.colums.manager))
stmt = stmt.order_by(managers.colums.name)
print(connection.execute(stmt).fetchall())
>>
[(u'FILLMORE',u'GRANT'),(u'FILLMORE',u'ADAMS'),……]
- Dealing with Large ResultSet
- 當數據量過大時,可能會引起存儲空間不夠的問題
fetchmany()
方法允許僅讀取部分數據,將需要提取數據的數量傳入該方法- 當沒有數據時,返回空列表
- 在完成數據處理后要關閉
ResultProxy
#已經完成以下定義:
#more_results = True,字典state_count用來存儲每個州出現的次數,results_proxy是ResultsProxy類型
while more_results:
partial_results = results_proxy.fetchmany(50)
if partial_results == []:
more_results = False
for row in partial_results:
state_count[row.state] += 1
results_proxy.close()
6 Creating and Manipulating Data
Creating Databases and Tables
- 對於SQLite,可以直接使用
create_engine()
來創建數據庫
from sqlalchemy import create_engine, Metadata
from sqlalchemy import (Table, Column, String, Integer, Decimal, Boolean)
engine = create_engine(URL)
metadata = Metadata()
employees = Table('employees',metadata,
Column('id', Integer()),
#設定name字段不允許出現重復值和空值
Column('name', String(255), unique=True, nullable=False),
#設定salary字段的默認值為100
Column('salary', Decimal(),default=100.00),
#設定active字段的默認值為True
Column('active', Boolean(),default=True))
metadata.create_all(engine)
#可以使用.constraints方法來查看表中設定了哪些限制
print(employees.constraints)
- 添加數據
from sqlalchemy import insert
#insert()方法接受表名作為參數,插入的數值寫在.values()里
stmt = insert(employees).values(id=1,name='Jason',salary=1.00,active=True)
result_proxy = connection.execute(stmt) #注意insert方法不返回任何行,所以不用調用fetchall
print(result_proxy.rowcount) #.rowcount屬性可以查看添加了多少行
#添加多行的方法:
#構建一條不帶任何值的statement語句,構建一個字典的列表用來存儲需要添加的值,然后在connection中同時將列表和語句傳給execute()方法作為參數
stmt = insert(employees)
values_list = [
{'id':2, 'name':'Rebecca', 'salary':2.00, 'active':True},
{'id':3, 'name':'Bob', 'salary':0.00, 'active':False}
]
result_proxy = connection.execute(stmt,values_list)
print(result_proxy.rowcount)
>> 2
-
將CSV文件添加進表格
- 使用
CSV
模塊來建立一個csv_reader
,csv_reader
是一個閱讀器對象,可以迭代CSV文件中的行
import csv file = open('XXX.csv') csv_reader = csv.reader(file) stmt = insert(census) values_list = [] total_rowcount = 0 #使用枚舉方法迭代讀取csv文件 for idx, row in enumerate(csv_reader): data = {'state':row[0], 'sex':row[1], 'age':row[2], 'pop2000':row[3], 'pop2008':row[4]} values_list.append(data) if idx % 51 == 0:#驗證數據是否添加完整(數據中共統計了52個州,即0-51) results = connection.execute(stmt,values_list) total_rowcount += results.rowcount #求出一共添加了多少組數據
- 使用
Updating Date in a Database
- 使用
update()
語句來進行更新,語法結構類似於insert()
- 使用
where()
子句來選擇要更新的數據 - 使用
.values()
子句來更新數據的值
from sqlalchemy import update
stmt = update(employees)
stmt = stmt.where(employees.columns.id == '3')
stmt = stmt.values(active = True)
results_proxy = connection.execute(stmt)
print(result_proxy.rowcount)
#更新多條數據
stmt = update(employees)
stmt = stmt.where(employees.colums.active == True)
stmt = stmt.values(active = False, salary = 0.00)
result_proxy = connection.execute(stmt)
print(result_proxy.rowcount)
#同步更新:從本表或其他表中選出某個數據,用來作為更新的值更新現有表格中的數據
new_salary = select([employees.columns.salary])
new_salary = new_salary.order_by(desc(employees.columns.salary))
new_salary = new_salary.limit(1) #選出工資最高的值
stmt = update(employees)
stmt = stmt.values(salary = new_salary) #修改所有數據
result_proxy = connection.execute(stmt)
Deleting Date in a Database
- 使用
delete()
語句來執行刪除功能 - 添加
where()
子句來確定需要刪除的數據 - 刪除的數據不易恢復,所以執行刪除操作時請務必謹慎
- 檢查刪除的行數來防止誤刪除太多的數據
from sqlalchemy import delete
delete_stmt = delete(extra_employees)
result_proxy = connection.execute(delete_stmt) #不加任何條件限制,刪除說有數據
stmt = delete(employees).where(employees.columns.id == '3')
result_proxy = connection.execute(stmt)
- 刪除數據庫中的表格,使用
drop()
語句
extra_employees.drop(engine)
print(extra_employees.exists(engine))
>> False
- 使用
drop_all()
語句刪除所有表格
metadata.drop_all(engine)
print(engine.table_names())
>> []
四、 通過 session 操作 模型類
Session是對transcation的封裝,最重要的功能是實現原子操作。要完成數據庫查詢,就需要建立與數據庫的連接。這就需要用到Engine對象。一個Engine可能是關聯一個Session對象,也可能關聯一個數據庫表。一旦任務完成 session 會將數據庫 connection 交還給 pool。
1 建立session鏈接
ORM通過session與數據庫建立連接進行通信,如下所示:
from sqlalchemy.orm import sessionmaker
DBSession = sessionmaker(bind=engine)
session = DBSession()
...your query action...
try:
session.commit()
return query_result
except Exception as e:
session.rollback()
raise e
finally:
session.close()
通過sessionmake方法創建一個Session工廠,然后在調用工廠的方法來實例化一個Session對象。
session是數據庫的連接訪問方式,模型類(繼承自declarative_base()方法生成的基類的類)主要是模擬/映射數據庫表結構關系的類。
session也可以用於提交上面講到的sql expression。將上面的 your query expression
替換成 sql expression
即可。下面主要介紹通過session操作模型類的方法。但只有session具備操作模型類的能力。
session的四種狀態
ORM模型很方便地將數據庫中的一條條記錄轉變成了python中的一個個對象,有時候我們會想當然地把兩者完全等同起來,但是不要忘了,兩者之間還必須有session這個中間的橋梁。因為有session在中間做控制,所以必須注目對象和記錄之間一個狀態上的差別。一般而言,一個數據的對象可以有四種不同的和session關聯的狀態。從代碼的流程上看:
from sqlalchemy.orm import sessionmaker
DBSession = sessionmaker(bind=engine)
session = DBSession() #創建session對象
frank = Person(name='Frank') #數據對象得到創建,此時為Transient狀態
session.add(frank) #數據對象被關聯到session上,此時為Pending狀態
session.commit() #數據對象被推到數據庫中,此時為Persistent狀態
session.close() #關閉session對象
print (frank.name) #此時會報錯DetachedInstanceError,因為此時是Detached狀態。
new_session = DBSession()
print (new_session.query(Person).get(1).name) #可以查詢到數據
new_session.close()
四個對象的狀態分別是上面四種,Transient/Pending/Persistent/Detached。其中需要比較注意的是Detached狀態。並不是我在python中創建了一個數據記錄的對象我就可以沒有限制地訪問它,可以看到訪問繼承自Base類的這么一個對象時必須要使其身處一個session的上下文中,否則是報錯的!
2 添加數據
添加基本上就是生成模型類對象然后添加到數據庫表中:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBsession = sessionmaker(bind=engine)
session = DBsession()
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(100))
age = Column(Integer)
address = Column(String(100))
student1 = Student(id=1001, name='ling', age=25, address="beijing")
student2 = Student(id=1002, name='molin', age=18, address="jiangxi")
student3 = Student(id=1003, name='karl', age=16, address="suzhou")
session.add_all([student1, student2, student3])
session.commit()
session.close()
3 更新數據
更新不需要模型類對象,也不生成模型對象,只返回true或者false。
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
# option1 查出來在一個個更改
my_stdent = session.query(Student).filter(Student.id == 1002).first()
my_stdent.name = "fengxiaoqing"
my_stdent.address = "chengde"
session.commit()
# option2 使用update方法和更新字典。更改並不返回查詢對象,而是返回True或者False
update_dict = {
"name" : "fengxiaoqing",
"address" : "chengde"}
res = session.query(Student).filter(Student.id == 1002).update(update_dict)
student1 = session.query(Student).filter(Student.id == 1002).first()
print(student1.name, student1.address)
結果:
MariaDB [sqlalchemy]> select * from student;
+---------+---------------+------+---------+
| id | name | age | address |
+---------+---------------+------+---------+
| 1002 | molin | 18 | jiangxi |
| 1003 | karl | 16 | suzhou |
| 100011 | fengxiaoqing | 18 | chengde |
| 100021 | fengxiaqing1 | 181 | chengde |
| 1000111 | fengxiaoqing | 18 | chengde |
| 1000211 | fengxiaqing1 | 181 | chengde |
| 1000311 | fengxiaoqing2 | 182 | chengde |
+---------+---------------+------+---------+
7 rows in set (0.00 sec)
MariaDB [sqlalchemy]> select * from student;
+---------+---------------+------+---------+
| id | name | age | address |
+---------+---------------+------+---------+
| 1002 | fengxiaoqing | 18 | chengde |
| 1003 | karl | 16 | suzhou |
| 100011 | fengxiaoqing | 18 | chengde |
| 100021 | fengxiaqing1 | 181 | chengde |
| 1000111 | fengxiaoqing | 18 | chengde |
| 1000211 | fengxiaqing1 | 181 | chengde |
| 1000311 | fengxiaoqing2 | 182 | chengde |
+---------+---------------+------+---------+
7 rows in set (0.00 sec)
MariaDB [sqlalchemy]>
5 刪除數據
刪除其實也是跟查詢相關的,直接查出來,調用delete()方法直接就可以刪除掉。
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
session.query(Student).filter(Student.id == 1001).delete()
session.commit()
session.close()
6 query查詢
6.1 filter_by() 函數
通過Session的query()方法創建一個查詢對象。經過查詢后會返回一個模型類對象(或多個對象列表)。
這個函數的參數數量是可變的,參數可以是任何類或者是類的描述的集合。下面來看一個例子:
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
my_stdent = session.query(Student).filter_by(name="fengxiaoqing2").first()
print(my_stdent)
結果:
<__main__.Student object at 0x032745F0>
前面我們在賦值的時候,我們可以通過實例化一個對象,然后直接映射到數據庫中,那我們在查詢出來的數據sqlalchemy直接給映射成一個對象了(或者是每個元素為這種對象的列表),對象和我們創建表時候的class是一致的,我們就也可以直接通過對象的屬性就可以直接調用就可以了。
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
my_stdent = session.query(Student).filter_by(name="fengxiaoqing2").first()
print(my_stdent.id,my_stdent.name,my_stdent.age,my_stdent.address)
結果:
1000311 fengxiaoqing2 182 chengde
6.2 filter() 函數
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
sql = session.query(Student).filter(Student.name.like("%feng%"))
print(sql)
my_student = sql = session.query(Student).filter(Student.name.like("%feng%")).all()
print(my_student)
結果:
SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age, student.address AS student_address
FROM student
WHERE student.name LIKE %s
<__main__.Student object at 0x032745F0>
filter如果不加上all()或firtst()的話。結果就是一個sql語句。
filter函數中的其他過濾操作:
>>> equals:
query(Student).filter(Student.id == 10001)
query(Address).filter(Address.user == None)
>>> not equals:
query(Student).filter(Student.id != 100)
query(Address).filter(Address.user != None)
>>> LIKE:
query(Student).filter(Student.name.like(“%feng%”))
>>> IN:
query(Student).filter(Student.name.in_(['feng', 'xiao', 'qing']))
>>> NOT IN:
query(Student).filter(~Student.name.in_(['feng', 'xiao', 'qing']))
>>> CONTAIN:
query(User).filter(User.addresses.contains(address)) # 篩選包含某地址的用戶
>>> AND:
from sqlalchemy import and_
query(Student).filter(and_(Student.name == 'fengxiaoqing', Student.id ==10001))
或者
query(Student).filter(Student.name == 'fengxiaoqing').filter(Student.address == 'chengde')
>>> OR:
from sqlalchemy import or_
query.filter(or_(Student.name == 'fengxiaoqing', Student.age ==18))
6.3 filter() 和 filter_by() 的區別
filter: 可以像寫 sql 的 where 條件那樣寫 > < 等條件,但使用列名時,需要通過 類名.屬性名
的方式。
filter_by: 可以使用 python 的正常參數傳遞方法傳遞條件,指定列名時不需要額外指定類名。參數名對應名類中的屬性名,但不能使用 > < 等條件。
當使用filter的時候條件之間是使用“==",fitler_by使用的是"="。
user1 = session.query(User).filter_by(id=1).first()
user1 = session.query(User).filter(User.id==1).first()
filter不支持組合查詢,只能連續調用filter來變相實現。
filter_by的參數是**kwargs,直接支持組合查詢。
q = session.query(IS).filter(IS.node == node and IS.password == password).all()
q = session.query(IS).filter_by(node = node and password = password).all()
6.4 all() 函數
all() 返回一個模型類組成的列表
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
my_stdent = session.query(Student).filter(Student.name.like("%feng%")).all()
print(my_stdent)
結果:
[<__main__.Student object at 0x031405B0>, <__main__.Student object at 0x030FCA70>, <__main__.Student object at 0x031405F0>]
可以通過遍歷列表來獲取每個對象。
one() 返回且僅返回一個查詢結果。當結果的數量不足一個或者多於一個時會報錯。
first() 返回至多一個結果,而且以單項形式。當不足一個時返回為None(推薦)
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
my_stdent = session.query(Student).filter(Student.name.like("%feng%")).first()
print(my_stdent)
結果:
<__main__.Student object at 0x030A3610>
6.5 query 對象的其他返回方式
除了以上提到的 all() 和 first() 以外,查詢對象還有很多的顯示方法。
-
query() 以某些字段作為參數
session.query(Student.id,Student.name).filter(Student.name.like("F%")).all() # from sqlalchemy.orm import load_only 也可以完成類似的功能 session.query(Student).options(load_only(Student.id,Student.name)).filter(Student.name.like("F%")).all()
-
one() 如果返回行數不為1,那么就報錯;若剛好返回結果就一條就返回這條記錄的對象
my_stdent = session.query(Student).filter(Student.id = 151101).one()
-
get(k) 函數獲取固定主鍵結果
my_stdent = session.query(Student).get(151101)
-
limit(n) 最多只返回n條結果
session.query(Student).limit(10).all()
-
offset(n) 直接跳過前n條記錄,從n+1條開始返回
session.query(Student).offset(5).all()
-
order_by(Table.attribute 或者 'attribute') 返回結果按照給出的字段排序。
session.query(Student).limit(10).order_by(Student.id).all()
-
order_by(User.name.desc()) 或者 order_by('name desc') 返回結果按照給出的字段的降序排序。
session.query(User).filter(User.id > 1).order_by(User.id.desc()).offset(5).limit(3).all()
-
filter(condition1).filter(condition2) 多個拼接的filter就相當於and_(condition1,condition2...)
請注意以上所有方法都要在all()之前調用,get()除外。
-
還可以些數據庫自帶的函數,在用之前記得
from sqlalchemy import func
,就可以通過func來調用了。這些函數不是放在調用鏈中,大多數時候都是放在query方法的參數位置。from sqlalchemy import func session.query(Relation.user_id, func.count(Relation.target_user_id)) .group_by(Relation.user_id).all()
6.6 統計、分組、排序
這些函數可以結合filter使用也可以直接用。
1 統計 count()
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
print(session.query(Student).filter(Student.name.like("%feng%")).count())
2 分組 group_by()
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
std_group_by = session.query(Student).group_by(Student.age)
print(std_group_by)
std_group_by_list = std_group_by = session.query(Student).group_by(Student.age).all()
for i in std_group_by_list:
print(i.id)
結果的sql語句如下:
SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age, student.address AS student_address
FROM student GROUP BY student.age
1002
100011
100021
1000111
1000211
1000311
3 反序 desc()
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
address = Column(String(100))
engine = create_engine('mysql+pymysql://fxq:123456@192.168.100.101/sqlalchemy')
DBSession = sessionmaker(bind=engine)
session = DBSession()
std_ord_desc = session.query(Student).filter(Student.name.like("%feng%")).order_by(Student.id.desc()).all()
for i in std_ord_desc:
print(i.id)
結果:
1000311
1000211
1000111
100021
100011
1002
7 通過 session+model 多表查詢
1 模型類的relationship屬性
from sqlalchemy import create_engine,Column,String,Integer,ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,relationship
class User(Base):
__tablename__ = 'users'
id = Column(Integer,primary_key=True)
name = Column(String(20),nullable=False)
addresses = relationship('Address') # 注意,這里是模型類名
__repr__(self):
return self.name
class Address(Base):
__tablename__ = 'address'
id = Column(Integer,primary_key=True)
address = Column(String(20),nullable=False)
user_id = Column(Integer,ForeignKey('users.id')) # 注意,這里是表名
user = relationship('User') # 注意,這里是模型類名
__repr__(self):
return self.address
經過上面的對表的定義,數據庫中的users和address兩張表就通過外鍵有了聯系,為了利用好這種聯系,我們就可以靈活運用User類中的addresses屬性和Address類中的user屬性了。
在類內直接調用relationship定義的屬性,就可以直接訪問所鏈接到的模型類的__repr__(self)
方法的返回結果。
name = session.query(Address).filter_by(id=xxx).first().user
# 可以根據address_id查到對應的user_name
# 因為User類的__repr__(self)方法返回的就是name屬性
name = session.query(Address).filter_by(id=xxx).first().user.name
# 當然也可以直接按照屬性查找,效果是相等的
如果想通過一個名字直接搜到他的所有郵箱地址,那么就可以直接調用屬性:
address = session.query(User).filter_by(name="xxx").first().addresses
神奇的一點是,SQLAlchemy會根據關系的對應情況自動給關系相關屬性的類型。比如這里的User下面的addresses自動是一個list類型,而Address下面的user由於設定了外鍵的緣故,一個地址最多只能應對一個用戶,所以自動識別成一個單一對象類型。
2 使用 backref 指明 relationship
上面的例子中,我們分別為User和Address兩個類分別添加了連到對方的關系。這么寫好像略顯繁瑣一點,用from sqlalchemy.orm import backref
的這個backref可以更加方便地一次性寫清雙向的關系。這就是直接把backref='user'
作為參數添加在addresses = relationship('Adress',backref='user')
中。
from sqlalchemy import create_engine,Column,String,Integer,ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,relationship, backref
class User(Base):
__tablename__ = 'users'
id = Column(Integer,primary_key=True)
name = Column(String(20),nullable=False)
class Address(Base):
__tablename__ = 'address'
id = Column(Integer,primary_key=True)
address = Column(String(20),nullable=False)
user_id = Column(Integer,ForeignKey('users.id')) # 注意,這里是表名
user = relationship('User',backref='address') # 注意,這里是模型類名
關於relationship方法的一些參數
relationship方法除了傳遞一個類名和可以加上backref之外,還有一些其他的參數。比如還有uselist=True/False這樣一個參數。讓uselist成為False之后,所有原先因為有多條結果而成為list類型的關系屬性,比如上面例子中的User類下的addresses這個屬性,會變成一個單個的變量。同時對於確實是有多個結果的情況系統會給出一個警告。總之,讓uselist=False之后可以讓這個關系每次通過關系來調用數據的時候都是fetchone而不是fetchall。
relationship還有另外一個參數,就是lazy。這個參數更加復雜一些。一般而言,就像上面的例子一樣,我們通過關系屬性來通過一個表的查詢對象查詢另一個表中的數據,形式上來說是直接.addresses就得到結果了。這是lazy="select"(默認是select)的結果。有時候,直接這么獲得的數據比較多比較大,比如一個人有成千上萬個郵箱地址的話,或許我不想這么直白地列出所有數據,而是對這些數據做一個二次加工。這時候就要用到lazy="dynamic"模式。在這個模式下,.addresses得到的仍然是一個query對象,所以我們可以進一步調用filter之類的方法來縮小范圍。為了提高lazy的使用正確性,SQLAlchemy還規定了,不能再多對一,一對一以及uselist=False的這些模式的關系中使用。
3 多對多關系
在現實問題中還有很多多對多關系,比如老師和班級,一個老師可能在很多班級任教,而一個班級顯然也有很多不同科目的老師。這種就是多對多關系。
在數據庫中通常通過關系表來指明多對多關系,模型類中也一樣。在老師和班級的關系中,在老師和班級表中都不能設置外鍵,一旦設置外鍵就表明這個每一行這個字段的值只能對應一個值了,又變回一對多關系。總之多對多關系可以像下例中一樣構造:
class Class(Base):
__tablename__ = 'class'
class_id = Column(Integer,primary_key=True)
name = Column(String(20),nullable=False)
#這里不能加teacher = Column(Integer,ForeignKey('teacher.teacher_id'))之類的字段
class_teacher = relationship('ClassTeacher',backref='class')
class Teacher(Base):
__tablename__ = 'teacher'
teacher_id = Column(Integer,primary_key=True)
name = Column(String(20),nullable=False)
#同樣,這里也不用加class = xxx
teacher_class = relationship('ClassTeacher',backref='teacher')
class ClassTeacher(Base):
__tablename__ = 'class_teacher'
#這就是所謂的一張視圖表?沒有實際存在數據,但是憑借關系型數據庫的特點可以體現出一些數據關系
teacher_id = Column(Interger,ForeignKey('teacher.teacher_id'),primary_key=True)
class_id = Column(Interger,ForeignKey('class.class_id'),primary_key=True)
#這張第三表中有兩個主鍵,表示不能有class_id和teacher_id都相同的兩項
可以看到,通過第三表做橋梁,把多對多關系架構了起來。實際運用可以參考下面:
class = session.query(Class).filter(Class.name == '三年二班').first()
for class_teacher_rel in class.class_teacher:
print class_teacher_rel.teacher.name
8 聯表查詢
1. join查詢
students = session.query(Student).join(Class).filter(Class.level == 3).all()
for student in students:
print stduent.name
Query.join()
知道如何在 Student
和 Class
之間進行連接,因為我們設定了外鍵。
假如我們沒有指定外鍵,比如這樣:
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
fullname = Column(String(50))
password = Column(String(12))
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
email_address = Column(String, nullable=False)
user_id = Column(Integer)
我們可以用下面方法來讓 join
生效:
query.join(Address, User.id==Address.user_id) # explicit condition
query.join(User.addresses) # specify relationship from left to right
query.join(Address, User.addresses) # same, with explicit target
query.join('addresses') # same, using a string
例子:
session.query(User).\
join(Address, User.id==Address.user_id).\
filter(Address.email_address=='jack@google.com').all()
2. 子查詢(subquery)
現在需要查詢每個用戶所擁有的郵箱地址數量,思路是先對 addresses 表按用戶 ID 分組,統計各組數量,這樣我們得到一張新表;然后用 JOIN 連接新表和 users 兩個表,在這里,我們應該使用 LEFT OUTER JOIN,因為使用 INTER JOIN 所得出的新表只包含兩表的交集。
from sqlalchemy.sql import func
stmt = session.query(Address.user_id, func.count('*').\
label('address_count')).\
group_by(Address.user_id).subquery()
for u, count in session.query(User, stmt.c.address_count).\
outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id):
print(u, count)
# 執行結果
ed None
wendy None
mary None
fred None
jack 2
1 . 如果上面的暫時看不懂,我們先來看看第一個 stmt 的情況。
from sqlalchemy.sql import func
stmt = session.query(Address.user_id, func.count('*').\
label('address_count')).\
group_by(Address.user_id).all()
for i in stmt:
print(i)
# 執行結果
(5, 2)
2 . 可以理解成 group_by()
方法生成了一張新的表,該表有兩列,第一列是 user_id ,第二列是該 user_id 所擁有的 addresses 的數量,這個值由 func()
跟着的方法產生,我們可以使用 c()
方法來訪問這個值。
from sqlalchemy.sql import func
stmt = session.query(Address.user_id, func.count('*').\
label('address_count')).\
group_by(Address.user_id).subquery()
q = session.query(User, stmt.c.address_count).\
outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id).all()
for i in q:
print(i)
# 執行結果
(ed, None)
(wendy, None)
(mary, None)
(fred, None)
(jack, 2)
如果不用 outerjoin()
而使用 join()
,就等於使用 SQL 中的 INTER JOIN,所得出的表只為兩者交集,不會包含 None 值的列。
from sqlalchemy.sql import func
stmt = session.query(Address.user_id, func.count('*').\
label('address_count')).\
group_by(Address.user_id).subquery()
q = session.query(User, stmt.c.address_count).\
join(stmt, User.id==stmt.c.user_id).order_by(User.id).all()
for i in q:
print(i)
# 執行結果
(jack, 2)
3.使用別名(aliased)
SQLAlchemy 使用 aliased()
方法表示別名,當我們需要把同一張表連接多次的時候,常常需要用到別名。
from sqlalchemy.orm import aliased
# 把 Address 表分別設置別名
adalias1 = aliased(Address)
adalias2 = aliased(Address)
for username, email1, email2 in \
session.query(User.name, adalias1.email_address, adalias2.email_address).\
join(adalias1, User.addresses).\
join(adalias2, User.addresses).\
filter(adalias1.email_address=='jack@google.com').\
filter(adalias2.email_address=='j25@yahoo.com'):
print(username, email1, email2)
# 執行結果
jack jack@google.com j25@yahoo.com
上述代碼查詢同時擁有兩個名為:"jack@google.com" 和 "j25@yahoo.com" 郵箱地址的用戶。
別名也可以在子查詢里使用:
from sqlalchemy.orm import aliased
stmt = session.query(Address).\
filter(Address.email_address != 'j25@yahoo.com').subquery()
adalias = aliased(Address, stmt)
for user, address in session.query(User, adalias).join(adalias, User.addresses):
print(user)
print(address)
# 執行結果
jack
jack@google.com
4. EXISTS 關鍵字
EXISTS 關鍵字可以在某些場景替代 JOIN 的使用。
from sqlalchemy.sql import exists
stmt = exists().where(Address.user_id==User.id)
for name, in session.query(User.name).filter(stmt):
print(name)
# 執行結果
jack
使用 any()
方法也能得到同意的效果:
for name, in session.query(User.name).filter(User.addresses.any()):
print(name)
使用 any()
方法時也可加上查詢條件:
for name, in session.query(User.name).filter(User.addresses.any(Address.email_address.like('%google%'))):
print(name)
使用 has()
方法也能起到 JOIN 的作用:
session.query(Address).filter(~Address.user.has(User.name=='jack')).all()
注意:這里的 ~
符號是 “不” 的意思。
參考鏈接:
# Engines and Connections
https://www.jianshu.com/p/e6bba189fcbd
https://www.jianshu.com/p/d1fea79027f3
# session
https://www.jianshu.com/p/aa1241c41ef3
https://www.jianshu.com/p/0ad18fdd7eed
https://www.cnblogs.com/franknihao/p/7268752.html
# 官方文檔
https://docs.sqlalchemy.org/en/14/core/tutorial.html # SQL Expression Language Tutorial
https://docs.sqlalchemy.org/en/14/core/metadata.html # Describing Databases with MetaData
https://docs.sqlalchemy.org/en/14/core/type_basics.html # Column and Data Types
https://docs.sqlalchemy.org/en/14/core/connections.html # Working with Engines and Connections
https://docs.sqlalchemy.org/en/14/orm/session.html # Using the Session