原文鏈接:http://www.cnblogs.com/iwangzc/p/4112078.html(感謝作者的分享)
sqlalchemy 官方文檔:http://docs.sqlalchemy.org/en/latest/contents.html
1.版本檢查
import sqlalchemy
sqlalchemy.__version__
2.連接
from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:',echo=True)
echo參數為True時,會顯示每條執行的SQL語句,可以關閉。create_engine()返回一個Engine的實例,並且它表示通過數據庫語法處理細節的核心接口,在這種情況下,數據庫語法將會被解釋稱Python的類方法。
3.聲明映像
當使用ORM【1】時,構造進程首先描述數據庫的表,然后定義我們用來映射那些表的類。在現版本的SQLAlchemy中,這兩個任務通常一起執行,通過使用Declarative方法,我們可以創建一些包含描述要被映射的實際數據庫表的准則的映射類。
使用Declarative方法定義的映射類依據一個基類,這個基類是維系類和數據表關系的目錄——我們所說的Declarative base class。在一個普通的模塊入口中,應用通常只需要有一個base的實例。我們通過declarative_base()功能創建一個基類:
from sqlalchemy.ext.declarativeimportdeclarative_base
Base = declarative_base()
有了這個base,我們可以依據這個base定義任意數量的映射類。一個簡單的user例子:
from sqlalchemy import Column, Integer, String
class User(Base):
__tablename__= 'users'
id= Column(Integer, primary_key=True)
name = Column(String)
用Declarative構造的一個類至少需要一個__tablename__屬性,一個主鍵行。
4.構造模式(項目中沒用到)
5.創建映射類的實例
ed_user = User(name='ed',fullname='Ed Jones', password='edspassword')
6.創建會話
現在我們已經准備毫和數據庫開始會話了。ORM通過Session與數據庫建立連接的。當應用第一次載入時,我們定義一個Session類(聲明create_engine()的同時),這個Session類為新的Session對象提供工廠服務。
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
這個定制的Session類會創建綁定到數據庫的Session對象。如果需要和數據庫建立連接,只需要實例化一個Session:
session = Session()
雖然上面的Session已經和數據庫引擎Engine關聯,但是還沒有打開任何連接。當它第一次被使用時,就會從Engine維護的一個連接池中檢索是否存在連接,如果存在便會保持連接知道我們提交所有更改並且/或者關閉session對象。
7.添加新對象(簡略)
ed_user = User(name='ed', fullname='Ed Jones', password='edspassword')
session.add(ed_user)
至此,我們可以認為,新添加的這個對象實例仍在等待中;ed_user對象現在並不代表數據庫中的一行數據。直到使用flush進程,Session才會讓SQL保持連接。如果查詢這條數據的話,所有等待信息會被第一時間刷新,查詢結果也會立即發行。
session.commit()
通過commit()可以提交所有剩余的更改到數據庫。
8.回滾
session.rollback()
9.查詢
通過Session的query()方法創建一個查詢對象。這個函數的參數數量是可變的,參數可以是任何類或者是類的描述的集合。下面是一個迭代輸出User類的例子:
for instance in session.query(User).order_by(User.id):
print instance.name,instance.fullname
Query也支持ORM描述作為參數。任何時候,多個類的實體或者是基於列的實體表達都可以作為query()函數的參數,返回類型是元組:
for name, fullname in session.query(User.name,User.fullname):
print name, fullname
Query返回的元組被命名為KeyedTuple類的實例元組。並且可以把它當成一個普通的Python數據類操作。元組的名字就相當於屬性的屬性名,類的類名一樣。
for row in session.query(User, User.name).all():
print row.User,row.name
<User(name='ed',fullname='Ed Jones', password='f8s7ccs')>ed
label()不知道怎么解釋,看下例子就明白了。相當於row.name
for row in session.query(User.name.label('name_label')).all():
print(row.name_label)
aliased()我的理解是類的別名,如果有多個實體都要查詢一個類,可以用aliased()
from sqlalchemy.orm import aliased
user_alias = aliased(User, name='user_alias')
for row in session.query(user_alias,user_alias.name).all():
print row.user_alias
Query的 基本操作包括LIMIT和OFFSET,使用Python數組切片和ORDERBY結合可以讓操作變得很方便。
for u in session.query(User).order_by(User.id)[1:3]:
#只查詢第二條和第三條數據
9.1使用關鍵字變量過濾查詢結果,filter 和 filter_by都適用。【2】使用很簡單,下面列出幾個常用的操作:
query.filter(User.name == 'ed') #equals
query.filter(User.name != 'ed') #not equals
query.filter(User.name.like('%ed%')) #LIKE
uery.filter(User.name.in_(['ed','wendy', 'jack'])) #IN
query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%'))#IN
query.filter(~User.name.in_(['ed','wendy', 'jack']))#not IN
query.filter(User.name == None)#is None
query.filter(User.name != None)#not None
from sqlalchemy import and_
query.filter(and_(User.name =='ed',User.fullname =='Ed Jones')) # and
query.filter(User.name == 'ed',User.fullname =='Ed Jones') # and
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')# and
from sqlalchemy import or_
query.filter(or_(User.name =='ed', User.name =='wendy')) #or
query.filter(User.name.match('wendy')) #match
9.2.返回列表和數量(標量?)
all()返回一個列表:可以進行Python列表的操作。
query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
query.all()
[<User(name='ed',fullname='EdJones', password='f8s7ccs')>,<User(name='fred', fullname='FredFlinstone', password='blah')>]
first()適用於限制一個情況,返回查詢到的第一個結果作為標量?:好像只能作為屬性,類
query.first()
<User(name='ed',fullname='Ed Jones', password='f8s7ccs')>
one()完全獲取所有行,並且如果查詢到的不只有一個對象或是有復合行,就會拋出異常。
from sqlalchemy.orm.exc import MultipleResultsFound
user = query.one()
try:
user = query.one()
except MultipleResultsFound, e:
print e
Multiple rows were found for one()
如果一行也沒有:
from sqlalchemy.orm.exc import NoResultFound
try:
user = query.filter(User.id == 99).one()
except NoResultFound, e:
print e
No row was found for one()
one()方法對於想要解決“no items found”和“multiple items found”是不同的系統是極好的。(這句有語病啊)例如web服務返回,本來是在no results found情況下返回”404“的,結果在多個results found情況下也會跑出一個應用異常。
scalar()作為one()方法的依據,並且在one()成功基礎上返回行的第一列。
query = session.query(User.id).filter(User.name == 'ed')
query.scalar()
7
9.3.使用字符串SQL
字符串能使Query更加靈活,通過text()構造指定字符串的使用,這種方法可以用在很多方法中,像filter()和order_by()。
from sqlalchemy import text
for user in session.query(User).filter(text("id<224")).order_by(text("id")).all()
綁定參數可以指定字符串,用params()方法指定數值。
session.query(User).filter(text("id<:value and name=:name")).\
params(value=224, name='fred').order_by(User.id).one()
如果要用一個完整的SQL語句,可以使用from_statement()。
ession.query(User).from_statement(text("SELECT* FROM users where name=:name")).\
params(name='ed').all()
也可以用from_statement()獲取完整的”raw”,用字符名確定希望被查詢的特定列:
session.query("id","name", "thenumber12").\
from_statement(text("SELECT id, name, 12 as ""thenumber12 FROM users where name=:name")).\
params(name='ed').all()
[(1,u'ed', 12)]
感覺這個不太符合ORM的思想啊。。。
9.4 計數
count()用來統計查詢結果的數量。
session.query(User).filter(User.name.like('%ed')).count()
func.count()方法比count()更高級一點【3】
from sqlalchemy import func
session.query(func.count(User.name),User.name).group_by(User.name).all()
[(1,u'ed'), (1,u'fred'), (1,u'mary'), (1,u'wendy')]
為了實現簡單計數SELECT count(*) FROM table,可以這么寫:
session.query(func.count('*')).select_from(User).scalar()
如果我們明確表達計數是根據User表的主鍵的話,可以省略select_from(User):
session.query(func.count(User.id)).scalar()
上面兩行結果均為4。
Go to (下)
10.建立聯系(外鍵)
是時候考慮怎樣映射和查詢一個和Users表關聯的第二張表了。假設我們系統的用戶可以存儲任意數量的email地址。我們需要定義一個新表Address與User相關聯。
from sqlalchemyimport ForeignKey
from sqlalchemy.ormimport relationship, backref
class Address(Base):
__tablename__ = 'addresses'
id= Column(Integer, primary_key=True)
email_address = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
user = relationship("User", backref=backref('addresses',order_by=id))
def__repr__(self):
return"<Address(email_address='%s')>"%self.email_address
構造類和外鍵簡單,就不過多贅述。主要說明以下relationship()函數:這個函數告訴ORM,Address類應該和User類連接起來,通過使用addresses.user。relationship()使用外鍵明確這兩張表的關系。決定Adderess.user屬性是多對一的。relationship()的子函數backref()提供表達反向關系的細節:relationship()對象的集合被User.address引用。多對一的反向關系總是一對多。更多的細節參考Basic RelRational Patterns。
這兩個互補關系:Address.user和User.addresses被稱為雙向關系。這是SQLAlchemy ORM的一個非常關鍵的功能。更多關系backref的細節參見Linking Relationships with Backref。
假設聲明的方法已經開始使用,relationship()中和其他類關聯的參數可以通過strings指定。在上文的User類中,一旦所有映射成功,為了產生實際的參數,這些字符串會被當做Python的表達式。下面是一個在User類中創建雙向聯系的例子:
class User(Base):
addresses = relationship("Address", order_by="Address.id", backref="user")
一些知識:
在大多數的外鍵約束(盡管不是所有的)關系數據庫只能鏈接到一個主鍵列,或具有唯一約束的列。
外鍵約束如果是指向多個列的主鍵,並且它本身也具有多列,這種被稱為“復合外鍵”。
外鍵列可以自動更新自己來相應它所引用的行或者列。這被稱為級聯,是一種建立在關系數據庫的功能。
外鍵可以參考自己的表格。這種被稱為“自引”外鍵。
我們需要在數據庫中創建一個addresses表,所以我們會創建另一個元數據,這將會跳過已經創建的表。
11.操作主外鍵關聯的對象
現在我們已經在User類中創建了一個空的addresser集合,可變集合類型,例如set和dict,都可以用,但是默認的集合類型是list。
jack = User(name='jack', fullname='Jack Bean', password='gjffdd')
jack.addresses
[]
現在可以直接在User對象中添加Address對象。只需要指定一個完整的列表:
jack.addresses = [Address(email_address='jack@google.com'),Address(email_address='j25@yahoo.com')]
當使用雙向關系時,元素在一個類中被添加后便會自動在另一個類中添加。這種行為發生在Python的更改事件屬性中而不是用SQL語句:
>>> jack.addresses[1]
<Address(email_address='j25@yahoo.com')>
>>> jack.addresses[1].user
<User(name='jack', fullname='Jack Bean', password='gjffdd')>
把jack提交到數據庫中,再次查詢Jack,(No SQL is yet issued for Jack’s addresses:)這句實在是翻譯不了了,看看代碼就明白是什么意思:
>>> jack = session.query(User).\
...
filter_by(name='jack').one()
>>> jack
<User(name='jack',fullname='Jack Bean', password='gjffdd')>
>>>jack.addresses
[<Address(email_address='jack@google.com')>, <Address(email_address='j25@yahoo.com')>]
當我們訪問uaddresses集合時,SQL會被突然執行,這是一個延遲加載(lazy loading)關系的典型例子。現在addresses集合加載完成並且可以像對待普通列表一樣對其進行操作。以后我們會優化這種加載方式。
12.使用JOINS查詢
現在我們有了兩張表,可以進行更多的查詢操作,特別是怎樣對兩張表同時進行查詢,Wikipediapage on SQL JOIN提供了很詳細的說明,其中一些我們將在這里說明。之前用Query.filter()時,我們已經用過JOIN了,filter是一種簡單的隱式join:
>>>for u, a in session.query(User, Address).filter(User.id==Address.user_id).filter(Address.email_address=='jack@google.com').all():
print u
print a
<User(name='jack',fullname='JackBean', password='gjffdd')>
<Address(email_address='jack@google.com')>
用Query.join()方法會更加簡單:
>>>session.query(User).join(Address).\
... filter(Address.email_address=='jack@google.com').\
... all()
[<User(name='jack',fullname='JackBean', password='gjffdd')>]
之所以Query.join()知道怎么join兩張表是因為它們之間只有一個外鍵。如果兩張表中沒有外鍵或者有一個以上的外鍵,當下列幾種形式使用的時候,Query.join()可以表現的更好:
query.join(Address,User.id==Address.user_id)# 明確的條件
query.join(User.addresses)# 指定從左到右的關系
query.join(Address,User.addresses) #同樣,有明確的目標
query.join('addresses') # 同樣,使用字符串
outerjoin()和join()用法相同
query.outerjoin(User.addresses)# LEFT OUTER JOIN
12.1使用別名
當在多個表中查詢時,如果同一張表需要被引用好幾次,SQL通常要求對這個表起一個別名,因此,SQL可以區分對這個表進行的其他操作。Query也支持別名的操作。下面我們joinAddress實體兩次,找到同時擁有兩個不同email的用戶:
>>>from sqlalchemy.ormimport aliased
>>>adalias1 = aliased(Address)
>>>adalias2 = aliased(Address)
>>>for username, email1, email2 in\
... session.query(User.name,adalias1.email_address,adalias2.email_address).\
... join(adalias1, User.addresses).\
... join(adalias2, User.addresses).\
... filter(adalias1.email_address=='jack@google.com').\
... filter(adalias2.email_address=='j25@yahoo.com'):
... print username, email1, email2
jack jack@google.com j25@yahoo.com
12.1使用子查詢(暫時理解不了啊,多看代碼研究吧:()
from sqlalchemy.sqlimport func
stmt = session.query(Address.user_id,func.count('*').\
... label('address_count')).\
... group_by(Address.user_id).subquery()
>>> for u, count in session.query(User,stmt.c.address_count).\
... outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id):
print u, count
<User(name='ed',fullname='EdJones', password='f8s7ccs')> None
<User(name='wendy',fullname='Wendy Williams', password='foobar')> None
<User(name='mary',fullname='Mary Contrary', password='xxg527')> None
<User(name='fred',fullname='Fred Flinstone', password='blah')> None
<User(name='jack',fullname='Jack Bean', password='gjffdd')> 2
12.2從子查詢中選擇實體?
上面的代碼中我們只返回了包含子查詢的一個列的結果。如果想要子查詢映射到一個實體的話,使用aliased()設置一個要映射類的子查詢別名:
>>> stmt = session.query(Address).\
... filter(Address.email_address!= 'j25@yahoo.com').\
... subquery()
>>> adalias = aliased(Address, stmt) #?為什么有兩個參數?
>>> for user, address in session.query(User, adalias).\
... join(adalias, User.addresses):
... print user
... print address
<User(name='jack',fullname='Jack Bean', password='gjffdd')>
<Address(email_address='jack@google.com')>
12.3使用EXISTS(存在?)
如果表達式返回任何行,EXISTS為真,這是一個布爾值。它可以用在jions中,也可以用來定位在一個關系表中沒有相應行的情況:
>>>from sqlalchemy.sqlimport exists
>>> stmt = exists().where(Address.user_id==User.id)
>>>for name, in session.query(User.name).filter(stmt):
print name
jack
等價於:
>>>for name, in session.query(User.name).\
... filter(User.addresses.any()):
... print name
jack
any()限制行匹配:
>>>for name, in session.query(User.name).\
... filter(User.addresses.any(Address.email_address.like('%google%'))):
... print name
jack
has()和any()一樣在應對多對一關系的情況下(注意“~“意味着”NOT”)
>>> session.query(Address).\
... filter(~Address.user.has(User.name=='jack')).all()
[]
12.4 常見的關系運算符
== != None 都是用在多對一中,而contains()用在一對多的集合中:
query.filter(Address.user == someuser)
query.filter(User.addresses.contains(someaddress))
Any()(用於集合中):
query.filter(User.addresses.any(Address.email_address == 'bar'))#also takes keyword arguments:
query.filter(User.addresses.any(email_address='bar'))
as()(用在標量?不在集合中):
query.filter(Address.user.has(name='ed'))
Query.with_parent()(所有關系都適用):
session.query(Address).with_parent(someuser,'addresses')
13 預先加載(跟性能有關)和lazy loading相對,建議直接查看文檔吧
待補充。。。
