轉載:https://www.cnblogs.com/harrychinese/archive/2012/09/12/My_Own_Tutorial_For_SqlAlchemy.html
使用 sqlalchemy 有3種方式:
方式1, 使用raw sql;
方式2, 使用SqlAlchemy的sql expression;
方式3, 使用ORM.
前兩種方式可以統稱為 core 方式. 本文講解 core 方式訪問數據庫, 不涉及 ORM.
對於絕大多數應用, 推薦使用 SqlAlchemy. 即使是使用raw sql, SqlAlchemy 也可以帶來如下好處:
1. 內建數據庫連接池. [注意]如果是sqlalchemy+cx_oracle的話, 需要禁掉 connection pool, 否則會有異常. 方法是設置sqlalchemy.poolclass為sqlalchemy.pool.NullPool
2. 強大的log功能
3. 數據庫中立的寫法, 包括: sql參數寫法, limit語法
4. 特別提一下, where()條件的==your_value, 如果your_value等於None, 真正的Sql會轉為Is None
SqlAlchemy的sql expression和raw sql的比較:
1. sql expression 寫法是純python代碼, 閱讀性更好, 尤其是在使用insert()方法時, 字段名和取值成對出現.
2. raw sql 比 sql expression 更靈活, 如果SQL/DDL很復雜, raw sql就更有優勢了.
==============================
sqlalchemy 超簡單教程
==============================
http://solovyov.net/en/2011/04/23/basic-sqlalchemy/
http://flask.pocoo.org/docs/patterns/sqlalchemy/#sql-abstraction-layer
http://www.blog.pythonlibrary.org/2010/09/10/sqlalchemy-connecting-to-pre-existing-databases/
http://www.blog.pythonlibrary.org/2010/02/03/another-step-by-step-sqlalchemy-tutorial-part-1-of-2/
http://www.rmunn.com/sqlalchemy-tutorial/tutorial.html
http://mapfish.org/doc/tutorials/sqlalchemy.html
sqlalchemy 官網的pdf文檔, 可以作為 reference 使用
==============================
常用的數據庫連接字符串
==============================
#sqlite
sqlite_db = create_engine('sqlite:////absolute/path/database.db3')
sqlite_db = create_engine('sqlite://') # in-memory database
sqlite_db = create_engine('sqlite:///:memory:') # in-memory database
# postgresql
pg_db = create_engine('postgres://scott:tiger@localhost/mydatabase')
# mysql
mysql_db = create_engine('mysql://scott:tiger@localhost/mydatabase')
# oracle
oracle_db = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
# oracle via TNS name
oracle_db = create_engine('oracle://scott:tiger@tnsname')
# mssql using ODBC datasource names. PyODBC is the default driver.
mssql_db = create_engine('mssql://mydsn')
mssql_db = create_engine('mssql://scott:tiger@mydsn')
# firebird
firebird_db = create_engine('firebird://scott:tiger@localhost/sometest.gdm')
==============================
關於一些非主流數據庫缺少DB API接口的問題
==============================
比如teradata, 沒有專門的DB API實現, 但 odbc driver肯定會提供的, 否則就無法在江湖上混了. pypyodbc + ODBC driver 應該是一個選項. pypyodbc 和 pyodbc接口一致, 同時它是純 python實現, 理論上應該支持 python/ironpython/jython. 另外, 我猜想sqlalchemy應該能基於這一組合訪問所有的數據庫, 待驗證.
pypyodbc 主頁: http://code.google.com/p/pypyodbc/
==============================
#connnectionless執行和connnection執行
==============================
1. 直接使用engine執行sql的方式, 叫做connnectionless執行,
2. 先使用 engine.connect()獲取conn, 然后通過conn執行sql, 叫做connection執行
如果要在transaction模式下執行, 推薦使用connection方式, 如果不涉及transaction, 兩種方法效果是一樣的.
==============================
#sqlalchemy推薦使用text()函數封裝一下sql字符串
==============================
好處巨多:
1. 不同數據庫, 可以使用統一的sql參數傳遞寫法. 參數須以:號引出. 在調用execute()的時候, 使用dict結構將實參傳進去.
from sqlalchemy import text
result = db.execute(text('select * from table where id < :id and typeName=:type'), {'id': 2,'type':'USER_TABLE'})
2. 如果不指定parameter的類型, 默認為字符串類型; 如果要傳日期參數, 需要使用text()的bindparams參數來聲明
from sqlalchemy import DateTime
date_param=datetime.today()+timedelta(days=-1*10)
sql="delete from caw_job_alarm_log where alarm_time<:alarm_time_param"
t=text(sql, bindparams=[bindparam('alarm_time_param', type_=DateTime, required=True)])
db.execute(t,{"alarm_time_param": date_param})
參數bindparam可以使用type_來指定參數的類型, 也可以使用 initial 值來指定參數類型
bindparam('alarm_time_param', type_=DateTime) #直接指定參數類型
bindparam('alarm_time_param', DateTime()) #使用初始值指定參數類型
3. 如要轉換查詢的結果中的數據類型, 可以通過text()的參數typemap參數指定. 這點比mybatis還靈活,
t = text("SELECT id, name FROM users",
typemap={
'id':Integer,
'name':Unicode
}
)
4. 還有其他, 詳見sqlalchemy\sql\expression.py中的docstring.
==============================
#sqlalchemy訪問數據庫的示例
==============================
#-----------------------------------
#獲取數據庫
#-----------------------------------
from sqlalchemy import create_engine
db=create_engine("sqlite:///:memory:", echo=True)
#-----------------------------------
#DDL
#-----------------------------------
db.execute("create table users(userid char(10), username char(50))")
#-----------------------------------
#DML
#-----------------------------------
resultProxy=db.execute("insert into users (userid,username) values('user1','tony')")
resultProxy.rowcount #return rows affected by an UPDATE or DELETE statement
#-----------------------------------
#Query
#-----------------------------------
resultProxy=db.execute("select * from users")
resultProxy.close(), resultProxy 用完之后, 需要close
resultProxy.scalar(), 可以返回一個標量查詢的值
ResultProxy 類是對Cursor類的封裝(在文件sqlalchemy\engine\base.py),
ResultProxy 類有個屬性cursor即對應着原來的cursor.
ResultProxy 類有很多方法對應着Cursor類的方法, 另外有擴展了一些屬性/方法.
resultProxy.fetchall()
resultProxy.fetchmany()
resultProxy.fetchone()
resultProxy.first()
resultProxy.scalar()
resultProxy.returns_rows #True if this ResultProxy returns rows.
resultProxy.rowcount #return rows affected by an UPDATE or DELETE statement. It is not intended to provide the number of rows present from a SELECT.
****遍歷ResultProxy時, 得到的每一個行都是RowProxy對象, 獲取字段的方法非常靈活, 下標和字段名甚至屬性都行. rowproxy[0] == rowproxy['id'] == rowproxy.id, 看得出 RowProxy 已經具備基本 POJO 類特性.
#-----------------------------------
#使用transaction
#-----------------------------------
#SqlAlchemy支持支持事務, 甚至事務可以嵌套. 缺省事務是自動提交,即執行一條SQL就自動提交。
#-如果更精准地控制事務, 最簡單的方法是使用 connection, 然后通過connection獲取transaction對象
connection = db.connect()
trans = connection.begin()
try:
dosomething(connection)
trans.commit()
except:
trans.rollback()
#-還有一種方式是,在創建engine時指定strategy='threadlocal'參數,這樣會自動創建一個線程局部的連接,對於后續的無連接的執行都會自動使用這個連接,這樣在處理事務時,只要使用 engine 對象來操作事務就行了。如:
#參見 http://hi.baidu.com/limodou/blog/item/83f4b2194e94604043a9ad9c.html
db = create_engine(connection, strategy='threadlocal')
db.begin()
try:
dosomething()
except:
db.rollback()
else:
db.commit()
#-缺省事務是自動提交,即執行一條SQL就自動提交. 也可以在connection和statement上通過execution_options()方法修改為手動commit模式
conn.execution_options(autocommit=False)
設置為手動提交模式后, 要提交, 需要調用conn.commit()
#-----------------------------------
#如何使用 pyDbRowFactory
#-----------------------------------
#pyDbRowFactory是我開發的一個通用RowFactory, 可以綁定cursor和你的 model pojo 類, 新版本的pyDbRowFactoryResultProxy. 下面示例是pyDbRowFactory的最基本用法
#方法1, 使用 cursor對象
cursor=resultProxy.cursor
from pyDbRowFactory import DbRowFactory
rowFactory=DbRowFactory(cursor, "your_module.your_row_class")
lst=factory.fetchAllRowObjects()
#方法2, 直接使用 resultProxy
from pyDbRowFactory import DbRowFactory
factory=DbRowFactory.fromSqlAlchemyResultProxy(resultProxy, "your_module.your_row_class")
lst=factory.fetchAllRowObjects()
前面講過, SQLAlchemy使用 ResultProxy封裝了cursor, ResultProxy的每一個行記錄是一個RowProxy 類對象. RowProxy 使用起來非常方便, 對於查詢select userName from users,
每一個行結果都可以使用rowproxy來訪問, 寫法相當靈活. rowproxy.userName=rowproxy["userName"]==rowproxy[0], 所以有了RowProxy, 很多時候, 沒有必要再為每個表創建一個 model pojo 類.
#-----------------------------------
#連接池
#-----------------------------------
sqlalchemy 默認的連接池算法選用規則為:
1.連接內存中的sqlite, 默認的連接池算法為 SingletonThreadPool 類, 即每個線程允許一個連接
2.連接基於文件的sqlite, 默認的連接池算法為 NullPool 類, 即沒有連接池
3.對於其他情況, 默認的連接池算法為 QueuePool 類
當然, 我們也可以實現自己的連接池算法,
db = create_engine('sqlite:///file.db', poolclass=YourPoolClass)
create_engine()函數和連接池相關的參數有:
-pool_recycle, 默認為-1, 推薦設置為7200, 即如果connection空閑了7200秒, 自動重新獲取, 以防止connection被db server關閉.
-pool_size=5, 連接數大小,默認為5,正式環境該數值太小,需根據實際情況調大
-max_overflow=10, 超出pool_size后可允許的最大連接數,默認為10, 這10個連接在使用過后, 不放在pool中, 而是被真正關閉的.
-pool_timeout=30, 獲取連接的超時閾值, 默認為30秒
#-----------------------------------
#log輸出
#-----------------------------------
--如果只需在sys.stdout輸出, 用不着引用 logging 模塊就能實現
db = create_engine('sqlite:///file.db', echo=True)
--如果要在文件中輸出, log文件不具備rotate功能, 不推薦在生產環境中使用.
import logging
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
#-----------------------------------
#使用 sqlalchemy core 的最佳實踐
#-----------------------------------
我不太喜歡使用ORM方式, 主要是ORM學習成本比較高, 另外, 構建復雜的查詢也比較困難. 更多的時候是使用raw sql和sql expression方法.
1. declarative 是 SqlAlchemy的一個新的擴展, 只能用在 ORM 中, 不能用在SQL Expression中
2. 如果要使用ORM時, table必須有主鍵; 使用 raw sql和sql expression, 沒有這個約束.
使用心得:
1. 查詢不管是否復雜, 直接使用 raw sql; 增刪改多是單表操作, 使用sql expression 就足夠了.
2. 具體講, 對於增刪改, 比如一個User類, 可包含一個固定的 _table 的成員, _table=Table('users', metadata, autoload=True), 增刪改直接使用_table對象來完成.
3. 對於查詢, 若結果集能映射到一個實體對象, 使用pyDbRowFactory完成對象實例化. 若結果集涉及多個實體, 直接使用ResultProxy, ResultProxy的每一行對象也具有基本的對象特征, 多數情況下沒有必要再專門映射成一個特別的類.
4. 表之間關系的處理, 比如: users 表和 addresses 表, 存在 1:n的關系, 對應地, User 類也會有個AddressList的成員, 在實體化一個User對象后, 我們可立即查詢 addresses 表, 獲取該用戶的address列表, 分2步就可以完成這種1:n的關系映射.
使用 sqlalchemy 的寫法太靈活了, 下面僅僅是我喜歡的一種寫法, 僅僅從排版看, 就相當漂亮.
構建insert語句: _table.insert().values(f1=value1,f2=value2,)
構建update語句: _table.update().values(f1=newvalue1,f2=newvalue2).where(_table.c.f1==value1).where(_table.c.f2==value2)
構建delete語句: _table.delete().where(_table.c.f1==value1).where(_table.c.f2==value2)
批量insert/update/delete, 將每行數據組成一個dict, 再將這些dict組成一個list, 和_table.insert()/update()/delete()一起作為參數傳給 conn.execute().
conn.execute(_table.insert(), [
{’user_id’: 1, ’email_address’ : ’jack@yahoo.com’},
{’user_id’: 1, ’email_address’ : ’jack@msn.com’},
{’user_id’: 2, ’email_address’ : ’www@www.org’},
{’user_id’: 2, ’email_address’ : ’wendy@aol.com’},
])
sql expression 也可以像raw sql的text函數一樣使用bindparam, 方法是, 在調用insert()/update()/delete()時聲明參數, 然后在conn.execute()執行時候, 將實參傳進去.
d=_table.delete().where(_table.c.hiredate<=bindparam("hire_day",DateTime(), required=True))
conn.execute(d, {"hire_day":datetime.today()})
where()和ORM中的filter()接受的參數是一樣, 各種SQL條件都支持.
#equals:
where(_table.c.name == ’ed’)
#not equals:
where(_table.c.name != ’ed’)
#LIKE:
where(_table.c.name.like(’%ed%’))
#IN:
where(_table.c.name.in_([’ed’, ’wendy’, ’jack’]))
#NOT IN:
where(~_table.c.name.in_([’ed’, ’wendy’, ’jack’]))
#IS NULL:
where(_table.c.name == None)
#IS NOT NULL:
where(_table.c.name != None)
#AND:
from sqlalchemy import and_
where(and_(_table.c.name == ’ed’, _table.c.fullname == ’Ed Jones’))
#AND也可以通過多次調用where()來實現
where(_table.c.name == ’ed’).where(_table.c.fullname == ’Ed Jones’)
#OR:
from sqlalchemy import or_
where(or_(_table.c.name == ’ed’, _table.c.name == ’wendy’))
#match: The contents of the match parameter are database backend specific.
where(_table.c.name.match(’wendy’))
--==========================
--python file: mydatabase.py
--==========================
from sqlalchemy import create_engine
from sqlalchemy.schema import MetaData
#db = create_engine('sqlite:///:memory:', echo=True)
db = create_engine('sqlite:///c://caw.sqlite.db', echo=True)
metadata = MetaData(bind=db)
--==========================
--python file: dal.py
--==========================
from sqlalchemy.sql.expression import text, bindparam
from sqlalchemy.sql import select,insert, delete, update
from sqlalchemy.schema import Table
from mydatabase import db,metadata
from pyDbRowFactory import DbRowFactory
class caw_job(object):
FULL_NAME="dal.caw_job"
tablename="caw_job"
_table=Table(tablename, metadata, autoload=True)
def __init__(self):
self.app_domain =None
self.job_code =None
self.job_group =None
self.cron_year =None
self.cron_month =None
self.cron_day =None
self.cron_week =None
self.cron_day_of_week =None
self.cron_hour =None
self.cron_minute =None
self.description =None
@classmethod
def getEntity(cls, app_domain, jobCode):
sql="select * from caw_job where app_domain=:app_domain and job_code=:job_code";
resultProxy=db.execute(text(sql),{'app_domain':app_domain,
'job_code':jobCode})
DbRowFactory.fromSqlAlchemyResultProxy(resultProxy, cls.FULL_NAME)
return DbRowFactory.fetchOneRowObject()
def insert(self):
i=self._table.insert().values(
app_domain =self.app_domain ,
job_code =self.job_code ,
job_group =self.job_group ,
cron_year =self.cron_year ,
cron_month =self.cron_month ,
cron_day =self.cron_day ,
cron_week =self.cron_week ,
cron_day_of_week=self.cron_day_of_week,
cron_hour =self.cron_hour ,
cron_minute =self.cron_minute ,
description =self.description ,
)
db.execute(i)
def update(self):
u=self._table.update().values(
app_domain =self.app_domain ,
job_code =self.job_code ,
job_group =self.job_group ,
cron_year =self.cron_year ,
cron_month =self.cron_month ,
cron_day =self.cron_day ,
cron_week =self.cron_week ,
cron_day_of_week =self.cron_day_of_week ,
cron_hour =self.cron_hour ,
cron_minute =self.cron_minute ,
description =self.description , ,
).where(self._table.c.app_domain==self.app_domain)\
.where(self._table.c.job_code==self.job_code)
db.execute(u)
def delete(self):
d=self._table.delete().where(self._table.c.app_domain==self.app_domain)\
.where(self._table.c.job_code==self.job_code)
db.execute(d)
#-----------------------------------
#使用sqlalchemy.ext.declarative 來生成表, 所有的表都必須有主鍵.
#在系統初期, 數據模型往往需要經常調整, 使用這種方式修改表結構更方便些.
#-----------------------------------
--python file: models.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String , Boolean, DateTime, Float
engine = create_engine('sqlite:///:memory:', echo=True)
Base = declarative_base()
# ddl_caw_job 是專門用來生成數據庫對象的, 沒有其他用處
class ddl_caw_job(Base):
__tablename__="caw_job"
job_name =Column(String(200), primary_key=True)
job_group =Column(String(200))
def init_db():
Base.metadata.create_all(bind=engine,)