SQLAlchemy
SQLAlchemy是Python編程語言下的一款ORM框架,該框架建立在數據庫API之上,使用對象關係映射進行數據庫操作,簡言之便是:將對象轉換成SQL,然後使用數據庫API執行SQL並獲取執行結果。
pip3 install sqlalchemy
Dialect用於和數據庫API進行交流,根據配置文件的不同調用不同的數據庫API,從而實現對數據庫的操作,如:
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多詳見:http://docs.sqlalchemy.org/en/latest/dialects/index.html
步驟一:
使用 Engine/ConnectionPooling/Dialect 進行數據庫操作,Engine使用ConnectionPooling連接數據庫,然後再通過Dialect執行SQL語句。
from sqlalchemy import create_engine

# The Engine hands out connections from its pool and talks to the database
# through the dialect named in the URL. max_overflow=5 allows up to five
# connections beyond the pool's regular size.
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

# NOTE: Engine.execute() is legacy "connectionless" execution; it was
# deprecated in SQLAlchemy 1.4 and removed in 2.0.

# Plain SQL with the values written directly in the statement.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
)

# Positional placeholders; passing a sequence of tuples inserts one row per tuple.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES (%s, %s)",
    ((555, "v1"), (666, "v1"),)
)

# Named placeholders, filled from keyword arguments.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
    id=999, name="v1"
)

# Run a query and fetch every row of the result.
result = engine.execute('select * from ts_test')
result.fetchall()
事務操作
from sqlalchemy import create_engine

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

# Transaction, form 1: engine.begin() yields a connection with a transaction
# already started. The transaction is committed when the block exits normally
# and rolled back if an exception escapes it.
with engine.begin() as conn:
    conn.execute("insert into table (x, y, z) values (1, 2, 3)")
    conn.execute("my_special_procedure(5)")

# Transaction, form 2: open the connection explicitly, then start a
# transaction on it. try/finally returns the connection to the pool even if
# the statement fails.
conn = engine.connect()
try:
    with conn.begin():
        conn.execute("some statement", {'x': 5, 'y': 10})
finally:
    conn.close()
注:查看數據庫連接:show status like 'Threads%';
步驟二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 進行數據庫操作。Engine使用Schema Type創建一個特定的結構對象,之後通過SQL Expression Language將該對象轉換成SQL語句,然後通過ConnectionPooling連接數據庫,再然後通過Dialect執行SQL,並獲取結果。
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

# Registry that collects every Table defined below.
metadata = MetaData()

# Schema of the "user" table: integer primary key plus a short name.
user = Table(
    'user',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

# Schema of the "color" table, same layout as "user".
color = Table(
    'color',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

# Issue CREATE TABLE for the tables registered on `metadata`.
metadata.create_all(engine)

# To drop table definitions from the registry again (the database itself is
# untouched), use metadata.clear() for all tables or metadata.remove(table)
# for a single one.
# metadata.clear()
# metadata.remove()
增刪改查
from sqlalchemy import (
    create_engine, Table, Column, Integer, String, MetaData, ForeignKey,
    select,  # needed by the query examples in the comments below
)

metadata = MetaData()

user = Table('user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

color = Table('color', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

# The context manager returns the connection to the pool when the block ends,
# even if execute() raises.
with engine.connect() as conn:
    # Builds the statement: INSERT INTO "user" (id, name) VALUES (:id, :name)
    conn.execute(user.insert(), {'id': 7, 'name': 'seven'})

    # Further statements that can be built the same way; uncomment one `sql`
    # line and the two lines that execute and print it.
    #
    # Insert with the values bound on the statement itself:
    # sql = user.insert().values(id=123, name='wu')
    #
    # Delete / update:
    # sql = user.delete().where(user.c.id > 1)
    # NOTE: the next example needs a 'fullname' column, which the table above does not define.
    # sql = user.update().values(fullname=user.c.name)
    # sql = user.update().where(user.c.name == 'jack').values(name='ed')
    #
    # Select (legacy list-style select() of SQLAlchemy 1.x):
    # sql = select([user, ])
    # sql = select([user.c.id, ])
    # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
    # sql = select([user.c.name]).order_by(user.c.name)
    # sql = select([user]).group_by(user.c.name)
    #
    # result = conn.execute(sql)
    # print(result.fetchall())
更多內容詳見:
http://www.jianshu.com/p/e6bba189fcbd
http://docs.sqlalchemy.org/en/latest/core/expression_api.html
注:SQLAlchemy無法修改表結構,如果需要可以使用SQLAlchemy開發者開源的另外一個軟件Alembic來完成。
步驟三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有組件對數據進行操作。根據類創建對象,對象轉換成SQL,執行SQL語句。
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

# Base class for the mapped classes; table metadata of every subclass is
# collected on Base.metadata.
Base = declarative_base()


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# Find every subclass of Base and create the matching tables in the database.
# Base.metadata.create_all(engine)

# Session factory bound to the engine, and one session used for the
# operations that follow.
Session = sessionmaker(bind=engine)
session = Session()
# The examples below are kept commented out; uncomment one section at a time
# to run it against the `session` and `User` model defined above.

# ########## Create ##########
# u = User(id=2, name='sb')
# session.add(u)
# session.add_all([
#     User(id=3, name='sb'),
#     User(id=4, name='sb')
# ])
# session.commit()

# ########## Delete ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()

# ########## Update ##########
# NOTE: 'cluster_id' is not a column of User as defined above; replace it
# with a mapped column (e.g. 'name') before running this example.
# session.query(User).filter(User.id > 2).update({'cluster_id' : 0})
# session.commit()

# ########## Query ##########
# First match / all matches by keyword filter:
# ret = session.query(User).filter_by(name='sb').first()

# ret = session.query(User).filter_by(name='sb').all()
# print(ret)

# IN (...) filter:
# ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()
# print(ret)

# Select a single column under a label:
# ret = session.query(User.name.label('name_label')).all()
# print(ret, type(ret))

# Ordering, and slicing an ordered query:
# ret = session.query(User).order_by(User.id).all()
# print(ret)

# ret = session.query(User).order_by(User.id)[1:3]
# print(ret)
# session.commit()