python使用MySQL主要有兩個模塊,pymysql(MySQLdb)和SQLAchemy。
pymysql(MySQLdb)為原生模塊,直接執行sql語句,其中pymysql模塊支持python 2和python3,MySQLdb只支持python2,兩者使用起來幾乎一樣。
SQLAchemy為一個ORM框架,將數據對象轉換成SQL,然后使用數據API執行SQL並獲取執行結果
另外DBUtils模塊提供了一個數據庫連接池,方便多線程場景中python操作數據庫。
1.pymysql模塊
安裝:pip install pymysql
創建表格操作: (注意中文格式設置)
#coding:utf-8 import pymysql #關於中文問題 #1. mysql命令行創建數據庫,設置編碼為gbk:create databse demo2 character set utf8; #2. python代碼中連接時設置charset="gbk" #3. 創建表格時設置default charset=utf8 #連接數據庫 conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服務端設置格式一樣(還可設置為gbk, gb2312) #創建游標 cursor = conn.cursor() #執行sql語句 cursor.execute("""create table if not exists t_sales( id int primary key auto_increment not null, nickName varchar(128) not null, color varchar(128) not null, size varchar(128) not null, comment text not null, saledate varchar(128) not null)engine=InnoDB default charset=utf8;""") # cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate) # values('%s','%s','%s','%s','%s');""" % ("zack", "黑色", "L", "大小合適", "2019-04-20")) cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);""" , ("zack", "黑色", "L", "大小合適", "2019-04-20")) #提交 conn.commit() #關閉游標 cursor.close() #關閉連接 conn.close() 創建表格操作
增刪改查:
注意execute執行sql語句參數的兩種情況:
execute( "insert into t_sales(nickName, size) values('%s','%s');" % ("zack","L") ) #此時的%s為字符竄拼接占位符,需要引號加'%s' (有sql注入風險)
execute( "insert into t_sales(nickName, size) values(%s,%s);" , ("zack","L") ) #此時的%s為sql語句占位符,不需要引號%s
#***************************增刪改查****************************************************** conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服務端設置格式一樣(還可設置為gbk, gb2312) #創建游標 cursor = conn.cursor() insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);" #返回受影響的行數 row1 = cursor.execute(insert_sql,("Bob", "黑色", "XL", "便宜實惠", "2019-04-20")) update_sql = "update t_sales set color='白色' where id=%s;" #返回受影響的行數 row2 = cursor.execute(update_sql,(1,)) select_sql = "select * from t_sales where id>%s;" #返回受影響的行數 row3 = cursor.execute(select_sql,(1,)) delete_sql = "delete from t_sales where id=%s;" #返回受影響的行數 row4 = cursor.execute(delete_sql,(4,)) #提交,不然無法保存新建或者修改的數據(增刪改得提交) conn.commit() cursor.close() conn.close() 增刪改查語句
批量插入和自增id:
#***************************批量插入****************************************************** conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服務端設置格式一樣(還可設置為gbk, gb2312) #創建游標 cursor = conn.cursor() insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);" data = [("Bob", "黑色", "XL", "便宜實惠", "2019-04-20"),("Ted", "黃色", "M", "便宜實惠", "2019-04-20"),("Gary", "黑色", "M", "穿着舒服", "2019-04-20")] row1 = cursor.executemany(insert_sql, data) conn.commit() #為插入的第一條數據的id,即插入的為5,6,7,new_id=5 new_id = cursor.lastrowid print(new_id) cursor.close() conn.close() 批量插入
獲取查詢數據:
#***************************獲取查找sql的查詢數據****************************************************** conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服務端設置格式一樣(還可設置為gbk, gb2312) #創建游標 cursor = conn.cursor() select_sql = "select id,nickname,size from t_sales where id>%s;" cursor.execute(select_sql, (3,)) row1 = cursor.fetchone() #獲取第一條數據,獲取后游標會向下移動一行 row_n = cursor.fetchmany(3) #獲取前n條數據,獲取后游標會向下移動n行 row_all = cursor.fetchall() #獲取所有數據,獲取后游標會向下移動到末尾 print(row1) print(row_n) print(row_all) #conn.commit() cursor.close() conn.close() 查詢數據獲取
注:在fetch數據時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置,如:
- cursor.scroll(1,mode='relative') # 相對當前位置移動
- cursor.scroll(2,mode='absolute') # 相對絕對位置移動
fetch獲取數據類型
fetch獲取的數據默認為元組格式,還可以獲取字典類型的,如下:
#***************************獲取字典格式數據****************************************************** conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服務端設置格式一樣(還可設置為gbk, gb2312) #創建游標 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) select_sql = "select id,nickname,size from t_sales where id>%s;" cursor.execute(select_sql, (3,)) row1 = cursor.fetchall() print(row1) conn.commit() cursor.close() conn.close()
2.SQLAlchmy框架
SQLAlchemy的整體架構如下,建立在第三方的DB API上,將類和對象操作轉換為數據庫sql,然后利用DB API執sql語句得到結果。其適用於多種數據庫。另外其內部實現了數據庫連接池,方便進行多線程操作。
- Engine,框架的引擎
- Connection Pooling ,數據庫連接池
- Dialect,選擇連接數據庫的DB API種類,(pymysql,mysqldb等)
- Schema/Types,架構和類型
- SQL Exprression Language,SQL表達式語言
- DB API: Python Database API Specification
2.1 執行原生sql
安裝:pip install sqlalchemy
SQLAlchmy也可以不利用ORM,使用數據庫連接池,類似pymysql模塊執行原生sql
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, String, Integer import threading engine = create_engine( "mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8", max_overflow = 0, #超過連接池大小外最多創建的連接,為0表示超過5個連接后,其他連接請求會阻塞 (默認為10) pool_size = 5, #連接池大小(默認為5) pool_timeout = 30, #連接線程池中,沒有連接時最多等待的時間,不設置無連接時直接報錯 (默認為30) pool_recycle = -1) #多久之后對線程池中的線程進行一次連接的回收(重置) (默認為-1) # def task(): # conn= engine.raw_connection() #建立原生連接,和pymysql的連接一樣 # cur = conn.cursor() # cur.execute("select * from t_sales where id>%s",(2,)) # result = cur.fetchone() # cur.close() # conn.close() # print(result) # def task(): # conn = engine.contextual_connect() #建立上下文管理器連接,自動打開和關閉 # with conn: # cur = conn.execute("select * from t_sales where id>%s",(2,)) # result = cur.fetchone() # print(result) def task(): cur =engine.execute("select * from t_sales where id>%s",(2,)) #engine直接執行 result = cur.fetchone() cur.close() print(result) if __name__=="__main__": for i in range(10): t = threading.Thread(target=task) t.start() 三種方式執行原生sql
create_engine()方法的所有可選參數見下面源碼:
def create_engine(*args, **kwargs): """Create a new :class:`.Engine` instance. The standard calling form is to send the URL as the first positional argument, usually a string that indicates database dialect and connection arguments:: engine = create_engine("postgresql://scott:tiger@localhost/test") Additional keyword arguments may then follow it which establish various options on the resulting :class:`.Engine` and its underlying :class:`.Dialect` and :class:`.Pool` constructs:: engine = create_engine("mysql://scott:tiger@hostname/dbname", encoding='latin1', echo=True) The string form of the URL is ``dialect[+driver]://user:password@host/dbname[?key=value..]``, where ``dialect`` is a database name such as ``mysql``, ``oracle``, ``postgresql``, etc., and ``driver`` the name of a DBAPI, such as ``psycopg2``, ``pyodbc``, ``cx_oracle``, etc. Alternatively, the URL can be an instance of :class:`~sqlalchemy.engine.url.URL`. ``**kwargs`` takes a wide variety of options which are routed towards their appropriate components. Arguments may be specific to the :class:`.Engine`, the underlying :class:`.Dialect`, as well as the :class:`.Pool`. Specific dialects also accept keyword arguments that are unique to that dialect. Here, we describe the parameters that are common to most :func:`.create_engine()` usage. Once established, the newly resulting :class:`.Engine` will request a connection from the underlying :class:`.Pool` once :meth:`.Engine.connect` is called, or a method which depends on it such as :meth:`.Engine.execute` is invoked. The :class:`.Pool` in turn will establish the first actual DBAPI connection when this request is received. The :func:`.create_engine` call itself does **not** establish any actual DBAPI connections directly. .. seealso:: :doc:`/core/engines` :doc:`/dialects/index` :ref:`connections_toplevel` :param case_sensitive=True: if False, result column names will match in a case-insensitive fashion, that is, ``row['SomeColumn']``. .. versionchanged:: 0.8 By default, result row names match case-sensitively. In version 0.7 and prior, all matches were case-insensitive. :param connect_args: a dictionary of options which will be passed directly to the DBAPI's ``connect()`` method as additional keyword arguments. See the example at :ref:`custom_dbapi_args`. :param convert_unicode=False: if set to True, sets the default behavior of ``convert_unicode`` on the :class:`.String` type to ``True``, regardless of a setting of ``False`` on an individual :class:`.String` type, thus causing all :class:`.String` -based columns to accommodate Python ``unicode`` objects. This flag is useful as an engine-wide setting when using a DBAPI that does not natively support Python ``unicode`` objects and raises an error when one is received (such as pyodbc with FreeTDS). See :class:`.String` for further details on what this flag indicates. :param creator: a callable which returns a DBAPI connection. This creation function will be passed to the underlying connection pool and will be used to create all new database connections. Usage of this function causes connection parameters specified in the URL argument to be bypassed. :param echo=False: if True, the Engine will log all statements as well as a repr() of their parameter lists to the engines logger, which defaults to sys.stdout. The ``echo`` attribute of ``Engine`` can be modified at any time to turn logging on and off. If set to the string ``"debug"``, result rows will be printed to the standard output as well. This flag ultimately controls a Python logger; see :ref:`dbengine_logging` for information on how to configure logging directly. :param echo_pool=False: if True, the connection pool will log all checkouts/checkins to the logging stream, which defaults to sys.stdout. This flag ultimately controls a Python logger; see :ref:`dbengine_logging` for information on how to configure logging directly. :param empty_in_strategy: The SQL compilation strategy to use when rendering an IN or NOT IN expression for :meth:`.ColumnOperators.in_` where the right-hand side is an empty set. This is a string value that may be one of ``static``, ``dynamic``, or ``dynamic_warn``. The ``static`` strategy is the default, and an IN comparison to an empty set will generate a simple false expression "1 != 1". The ``dynamic`` strategy behaves like that of SQLAlchemy 1.1 and earlier, emitting a false expression of the form "expr != expr", which has the effect of evaluting to NULL in the case of a null expression. ``dynamic_warn`` is the same as ``dynamic``, however also emits a warning when an empty set is encountered; this because the "dynamic" comparison is typically poorly performing on most databases. .. versionadded:: 1.2 Added the ``empty_in_strategy`` setting and additionally defaulted the behavior for empty-set IN comparisons to a static boolean expression. :param encoding: Defaults to ``utf-8``. This is the string encoding used by SQLAlchemy for string encode/decode operations which occur within SQLAlchemy, **outside of the DBAPI.** Most modern DBAPIs feature some degree of direct support for Python ``unicode`` objects, what you see in Python 2 as a string of the form ``u'some string'``. For those scenarios where the DBAPI is detected as not supporting a Python ``unicode`` object, this encoding is used to determine the source/destination encoding. It is **not used** for those cases where the DBAPI handles unicode directly. To properly configure a system to accommodate Python ``unicode`` objects, the DBAPI should be configured to handle unicode to the greatest degree as is appropriate - see the notes on unicode pertaining to the specific target database in use at :ref:`dialect_toplevel`. Areas where string encoding may need to be accommodated outside of the DBAPI include zero or more of: * the values passed to bound parameters, corresponding to the :class:`.Unicode` type or the :class:`.String` type when ``convert_unicode`` is ``True``; * the values returned in result set columns corresponding to the :class:`.Unicode` type or the :class:`.String` type when ``convert_unicode`` is ``True``; * the string SQL statement passed to the DBAPI's ``cursor.execute()`` method; * the string names of the keys in the bound parameter dictionary passed to the DBAPI's ``cursor.execute()`` as well as ``cursor.setinputsizes()`` methods; * the string column names retrieved from the DBAPI's ``cursor.description`` attribute. When using Python 3, the DBAPI is required to support *all* of the above values as Python ``unicode`` objects, which in Python 3 are just known as ``str``. In Python 2, the DBAPI does not specify unicode behavior at all, so SQLAlchemy must make decisions for each of the above values on a per-DBAPI basis - implementations are completely inconsistent in their behavior. :param execution_options: Dictionary execution options which will be applied to all connections. See :meth:`~sqlalchemy.engine.Connection.execution_options` :param implicit_returning=True: When ``True``, a RETURNING- compatible construct, if available, will be used to fetch newly generated primary key values when a single row INSERT statement is emitted with no existing returning() clause. This applies to those backends which support RETURNING or a compatible construct, including PostgreSQL, Firebird, Oracle, Microsoft SQL Server. Set this to ``False`` to disable the automatic usage of RETURNING. :param isolation_level: this string parameter is interpreted by various dialects in order to affect the transaction isolation level of the database connection. The parameter essentially accepts some subset of these string arguments: ``"SERIALIZABLE"``, ``"REPEATABLE_READ"``, ``"READ_COMMITTED"``, ``"READ_UNCOMMITTED"`` and ``"AUTOCOMMIT"``. Behavior here varies per backend, and individual dialects should be consulted directly. Note that the isolation level can also be set on a per-:class:`.Connection` basis as well, using the :paramref:`.Connection.execution_options.isolation_level` feature. .. seealso:: :attr:`.Connection.default_isolation_level` - view default level :paramref:`.Connection.execution_options.isolation_level` - set per :class:`.Connection` isolation level :ref:`SQLite Transaction Isolation <sqlite_isolation_level>` :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>` :ref:`MySQL Transaction Isolation <mysql_isolation_level>` :ref:`session_transaction_isolation` - for the ORM :param label_length=None: optional integer value which limits the size of dynamically generated column labels to that many characters. If less than 6, labels are generated as "_(counter)". If ``None``, the value of ``dialect.max_identifier_length`` is used instead. :param listeners: A list of one or more :class:`~sqlalchemy.interfaces.PoolListener` objects which will receive connection pool events. :param logging_name: String identifier which will be used within the "name" field of logging records generated within the "sqlalchemy.engine" logger. Defaults to a hexstring of the object's id. :param max_overflow=10: the number of connections to allow in connection pool "overflow", that is connections that can be opened above and beyond the pool_size setting, which defaults to five. this is only used with :class:`~sqlalchemy.pool.QueuePool`. :param module=None: reference to a Python module object (the module itself, not its string name). Specifies an alternate DBAPI module to be used by the engine's dialect. Each sub-dialect references a specific DBAPI which will be imported before first connect. This parameter causes the import to be bypassed, and the given module to be used instead. Can be used for testing of DBAPIs as well as to inject "mock" DBAPI implementations into the :class:`.Engine`. :param paramstyle=None: The `paramstyle <http://legacy.python.org/dev/peps/pep-0249/#paramstyle>`_ to use when rendering bound parameters. This style defaults to the one recommended by the DBAPI itself, which is retrieved from the ``.paramstyle`` attribute of the DBAPI. However, most DBAPIs accept more than one paramstyle, and in particular it may be desirable to change a "named" paramstyle into a "positional" one, or vice versa. When this attribute is passed, it should be one of the values ``"qmark"``, ``"numeric"``, ``"named"``, ``"format"`` or ``"pyformat"``, and should correspond to a parameter style known to be supported by the DBAPI in use. :param pool=None: an already-constructed instance of :class:`~sqlalchemy.pool.Pool`, such as a :class:`~sqlalchemy.pool.QueuePool` instance. If non-None, this pool will be used directly as the underlying connection pool for the engine, bypassing whatever connection parameters are present in the URL argument. For information on constructing connection pools manually, see :ref:`pooling_toplevel`. :param poolclass=None: a :class:`~sqlalchemy.pool.Pool` subclass, which will be used to create a connection pool instance using the connection parameters given in the URL. Note this differs from ``pool`` in that you don't actually instantiate the pool in this case, you just indicate what type of pool to be used. :param pool_logging_name: String identifier which will be used within the "name" field of logging records generated within the "sqlalchemy.pool" logger. Defaults to a hexstring of the object's id. :param pool_pre_ping: boolean, if True will enable the connection pool "pre-ping" feature that tests connections for liveness upon each checkout. .. versionadded:: 1.2 .. seealso:: :ref:`pool_disconnects_pessimistic` :param pool_size=5: the number of connections to keep open inside the connection pool. This used with :class:`~sqlalchemy.pool.QueuePool` as well as :class:`~sqlalchemy.pool.SingletonThreadPool`. With :class:`~sqlalchemy.pool.QueuePool`, a ``pool_size`` setting of 0 indicates no limit; to disable pooling, set ``poolclass`` to :class:`~sqlalchemy.pool.NullPool` instead. :param pool_recycle=-1: this setting causes the pool to recycle connections after the given number of seconds has passed. It defaults to -1, or no timeout. For example, setting to 3600 means connections will be recycled after one hour. Note that MySQL in particular will disconnect automatically if no activity is detected on a connection for eight hours (although this is configurable with the MySQLDB connection itself and the server configuration as well). .. seealso:: :ref:`pool_setting_recycle` :param pool_reset_on_return='rollback': set the :paramref:`.Pool.reset_on_return` parameter of the underlying :class:`.Pool` object, which can be set to the values ``"rollback"``, ``"commit"``, or ``None``. .. seealso:: :paramref:`.Pool.reset_on_return` :param pool_timeout=30: number of seconds to wait before giving up on getting a connection from the pool. This is only used with :class:`~sqlalchemy.pool.QueuePool`. :param plugins: string list of plugin names to load. See :class:`.CreateEnginePlugin` for background. .. versionadded:: 1.2.3 :param strategy='plain': selects alternate engine implementations. Currently available are: * the ``threadlocal`` strategy, which is described in :ref:`threadlocal_strategy`; * the ``mock`` strategy, which dispatches all statement execution to a function passed as the argument ``executor``. See `example in the FAQ <http://docs.sqlalchemy.org/en/latest/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string>`_. :param executor=None: a function taking arguments ``(sql, *multiparams, **params)``, to which the ``mock`` strategy will dispatch all statement execution. Used only by ``strategy='mock'``. """ strategy = kwargs.pop('strategy', default_strategy) strategy = strategies.strategies[strategy] return strategy.create(*args, **kwargs) create_engine源碼
2.2 執行ORM語句
A. 創建和刪除表
#coding:utf-8 import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, String, Integer, DateTime, Text Base = declarative_base() class User(Base): __tablename__="users" id = Column(Integer,primary_key=True) name = Column(String(32),index=True, nullable=False) #創建索引,不為空 email = Column(String(32),unique=True) ctime = Column(DateTime, default = datetime.datetime.now) #傳入方法名datetime.datetime.now extra = Column(Text,nullable=True) __table_args__ = { # UniqueConstraint('id', 'name', name='uix_id_name'), #設置聯合唯一約束 # Index('ix_id_name', 'name', 'email'), # 創建索引 } def create_tbs(): engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5) Base.metadata.create_all(engine) #創建所有定義的表 def drop_dbs(): engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5) Base.metadata.drop_all(engine) #刪除所有創建的表 if __name__=="__main__": create_tbs() #創建表 #drop_dbs() #刪除表 創建和刪除表格
B.表中定義外鍵關系(一對多,多對多)
(思考:下面代碼中的一對多關系,relationship定義在了customer表中,應該定義在PurchaseOrder更合理?)
(注意:mysql數據庫中避免使用order做為表的名字,order為一個mysql關鍵字,做為表名字時必須用反引號`order` (鍵盤數字1旁邊的符號))
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column,Integer,String,Text,DateTime,ForeignKey,Float from sqlalchemy.orm import relationship import datetime engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") #數據庫有密碼時,//root:12345678@127.0.0.1:3306/ Base = declarative_base() class Customer(Base): __tablename__="customer" #數據庫中保存的表名字 id = Column(Integer,primary_key=True) name = Column(String(64),index=True,nullable=False) phone = Column(String(16),nullable=False) address = Column(String(256),nullable=False) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) #關鍵關系,關聯表的__tablename__="purchase_order" # 和建立表結構無關,方便外鍵關系查詢,backref反向查詢時使用order_obj.customer purchase_order = relationship("PurchaseOrder",backref="customer") class PurchaseOrder(Base): __tablename__ = "purchase_order" #mysql數據庫中表的名字避免使用order,order為一個關鍵字,使用時必須用反引號`order` (鍵盤數字1旁邊的符號) id=Column(Integer,primary_key=True) cost = Column(Float,nullable=True) ctime = Column(DateTime,default =datetime.datetime.now) desc = Column(String(528)) #多對多關系時,secondary為中間表 product = relationship("Product",secondary="order_to_product",backref="purchase_order") class Product(Base): __tablename__ = "product" id = Column(Integer,primary_key=True) name = Column(String(256)) price = Column(Float,nullable=False) class OrdertoProduct(Base): __tablename__ = "order_to_product" id = Column(Integer,primary_key=True) product_id = Column(Integer,ForeignKey("product.id")) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) if __name__ == "__main__": Base.metadata.create_all(engine) #Base.metadata.drop_all(engine) 外鍵關系
C.增刪改查操作
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column,Integer,String,Text,DateTime,ForeignKey,Float from sqlalchemy.orm import relationship,sessionmaker from sqlalchemy.sql import text import datetime engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") #數據庫有密碼時,//root:12345678@127.0.0.1:3306/, 設置utf8防止中文亂碼 Base = declarative_base() class Customer(Base): __tablename__="customer" #數據庫中保存的表名字 id = Column(Integer,primary_key=True) name = Column(String(64),index=True,nullable=False) phone = Column(String(16),nullable=False) address = Column(String(256),nullable=False) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) #關鍵關系,關聯表的__tablename__="purchase_order" # 和建立表結構無關,方便外鍵關系查詢,backref反向查詢時使用order_obj.customer purchase_order = relationship("PurchaseOrder",backref="customer") class PurchaseOrder(Base): __tablename__ = "purchase_order" #mysql數據庫中表的名字避免使用order,order為一個關鍵字,使用時必須用反引號`order` (鍵盤數字1旁邊的符號) id=Column(Integer,primary_key=True) cost = Column(Float,nullable=True) ctime = Column(DateTime,default =datetime.datetime.now) desc = Column(String(528)) #多對多關系時,secondary為中間表 product = relationship("Product",secondary="order_to_product",backref="purchase_order") class Product(Base): __tablename__ = "product" id = Column(Integer,primary_key=True) name = Column(String(256)) price = Column(Float,nullable=False) class OrdertoProduct(Base): __tablename__ = "order_to_product" id = Column(Integer,primary_key=True) product_id = Column(Integer,ForeignKey("product.id")) purchase_order_id = Column(Integer,ForeignKey("purchase_order.id")) if __name__ == "__main__": #Base.metadata.create_all(engine) #Base.metadata.drop_all(engine) Session = sessionmaker(bind=engine) #每次進行數據庫操作時都要創建session session = Session() #*****************增加數據******************** # pur_order = PurchaseOrder(cost=19.7,desc="python編程之路") # session.add(pur_order) # session.add_all( # [PurchaseOrder(cost=39.7,desc="linux操作系統"), # PurchaseOrder(cost=59.6,desc="python cookbook")]) # session.commit() #*****************修改數據******************** #session.query(PurchaseOrder).filter(PurchaseOrder.id>2).update({"cost":29.7}) #session.query(PurchaseOrder).filter(PurchaseOrder.id==2).update({"cost":PurchaseOrder.cost+40.1},synchronize_session=False) #synchronize_session用於query在進行delete or update操作時,對session的同步策略。 #session.commit() #*****************刪除數據******************** #session.query(PurchaseOrder).filter(PurchaseOrder.id==1).delete() #session.commit() #*****************查詢數據******************** #ret = session.query(PurchaseOrder).all() # ret = session.query(PurchaseOrder).filter(PurchaseOrder.id==2).all() #包含對象的列表 # ret = session.query(PurchaseOrder).filter(PurchaseOrder.id==2).first() #單個對象 # ret = session.query(PurchaseOrder).filter_by(id=2).all() #通過列名字的表達式 # ret = session.query(PurchaseOrder).filter_by(id=2).first() #ret = session.query(PurchaseOrder).filter(text("id<:value and cost>:price")).params(value=6,price=15).order_by(PurchaseOrder.cost).all() #ret = session.query(PurchaseOrder).from_statement(text("SELECT * FROM purchase_order WHERE cost>:price")).params(price=40).all() # print ret # for i in ret: # print i.id, i.cost, i.ctime,i.desc #ret2 = session.query(PurchaseOrder.id,PurchaseOrder.cost.label('totalcost')).all() #只查詢兩列,ret2為列表 #print ret2 #關閉session session.close() 增刪改查
查詢語句
# 條件 ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分組 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 連表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all() 查詢語句
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import text, func from sqlalchemy_orm2 import PurchaseOrder #導入定義的PurchaseOrder表格類 engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") Session = sessionmaker(bind=engine) session = Session() #查詢 ret = session.execute("select * from purchase_order where id=:value",params={"value":3}) print ret for i in ret: print i.id, i.cost, i.ctime,i.desc #插入 purchase_order = PurchaseOrder.__table__ #拿到PurchaseOrder表格對象 ret=session.execute(purchase_order.insert(), [{"cost":46.3,"desc":'python2'}, {"cost":43.3,"desc":'python3'}]) session.commit() print(ret.lastrowid) session.close() # 關聯子查詢 subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar() result = session.query(Group.name, subqry) """ SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid FROM server WHERE server.id = `group`.id) AS anon_1 FROM `group` """ 補充
D.多線程操作
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy_orm2 import Product from threading import Thread engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=0,pool_size=5) Session = sessionmaker(bind=engine) def task(name,price): session = Session() pro = Product(name=name,price=price) session.add(pro) session.commit() session.close() if __name__=="__main__": for i in range(6): t = Thread(target=task,args=("pro"+str(i),i*5)) t.start() 多線程
E. 通過relationship操縱一對多和多對多關系
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import text, func from sqlalchemy_orm2 import PurchaseOrder,Product,OrdertoProduct,Customer #導入定義的表格類 engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") Session = sessionmaker(bind=engine) session = Session() # #通過定義的關鍵關系添加(id值) # cus1 = Customer(name="zack",phone="13567682333",address="Nanjing",purchase_order_id=3) # session.add(cus1) # #通過relationship正向添加 # cus2 = Customer(name="zack2",phone="13567682333",address="Nanjing",purchase_order=PurchaseOrder(cost=53,desc="java")) # session.add(cus2) # session.commit() #通過relationship反向添加 # purchase_order=PurchaseOrder(cost=53,desc="php") # cus3 = Customer(name="zack3",phone="13567682333",address="Nanjing") # cus4 = Customer(name="zack4",phone="13567682333",address="Nanjing") # purchase_order.customer=[cus3,cus4] #cus3,cus4的purchase_order_id都是purchase_order.id值,即同時添加了兩組外鍵關系 # session.add(purchase_order) # session.commit() ##通過relationship正向查詢 cus = session.query(Customer).first() print(cus.purchase_order_id) print(cus.purchase_order.desc) #通過relationship反向查詢 pur = session.query(PurchaseOrder).filter(PurchaseOrder.id==3).first() print(pur.desc) print(pur.customer) #返回一個list 一對多
#coding:utf-8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import text, func from sqlalchemy_orm2 import PurchaseOrder,Product,OrdertoProduct,Customer #導入定義的表格類 engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8") Session = sessionmaker(bind=engine) session = Session() # session.add_all([Product(name="java",price=24), # Product(name="python",price=34), # Product(name="php",price=27)]) # session.commit() # #通過定義的關鍵關系添加(id值) # op = OrdertoProduct(product_id=1,purchase_order_id=16) # session.add(op) # session.commit() # #通過relationship添加 # pur = PurchaseOrder(cost=27,desc="xxxx") # pur.product = [Product(name="C++",price=60),] #正向 # session.add(pur) # pro = Product(name="C",price=40) # pro.purchase_order=[PurchaseOrder(cost=27,desc="xxxx"),] #反向 # session.add(pro) # session.commit() #通過relationship正向查詢 pur = session.query(PurchaseOrder).filter(PurchaseOrder.id==19).first() print(pur.desc) print(pur.product) #結果為列表 #通過relationship反向查詢 pro = session.query(Product).filter(Product.id==5).first() print(pro.name) print(pro.purchase_order) #結果為列表 session.close() 多對多
3.數據庫連接池
對於ORM框架,其內部維護了鏈接池,可以直接通過多線程操控數據庫。對於pymysql模塊,通過多線程操控數據庫容易出錯,得加鎖串行執行。進行並發時,可以利用DBUtils模塊來維護數據庫連接池。
3.1 多線程操控pymysql
不采用DBUtils連接池時, pymysql多線程代碼如下:
import pymysql import threadind #**************************無連接池******************************* #每個線程都要創立一次連接,線程並發操作間可能有問題? def func(): conn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8") cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) cursor.close() conn.close() if __name__=="__main__": for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) t.start() 每個線程創建連接
#**************************無連接池******************************* #創建一個連接,加鎖串行執行 from threading import Lock import pymysql import threading conn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8") lock = Lock() def func(): with lock: cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) cursor.close() #conn.close()不能在線程中關閉連接,否則其他線程不可用了 if __name__=="__main__": threads = [] for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) threads.append(t) t.start() for t in threads: t.join() conn.close() 一個連接串行執行
3.2 DBUtils連接池
DBUtils連接池有兩種連接模式:PersistentDB和PooledDB (官網文檔:https://cito.github.io/DBUtils/UsersGuide.html)
模式一(DBUtils.PersistentDB):為每個線程創建一個連接,線程即使調用了close方法,也不會關閉,只是把連接重新放到連接池,供自己線程再次使用。當線程終止時,連接自動關閉。
PersistentDB使用代碼如下:
#coding:utf-8 from DBUtils.PersistentDB import PersistentDB import pymysql import threading pool = PersistentDB( creator = pymysql, # 使用鏈接數據庫的模塊 maxusage = None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping = 0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable = False, # 如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉鏈接。如果為True時, conn.close()則關閉鏈接,那么再次調用pool.connection時就會報錯,因為已經真的關閉了連接(pool.steady_connection()可以獲取一個新的鏈接) threadlocal = None, # 本線程獨享值得對象,用於保存鏈接對象,如果鏈接對象被重置 host="127.0.0.1", port = 3306, user = "root", password="", database="learningsql", charset = "utf8" ) def func(): conn = pool.connection() cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) cursor.close() conn.close() if __name__ == "__main__": for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) t.start() PersistentDB 使用
模式二(DBUtils.PooledDB):創建一批連接到連接池,供所有線程共享使用。
(由於pymysql、MySQLdb等threadsafety值為1,所以該模式連接池中的線程會被所有線程共享。)
PooledDB使用代碼如下:
from DBUtils.PooledDB import PooledDB import pymysql import threading import time pool = PooledDB( creator = pymysql, # 使用鏈接數據庫的模塊 maxconnections = 6, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached = 2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 maxcached = 5, # 鏈接池中最多閑置的鏈接,0和None不限制 maxshared = 3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking = True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯 maxusage = None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession = [], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping = 0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host="127.0.0.1", port = 3306, user="root", password="", database = "learningsql", charset = "utf8" ) def func(): conn = pool.connection() cursor = conn.cursor() cursor.execute("select * from user where nid>1;") result = cursor.fetchone() print(result) time.sleep(5) #為了查看mysql端的線程數量 cursor.close() conn.close() if __name__=="__main__": for i in range(5): t = threading.Thread(target=func,name="thread-%s"%i) t.start() PooledDB使用
上述代碼中加入了sleep(5)使線程連接數據庫時間延長,方便查看mysql數據庫連接線程情況,下圖分別為代碼執行中和執行后的線程連接情況,可以發現,代碼執行時,同時有6個線程連接上了數據庫(有一個為mysql命令客戶端),代碼執行后,只有一個線程連接數據庫,但仍有5個線程等待連接。
(show status like "Threads%" 查看線程連接情況)