Three Python modules for working with MySQL databases


  There are two main modules for using MySQL from Python: pymysql (MySQLdb) and SQLAlchemy.

    pymysql (MySQLdb) is the native, low-level module that executes SQL statements directly. pymysql supports both Python 2 and Python 3, while MySQLdb only supports Python 2; the two are used almost identically.

    SQLAlchemy is an ORM framework: it converts operations on data objects into SQL, executes that SQL through a DB API, and returns the results.

  In addition, the DBUtils module provides a database connection pool, which is convenient when driving the database from multiple threads.

1. The pymysql module

  Installation: pip install pymysql

  Creating a table (note the settings needed for Chinese text):

#coding:utf-8
import pymysql

# Handling Chinese text: use the same encoding in all three places
# 1. when creating the database from the mysql command line: create database demo2 character set utf8;
# 2. when connecting from Python: charset='utf8'
# 3. when creating the table: default charset=utf8

# connect to the database
conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  # must match the charset configured on the MySQL server (gbk, gb2312 also work if used consistently)
# create a cursor
cursor = conn.cursor()
# execute the SQL statement
cursor.execute("""create table if not exists t_sales(
                id int primary key auto_increment not null,
                nickName varchar(128) not null,
                color varchar(128) not null,
                size varchar(128) not null,
                comment text not null,
                saledate varchar(128) not null) engine=InnoDB default charset=utf8;""")

# cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate)
#                 values('%s','%s','%s','%s','%s');""" % ("zack", "黑色", "L", "大小合適", "2019-04-20"))

cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate)
                values(%s,%s,%s,%s,%s);""", ("zack", "黑色", "L", "大小合適", "2019-04-20"))
# commit
conn.commit()
# close the cursor
cursor.close()
# close the connection
conn.close()

Creating a table

Insert, update, select, delete (CRUD):

Note the two ways execute() accepts SQL parameters:

execute(  "insert into t_sales(nickName, size)  values('%s','%s');" % ("zack","L")  )   # here %s is a Python string-formatting placeholder and must be written with quotes as '%s'  (risk of SQL injection)

execute(  "insert into t_sales(nickName, size)  values(%s,%s);" , ("zack","L") )  # here %s is a driver-level SQL placeholder and takes no quotes
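To see why the string-formatting form is risky, here is a hypothetical sketch (the malicious nick value is invented purely for illustration): formatting splices untrusted text straight into the SQL, while the placeholder form lets pymysql escape it.

# hypothetical illustration of the injection risk -- do not run against real data
nick = "x','L'); drop table t_sales; -- "
sql = "insert into t_sales(nickName, size) values('%s','%s');" % (nick, "L")
print(sql)
# the formatted string now smuggles in a second statement:
# insert into t_sales(nickName, size) values('x','L'); drop table t_sales; -- ','L');

# with the parameterized form the driver escapes the quotes instead:
# cursor.execute("insert into t_sales(nickName, size) values(%s,%s);", (nick, "L"))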

#*************************** CRUD ******************************************************
conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  # charset must match the MySQL server settings (gbk, gb2312 also possible)
# create a cursor
cursor = conn.cursor()

insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);"
# returns the number of affected rows
row1 = cursor.execute(insert_sql,("Bob", "黑色", "XL", "便宜實惠", "2019-04-20"))

update_sql = "update t_sales set color='白色' where id=%s;"
# returns the number of affected rows
row2 = cursor.execute(update_sql,(1,))

select_sql = "select * from t_sales where id>%s;"
# returns the number of rows matched by the query
row3 = cursor.execute(select_sql,(1,))

delete_sql = "delete from t_sales where id=%s;"
# returns the number of affected rows
row4 = cursor.execute(delete_sql,(4,))

# commit, otherwise inserts, updates and deletes are not saved (reads do not need it)
conn.commit()
cursor.close()
conn.close()

CRUD statements
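A side note on commit: pymysql connections have autocommit turned off by default, which is why the explicit conn.commit() above is required. If you prefer, the connection can be opened with autocommit enabled instead; a minimal sketch, assuming the same learningsql database:

conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql',
                       charset='utf8', port=3306, autocommit=True)  # each statement is committed immediately
cursor = conn.cursor()
cursor.execute("update t_sales set color='白色' where id=%s;", (2,))  # no conn.commit() needed
cursor.close()
conn.close()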

Bulk insert and the auto-increment id:

#*************************** bulk insert ******************************************************
conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  # charset must match the MySQL server settings (gbk, gb2312 also possible)
# create a cursor
cursor = conn.cursor()

insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);"
data = [("Bob", "黑色", "XL", "便宜實惠", "2019-04-20"),("Ted", "黃色", "M", "便宜實惠", "2019-04-20"),("Gary", "黑色", "M", "穿着舒服", "2019-04-20")]
row1 = cursor.executemany(insert_sql, data)

conn.commit()
# lastrowid is the id of the first row inserted by the batch, e.g. if rows 5, 6, 7 were inserted, new_id = 5
new_id = cursor.lastrowid
print(new_id)
cursor.close()
conn.close()

Bulk insert

Fetching query results:

#*************************** fetching the rows returned by a select ******************************************************
conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  # charset must match the MySQL server settings (gbk, gb2312 also possible)
# create a cursor
cursor = conn.cursor()

select_sql = "select id,nickname,size from t_sales where id>%s;"
cursor.execute(select_sql, (3,))

row1 = cursor.fetchone()      # fetch one row; the cursor then moves down one row
row_n = cursor.fetchmany(3)  # fetch the next n rows; the cursor moves down n rows
row_all = cursor.fetchall()  # fetch all remaining rows; the cursor moves to the end
print(row1)
print(row_n)
print(row_all)
#conn.commit()
cursor.close()
conn.close()

Fetching query results

Note: rows are fetched in order; cursor.scroll(num, mode) can be used to move the cursor position, as in the sketch after this list:

  • cursor.scroll(1, mode='relative')  # move relative to the current position
  • cursor.scroll(2, mode='absolute')  # move to an absolute position
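A minimal sketch of scroll() in use, assuming the same t_sales table and cursor as above:

cursor.execute("select id, nickname, size from t_sales;")
first = cursor.fetchone()          # cursor now sits after row 1
cursor.scroll(1, mode='relative')  # skip one row forward
third = cursor.fetchone()          # this is row 3
cursor.scroll(0, mode='absolute')  # jump back to the first row
again = cursor.fetchone()          # row 1 again
print(first, third, again)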

Data types returned by fetch

  By default fetch returns rows as tuples; a dict cursor returns them as dictionaries instead, as shown below:

#*************************** fetching rows as dictionaries ******************************************************
conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  # charset must match the MySQL server settings (gbk, gb2312 also possible)
# create a cursor that returns dictionaries
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

select_sql = "select id,nickname,size from t_sales where id>%s;"
cursor.execute(select_sql, (3,))

row1 = cursor.fetchall()      

print(row1)

conn.commit()
cursor.close()
conn.close()

2. The SQLAlchemy framework

  The overall architecture of SQLAlchemy is shown below. It is built on top of third-party DB APIs: operations on classes and objects are converted into database SQL, which is then executed through the DB API to obtain the results. It works with many different databases, and it implements an internal connection pool, which makes multi-threaded use convenient.

  • Engine, the framework's engine
  • Connection Pooling, the database connection pool
  • Dialect, chooses which kind of DB API driver is used to connect (pymysql, MySQLdb, etc.; see the sketch after this list)
  • Schema/Types, schema and types
  • SQL Expression Language, the SQL expression language
  • DB API: Python Database API Specification
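The dialect and the DB API driver behind it are both selected by the URL passed to create_engine. A minimal sketch (assuming pymysql is installed) of how the "mysql+pymysql" URL maps onto these components:

from sqlalchemy import create_engine

# "mysql" selects the dialect, "+pymysql" selects the DB API driver behind it
engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")
print(engine.dialect.name)    # mysql
print(engine.dialect.driver)  # pymysql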

 

 

 

2.1 Executing raw SQL

  Installation: pip install sqlalchemy

  SQLAlchemy can also be used without the ORM: backed by its connection pool, it executes raw SQL much like the pymysql module.

#coding:utf-8

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer
import threading

engine = create_engine(
            "mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",
            max_overflow = 0,  # connections allowed beyond pool_size; 0 means once 5 connections are in use, further requests block (default 10)
            pool_size = 5,    # size of the connection pool (default 5)
            pool_timeout = 30,  # maximum seconds to wait for a connection when the pool is exhausted before raising an error (default 30)
            pool_recycle = -1)  # seconds after which a pooled connection is recycled (reset); -1 means never (default -1)
            
# def task():
    # conn= engine.raw_connection() # obtain a raw connection, just like a pymysql connection
    # cur = conn.cursor()
    # cur.execute("select * from t_sales where id>%s",(2,))
    # result = cur.fetchone()
    # cur.close()
    # conn.close()
    # print(result)


    
# def task():
    # conn = engine.contextual_connect() # a context-managed connection that opens and closes automatically
    # with conn:
        # cur = conn.execute("select * from t_sales where id>%s",(2,))
        # result = cur.fetchone()
    # print(result)
    
    
def task():
    cur = engine.execute("select * from t_sales where id>%s",(2,))  # execute directly on the engine
    result = cur.fetchone()
    cur.close()
    print(result)

if __name__=="__main__":
    for i in range(10):
        t = threading.Thread(target=task)
        t.start()

Three ways to execute raw SQL

All optional parameters of create_engine() are documented in its source, shown below:

def create_engine(*args, **kwargs):
    """Create a new :class:`.Engine` instance.

    The standard calling form is to send the URL as the
    first positional argument, usually a string
    that indicates database dialect and connection arguments::


        engine = create_engine("postgresql://scott:tiger@localhost/test")

    Additional keyword arguments may then follow it which
    establish various options on the resulting :class:`.Engine`
    and its underlying :class:`.Dialect` and :class:`.Pool`
    constructs::

        engine = create_engine("mysql://scott:tiger@hostname/dbname",
                                    encoding='latin1', echo=True)

    The string form of the URL is
    ``dialect[+driver]://user:password@host/dbname[?key=value..]``, where
    ``dialect`` is a database name such as ``mysql``, ``oracle``,
    ``postgresql``, etc., and ``driver`` the name of a DBAPI, such as
    ``psycopg2``, ``pyodbc``, ``cx_oracle``, etc.  Alternatively,
    the URL can be an instance of :class:`~sqlalchemy.engine.url.URL`.

    ``**kwargs`` takes a wide variety of options which are routed
    towards their appropriate components.  Arguments may be specific to
    the :class:`.Engine`, the underlying :class:`.Dialect`, as well as the
    :class:`.Pool`.  Specific dialects also accept keyword arguments that
    are unique to that dialect.   Here, we describe the parameters
    that are common to most :func:`.create_engine()` usage.

    Once established, the newly resulting :class:`.Engine` will
    request a connection from the underlying :class:`.Pool` once
    :meth:`.Engine.connect` is called, or a method which depends on it
    such as :meth:`.Engine.execute` is invoked.   The :class:`.Pool` in turn
    will establish the first actual DBAPI connection when this request
    is received.   The :func:`.create_engine` call itself does **not**
    establish any actual DBAPI connections directly.

    .. seealso::

        :doc:`/core/engines`

        :doc:`/dialects/index`

        :ref:`connections_toplevel`

    :param case_sensitive=True: if False, result column names
       will match in a case-insensitive fashion, that is,
       ``row['SomeColumn']``.

       .. versionchanged:: 0.8
           By default, result row names match case-sensitively.
           In version 0.7 and prior, all matches were case-insensitive.

    :param connect_args: a dictionary of options which will be
        passed directly to the DBAPI's ``connect()`` method as
        additional keyword arguments.  See the example
        at :ref:`custom_dbapi_args`.

    :param convert_unicode=False: if set to True, sets
        the default behavior of ``convert_unicode`` on the
        :class:`.String` type to ``True``, regardless
        of a setting of ``False`` on an individual
        :class:`.String` type, thus causing all :class:`.String`
        -based columns
        to accommodate Python ``unicode`` objects.  This flag
        is useful as an engine-wide setting when using a
        DBAPI that does not natively support Python
        ``unicode`` objects and raises an error when
        one is received (such as pyodbc with FreeTDS).

        See :class:`.String` for further details on
        what this flag indicates.

    :param creator: a callable which returns a DBAPI connection.
        This creation function will be passed to the underlying
        connection pool and will be used to create all new database
        connections. Usage of this function causes connection
        parameters specified in the URL argument to be bypassed.

    :param echo=False: if True, the Engine will log all statements
        as well as a repr() of their parameter lists to the engines
        logger, which defaults to sys.stdout. The ``echo`` attribute of
        ``Engine`` can be modified at any time to turn logging on and
        off. If set to the string ``"debug"``, result rows will be
        printed to the standard output as well. This flag ultimately
        controls a Python logger; see :ref:`dbengine_logging` for
        information on how to configure logging directly.

    :param echo_pool=False: if True, the connection pool will log
        all checkouts/checkins to the logging stream, which defaults to
        sys.stdout. This flag ultimately controls a Python logger; see
        :ref:`dbengine_logging` for information on how to configure logging
        directly.

    :param empty_in_strategy:  The SQL compilation strategy to use when
        rendering an IN or NOT IN expression for :meth:`.ColumnOperators.in_`
        where the right-hand side
        is an empty set.   This is a string value that may be one of
        ``static``, ``dynamic``, or ``dynamic_warn``.   The ``static``
        strategy is the default, and an IN comparison to an empty set
        will generate a simple false expression "1 != 1".   The ``dynamic``
        strategy behaves like that of SQLAlchemy 1.1 and earlier, emitting
        a false expression of the form "expr != expr", which has the effect
        of evaluting to NULL in the case of a null expression.
        ``dynamic_warn`` is the same as ``dynamic``, however also emits a
        warning when an empty set is encountered; this because the "dynamic"
        comparison is typically poorly performing on most databases.

        .. versionadded:: 1.2  Added the ``empty_in_strategy`` setting and
           additionally defaulted the behavior for empty-set IN comparisons
           to a static boolean expression.

    :param encoding: Defaults to ``utf-8``.  This is the string
        encoding used by SQLAlchemy for string encode/decode
        operations which occur within SQLAlchemy, **outside of
        the DBAPI.**  Most modern DBAPIs feature some degree of
        direct support for Python ``unicode`` objects,
        what you see in Python 2 as a string of the form
        ``u'some string'``.  For those scenarios where the
        DBAPI is detected as not supporting a Python ``unicode``
        object, this encoding is used to determine the
        source/destination encoding.  It is **not used**
        for those cases where the DBAPI handles unicode
        directly.

        To properly configure a system to accommodate Python
        ``unicode`` objects, the DBAPI should be
        configured to handle unicode to the greatest
        degree as is appropriate - see
        the notes on unicode pertaining to the specific
        target database in use at :ref:`dialect_toplevel`.

        Areas where string encoding may need to be accommodated
        outside of the DBAPI include zero or more of:

        * the values passed to bound parameters, corresponding to
          the :class:`.Unicode` type or the :class:`.String` type
          when ``convert_unicode`` is ``True``;
        * the values returned in result set columns corresponding
          to the :class:`.Unicode` type or the :class:`.String`
          type when ``convert_unicode`` is ``True``;
        * the string SQL statement passed to the DBAPI's
          ``cursor.execute()`` method;
        * the string names of the keys in the bound parameter
          dictionary passed to the DBAPI's ``cursor.execute()``
          as well as ``cursor.setinputsizes()`` methods;
        * the string column names retrieved from the DBAPI's
          ``cursor.description`` attribute.

        When using Python 3, the DBAPI is required to support
        *all* of the above values as Python ``unicode`` objects,
        which in Python 3 are just known as ``str``.  In Python 2,
        the DBAPI does not specify unicode behavior at all,
        so SQLAlchemy must make decisions for each of the above
        values on a per-DBAPI basis - implementations are
        completely inconsistent in their behavior.

    :param execution_options: Dictionary execution options which will
        be applied to all connections.  See
        :meth:`~sqlalchemy.engine.Connection.execution_options`

    :param implicit_returning=True: When ``True``, a RETURNING-
        compatible construct, if available, will be used to
        fetch newly generated primary key values when a single row
        INSERT statement is emitted with no existing returning()
        clause.  This applies to those backends which support RETURNING
        or a compatible construct, including PostgreSQL, Firebird, Oracle,
        Microsoft SQL Server.   Set this to ``False`` to disable
        the automatic usage of RETURNING.

    :param isolation_level: this string parameter is interpreted by various
        dialects in order to affect the transaction isolation level of the
        database connection.   The parameter essentially accepts some subset of
        these string arguments: ``"SERIALIZABLE"``, ``"REPEATABLE_READ"``,
        ``"READ_COMMITTED"``, ``"READ_UNCOMMITTED"`` and ``"AUTOCOMMIT"``.
        Behavior here varies per backend, and
        individual dialects should be consulted directly.

        Note that the isolation level can also be set on a per-:class:`.Connection`
        basis as well, using the
        :paramref:`.Connection.execution_options.isolation_level`
        feature.

        .. seealso::

            :attr:`.Connection.default_isolation_level` - view default level

            :paramref:`.Connection.execution_options.isolation_level`
            - set per :class:`.Connection` isolation level

            :ref:`SQLite Transaction Isolation <sqlite_isolation_level>`

            :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>`

            :ref:`MySQL Transaction Isolation <mysql_isolation_level>`

            :ref:`session_transaction_isolation` - for the ORM

    :param label_length=None: optional integer value which limits
        the size of dynamically generated column labels to that many
        characters. If less than 6, labels are generated as
        "_(counter)". If ``None``, the value of
        ``dialect.max_identifier_length`` is used instead.

    :param listeners: A list of one or more
        :class:`~sqlalchemy.interfaces.PoolListener` objects which will
        receive connection pool events.

    :param logging_name:  String identifier which will be used within
        the "name" field of logging records generated within the
        "sqlalchemy.engine" logger. Defaults to a hexstring of the
        object's id.

    :param max_overflow=10: the number of connections to allow in
        connection pool "overflow", that is connections that can be
        opened above and beyond the pool_size setting, which defaults
        to five. this is only used with :class:`~sqlalchemy.pool.QueuePool`.

    :param module=None: reference to a Python module object (the module
        itself, not its string name).  Specifies an alternate DBAPI module to
        be used by the engine's dialect.  Each sub-dialect references a
        specific DBAPI which will be imported before first connect.  This
        parameter causes the import to be bypassed, and the given module to
        be used instead. Can be used for testing of DBAPIs as well as to
        inject "mock" DBAPI implementations into the :class:`.Engine`.

    :param paramstyle=None: The `paramstyle <http://legacy.python.org/dev/peps/pep-0249/#paramstyle>`_
        to use when rendering bound parameters.  This style defaults to the
        one recommended by the DBAPI itself, which is retrieved from the
        ``.paramstyle`` attribute of the DBAPI.  However, most DBAPIs accept
        more than one paramstyle, and in particular it may be desirable
        to change a "named" paramstyle into a "positional" one, or vice versa.
        When this attribute is passed, it should be one of the values
        ``"qmark"``, ``"numeric"``, ``"named"``, ``"format"`` or
        ``"pyformat"``, and should correspond to a parameter style known
        to be supported by the DBAPI in use.

    :param pool=None: an already-constructed instance of
        :class:`~sqlalchemy.pool.Pool`, such as a
        :class:`~sqlalchemy.pool.QueuePool` instance. If non-None, this
        pool will be used directly as the underlying connection pool
        for the engine, bypassing whatever connection parameters are
        present in the URL argument. For information on constructing
        connection pools manually, see :ref:`pooling_toplevel`.

    :param poolclass=None: a :class:`~sqlalchemy.pool.Pool`
        subclass, which will be used to create a connection pool
        instance using the connection parameters given in the URL. Note
        this differs from ``pool`` in that you don't actually
        instantiate the pool in this case, you just indicate what type
        of pool to be used.

    :param pool_logging_name:  String identifier which will be used within
       the "name" field of logging records generated within the
       "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
       id.

    :param pool_pre_ping: boolean, if True will enable the connection pool
        "pre-ping" feature that tests connections for liveness upon
        each checkout.

        .. versionadded:: 1.2

        .. seealso::

            :ref:`pool_disconnects_pessimistic`

    :param pool_size=5: the number of connections to keep open
        inside the connection pool. This used with
        :class:`~sqlalchemy.pool.QueuePool` as
        well as :class:`~sqlalchemy.pool.SingletonThreadPool`.  With
        :class:`~sqlalchemy.pool.QueuePool`, a ``pool_size`` setting
        of 0 indicates no limit; to disable pooling, set ``poolclass`` to
        :class:`~sqlalchemy.pool.NullPool` instead.

    :param pool_recycle=-1: this setting causes the pool to recycle
        connections after the given number of seconds has passed. It
        defaults to -1, or no timeout. For example, setting to 3600
        means connections will be recycled after one hour. Note that
        MySQL in particular will disconnect automatically if no
        activity is detected on a connection for eight hours (although
        this is configurable with the MySQLDB connection itself and the
        server configuration as well).

        .. seealso::

            :ref:`pool_setting_recycle`

    :param pool_reset_on_return='rollback': set the
        :paramref:`.Pool.reset_on_return` parameter of the underlying
        :class:`.Pool` object, which can be set to the values
        ``"rollback"``, ``"commit"``, or ``None``.

        .. seealso::

            :paramref:`.Pool.reset_on_return`

    :param pool_timeout=30: number of seconds to wait before giving
        up on getting a connection from the pool. This is only used
        with :class:`~sqlalchemy.pool.QueuePool`.

    :param plugins: string list of plugin names to load.  See
        :class:`.CreateEnginePlugin` for background.

        .. versionadded:: 1.2.3

    :param strategy='plain': selects alternate engine implementations.
        Currently available are:

        * the ``threadlocal`` strategy, which is described in
          :ref:`threadlocal_strategy`;
        * the ``mock`` strategy, which dispatches all statement
          execution to a function passed as the argument ``executor``.
          See `example in the FAQ
          <http://docs.sqlalchemy.org/en/latest/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string>`_.

    :param executor=None: a function taking arguments
        ``(sql, *multiparams, **params)``, to which the ``mock`` strategy will
        dispatch all statement execution. Used only by ``strategy='mock'``.

    """

    strategy = kwargs.pop('strategy', default_strategy)
    strategy = strategies.strategies[strategy]
    return strategy.create(*args, **kwargs)

create_engine source

2.2 Using the ORM

  A. Creating and dropping tables

#coding:utf-8

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer, DateTime, Text

Base = declarative_base()

class User(Base):
    __tablename__="users"
    id = Column(Integer,primary_key=True)
    name = Column(String(32),index=True, nullable=False)  # indexed, not null
    email = Column(String(32),unique=True)
    ctime = Column(DateTime, default = datetime.datetime.now)  # pass the callable datetime.datetime.now, not its result
    extra = Column(Text,nullable=True)

    __table_args__ = (

        # UniqueConstraint('id', 'name', name='uix_id_name'),  # composite unique constraint
        # Index('ix_id_name', 'name', 'email'),                # composite index
    )

def create_tbs():
    engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5)
    Base.metadata.create_all(engine)   # create all tables defined on Base

def drop_dbs():
    engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5)
    Base.metadata.drop_all(engine)   # drop all tables defined on Base

if __name__=="__main__":
    create_tbs()  # create the tables
    #drop_dbs()   # drop the tables
    

Creating and dropping tables

B. Defining foreign-key relationships between tables (one-to-many, many-to-many)

    (Question to consider: in the code below the one-to-many relationship() is defined on the Customer class; would it be more natural to define it on PurchaseOrder? A sketch of that alternative follows these notes.)

    (Note: avoid using order as a table name in MySQL; order is a reserved keyword, and a table with that name must always be written with backticks as `order`.)
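One possible answer to the question above, as a minimal sketch (same columns, imports and Base as the code below; only the placement of relationship() changes). Declared on the "one" side, backref still adds Customer.purchase_order automatically:

class PurchaseOrder(Base):
    __tablename__ = "purchase_order"
    id = Column(Integer, primary_key=True)
    cost = Column(Float, nullable=True)
    # declared on the "one" side; backref creates Customer.purchase_order
    customers = relationship("Customer", backref="purchase_order")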

#coding:utf-8

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,Text,DateTime,ForeignKey,Float
from sqlalchemy.orm import relationship
import datetime

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")  # if the database has a password: //root:12345678@127.0.0.1:3306/
Base = declarative_base()

class Customer(Base):
    __tablename__="customer"  # table name used in the database
    
    id = Column(Integer,primary_key=True)
    name = Column(String(64),index=True,nullable=False)
    phone = Column(String(16),nullable=False)
    address = Column(String(256),nullable=False)
    purchase_order_id = Column(Integer,ForeignKey("purchase_order.id"))  # foreign key; references the table whose __tablename__ is "purchase_order"
    
    # not part of the table structure itself; it makes foreign-key queries convenient. backref enables the reverse lookup order_obj.customer
    purchase_order = relationship("PurchaseOrder",backref="customer")

    
    
class PurchaseOrder(Base):  
    __tablename__ = "purchase_order"   # avoid naming the table order: order is a MySQL keyword and would always have to be quoted with backticks as `order`
    id=Column(Integer,primary_key=True)
    cost = Column(Float,nullable=True)
    ctime = Column(DateTime,default =datetime.datetime.now)
    desc = Column(String(528))
    
    # for a many-to-many relationship, secondary names the association table
    product = relationship("Product",secondary="order_to_product",backref="purchase_order")
    
class Product(Base):
    __tablename__ = "product"
    id = Column(Integer,primary_key=True)
    name = Column(String(256))
    price = Column(Float,nullable=False)
    
class OrdertoProduct(Base):
    __tablename__ = "order_to_product"
    id = Column(Integer,primary_key=True)
    product_id = Column(Integer,ForeignKey("product.id"))
    purchase_order_id  = Column(Integer,ForeignKey("purchase_order.id"))
    


if __name__ == "__main__":
    Base.metadata.create_all(engine)
    #Base.metadata.drop_all(engine)

Foreign-key relationships

C. CRUD operations

#coding:utf-8

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,Text,DateTime,ForeignKey,Float
from sqlalchemy.orm import relationship,sessionmaker
from sqlalchemy.sql import text
import datetime

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")  # if the database has a password: //root:12345678@127.0.0.1:3306/; utf8 keeps Chinese text from being garbled
Base = declarative_base()

class Customer(Base):
    __tablename__="customer"  # table name used in the database
    
    id = Column(Integer,primary_key=True)
    name = Column(String(64),index=True,nullable=False)
    phone = Column(String(16),nullable=False)
    address = Column(String(256),nullable=False)
    purchase_order_id = Column(Integer,ForeignKey("purchase_order.id"))  # foreign key; references the table whose __tablename__ is "purchase_order"
    
    # not part of the table structure itself; it makes foreign-key queries convenient. backref enables the reverse lookup order_obj.customer
    purchase_order = relationship("PurchaseOrder",backref="customer")
    
    
class PurchaseOrder(Base):  
    __tablename__ = "purchase_order"   # avoid naming the table order: order is a MySQL keyword and would always have to be quoted with backticks as `order`
    id=Column(Integer,primary_key=True)
    cost = Column(Float,nullable=True)
    ctime = Column(DateTime,default =datetime.datetime.now)
    desc = Column(String(528))
    
    # for a many-to-many relationship, secondary names the association table
    product = relationship("Product",secondary="order_to_product",backref="purchase_order")
    
class Product(Base):
    __tablename__ = "product"
    id = Column(Integer,primary_key=True)
    name = Column(String(256))
    price = Column(Float,nullable=False)
    
class OrdertoProduct(Base):
    __tablename__ = "order_to_product"
    id = Column(Integer,primary_key=True)
    product_id = Column(Integer,ForeignKey("product.id"))
    purchase_order_id  = Column(Integer,ForeignKey("purchase_order.id"))


if __name__ == "__main__":
    #Base.metadata.create_all(engine)
    #Base.metadata.drop_all(engine)
    
    Session = sessionmaker(bind=engine)

    # create a session each time you work with the database
    session = Session()

    #***************** insert data ********************
    # pur_order = PurchaseOrder(cost=19.7,desc="python編程之路")

    # session.add(pur_order)
    # session.add_all(
                # [PurchaseOrder(cost=39.7,desc="linux操作系統"),
                # PurchaseOrder(cost=59.6,desc="python cookbook")])
    # session.commit()
    
    #***************** update data ********************
    
    #session.query(PurchaseOrder).filter(PurchaseOrder.id>2).update({"cost":29.7})
    #session.query(PurchaseOrder).filter(PurchaseOrder.id==2).update({"cost":PurchaseOrder.cost+40.1},synchronize_session=False)  # synchronize_session sets how the session is kept in sync when a query performs a delete or update
    #session.commit()
    
    #***************** delete data ********************
    #session.query(PurchaseOrder).filter(PurchaseOrder.id==1).delete()
    #session.commit()
    
    #***************** query data ********************
    #ret = session.query(PurchaseOrder).all()
    # ret = session.query(PurchaseOrder).filter(PurchaseOrder.id==2).all()  # a list of objects
    # ret = session.query(PurchaseOrder).filter(PurchaseOrder.id==2).first() # a single object
    # ret = session.query(PurchaseOrder).filter_by(id=2).all()  # filter_by uses keyword expressions on the column names
    # ret = session.query(PurchaseOrder).filter_by(id=2).first()
    #ret = session.query(PurchaseOrder).filter(text("id<:value and cost>:price")).params(value=6,price=15).order_by(PurchaseOrder.cost).all()
    #ret = session.query(PurchaseOrder).from_statement(text("SELECT * FROM purchase_order WHERE cost>:price")).params(price=40).all()
    # print(ret)
    # for i in ret:
        # print(i.id, i.cost, i.ctime, i.desc)
        
    #ret2 = session.query(PurchaseOrder.id,PurchaseOrder.cost.label('totalcost')).all()  # select just two columns; ret2 is a list of tuples
    #print(ret2)
    
    # close the session
    session.close()

CRUD

Query statements:

# conditions
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()


# wildcards (LIKE)
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# limit / slicing
ret = session.query(Users)[1:2]

# ordering
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# grouping
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# joins

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

ret = session.query(Person).join(Favor).all()

ret = session.query(Person).join(Favor, isouter=True).all()


# union
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

Query statements
#coding:utf-8


from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text, func
from sqlalchemy_orm2 import PurchaseOrder  # import the PurchaseOrder model class defined earlier

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")
Session = sessionmaker(bind=engine)
session = Session()
# query
ret = session.execute("select * from purchase_order where id=:value",params={"value":3})
print(ret)
for i in ret:
    print(i.id, i.cost, i.ctime, i.desc)

# insert
purchase_order  = PurchaseOrder.__table__  # get the underlying Table object for PurchaseOrder
ret=session.execute(purchase_order.insert(),
                [{"cost":46.3,"desc":'python2'},
                {"cost":43.3,"desc":'python3'}])
session.commit()
print(ret.lastrowid)


session.close()






# correlated subquery
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
FROM server 
WHERE server.id = `group`.id) AS anon_1 
FROM `group`
"""

Supplement

D. Multi-threaded use

#coding:utf-8

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy_orm2 import Product
from threading import Thread

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=0,pool_size=5)
Session = sessionmaker(bind=engine)

def task(name,price):
    session = Session()
    pro = Product(name=name,price=price)
    session.add(pro)
    session.commit()
    session.close()

if __name__=="__main__":
    for i in range(6):
        t = Thread(target=task,args=("pro"+str(i),i*5))
        t.start()
    

Multi-threading

E. Working with one-to-many and many-to-many relations through relationship()

#coding:utf-8


from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text, func
from sqlalchemy_orm2 import PurchaseOrder,Product,OrdertoProduct,Customer  # import the model classes defined earlier


engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")
Session = sessionmaker(bind=engine)
session = Session()

# # add via the foreign-key column directly (by id value)
# cus1 = Customer(name="zack",phone="13567682333",address="Nanjing",purchase_order_id=3)
# session.add(cus1)


# # add via relationship, forward direction
# cus2 = Customer(name="zack2",phone="13567682333",address="Nanjing",purchase_order=PurchaseOrder(cost=53,desc="java"))
# session.add(cus2)
# session.commit()

# add via relationship, reverse direction
# purchase_order=PurchaseOrder(cost=53,desc="php")
# cus3 = Customer(name="zack3",phone="13567682333",address="Nanjing")
# cus4 = Customer(name="zack4",phone="13567682333",address="Nanjing")
# purchase_order.customer=[cus3,cus4]  # cus3 and cus4 both get purchase_order.id as their purchase_order_id, i.e. two foreign-key links are created at once
# session.add(purchase_order)
# session.commit()

## forward query via relationship
cus = session.query(Customer).first()
print(cus.purchase_order_id)
print(cus.purchase_order.desc)

# reverse query via relationship
pur = session.query(PurchaseOrder).filter(PurchaseOrder.id==3).first()
print(pur.desc)
print(pur.customer)  # returns a list

One-to-many
#coding:utf-8


from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text, func
from sqlalchemy_orm2 import PurchaseOrder,Product,OrdertoProduct,Customer  # import the model classes defined earlier

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")
Session = sessionmaker(bind=engine)
session = Session()

# session.add_all([Product(name="java",price=24),
                # Product(name="python",price=34),
                # Product(name="php",price=27)])
# session.commit()

# # add via the foreign-key columns directly (by id values)
# op = OrdertoProduct(product_id=1,purchase_order_id=16)
# session.add(op)
# session.commit()

# # add via relationship
# pur = PurchaseOrder(cost=27,desc="xxxx")
# pur.product = [Product(name="C++",price=60),]  # forward
# session.add(pur)

# pro = Product(name="C",price=40)
# pro.purchase_order=[PurchaseOrder(cost=27,desc="xxxx"),]  # reverse
# session.add(pro)
# session.commit()


# forward query via relationship
pur = session.query(PurchaseOrder).filter(PurchaseOrder.id==19).first()
print(pur.desc)
print(pur.product)  # the result is a list

# reverse query via relationship
pro = session.query(Product).filter(Product.id==5).first()
print(pro.name)
print(pro.purchase_order)  # the result is a list

session.close()

Many-to-many

3. Database connection pools

  The ORM framework maintains a connection pool internally, so it can be driven from multiple threads directly. With the pymysql module, multi-threaded database access is error-prone and has to be serialized with a lock; for concurrent use, the DBUtils module can maintain a database connection pool instead.

  3.1 Multi-threaded pymysql

  Without a DBUtils connection pool, multi-threaded pymysql code looks like this:

import pymysql
import threading


#************************** no connection pool *******************************
# every thread opens its own connection, which is wasteful and can cause problems under heavy concurrent use
def func():
    conn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8")
    cursor = conn.cursor()
    cursor.execute("select * from user where nid>1;")
    result = cursor.fetchone()
    print(result)
    cursor.close()
    conn.close()
    
if __name__=="__main__":
    for i in range(5):
        t = threading.Thread(target=func,name="thread-%s"%i)
        t.start()

One connection per thread
#************************** no connection pool *******************************
# a single shared connection, serialized with a lock
from threading import Lock
import pymysql
import threading
conn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8")    


lock = Lock()    
def func():
    with lock:
        cursor = conn.cursor()
        cursor.execute("select * from user where nid>1;")
        result = cursor.fetchone()
        print(result)
        cursor.close()
        
        # conn.close() must not be called inside a thread here, otherwise the connection becomes unusable for the other threads
        
if __name__=="__main__":
    threads = []
    for i in range(5):
        t = threading.Thread(target=func,name="thread-%s"%i)
        threads.append(t)
        t.start()
        
    for t in threads:
        t.join()
    
    conn.close()

One connection, serialized execution

3.2 DBUtils connection pools

  DBUtils provides two pooling modes: PersistentDB and PooledDB   (official docs: https://cito.github.io/DBUtils/UsersGuide.html)

  Mode 1 (DBUtils.PersistentDB): creates one connection per thread. Even if the thread calls close(), the connection is not actually closed; it is simply kept for that same thread to reuse. When the thread terminates, the connection is closed automatically.

Example using PersistentDB:

#coding:utf-8

from DBUtils.PersistentDB import PersistentDB
import pymysql
import threading

pool = PersistentDB(
    creator = pymysql,  # the DB API module used to create connections
    maxusage = None,    # maximum number of times a single connection is reused; None means unlimited
    setsession=[],     # list of commands run before each session, e.g. ["set datestyle to ...", "set time zone ..."]
    ping = 0,           # when to ping the MySQL server to check it is alive: 0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable = False,    # if False, conn.close() is effectively ignored and the connection is kept for this thread's next use; it is only really closed when the thread exits. If True, conn.close() actually closes the connection, and the next pool.connection() call fails because it is gone (pool.steady_connection() can fetch a fresh one)
    threadlocal = None,    # a thread-local object used to store each thread's connection
    host="127.0.0.1",
    port = 3306,
    user = "root",
    password="",
    database="learningsql",
    charset = "utf8"
)

def func():
    conn = pool.connection()
    cursor = conn.cursor()
    cursor.execute("select * from user where nid>1;")
    result = cursor.fetchone()
    print(result)
    cursor.close()
    conn.close()
    
if __name__ == "__main__":
    for i in range(5):
        t = threading.Thread(target=func,name="thread-%s"%i)
        t.start()

PersistentDB usage

Mode 2 (DBUtils.PooledDB): creates a pool of connections that all threads draw from.

   (Since pymysql, MySQLdb and similar drivers have threadsafety = 1, connections in this pool are not truly shared between threads; each thread checks its own connection out of the shared pool. The threadsafety value can be inspected directly, as in the sketch below.)
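A quick check of the driver's declared thread-safety level:

import pymysql
print(pymysql.threadsafety)  # 1: threads may share the module, but not connections (PEP 249)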

Example using PooledDB:

 

from DBUtils.PooledDB import PooledDB
import pymysql
import threading
import time

pool = PooledDB(
    creator = pymysql,  # the DB API module used to create connections
    maxconnections = 6,  # maximum connections allowed in the pool; 0 or None means unlimited
    mincached = 2,   # idle connections created at start-up; 0 means none
    maxcached = 5,   # maximum idle connections kept in the pool; 0 or None means unlimited
    maxshared = 3,   # maximum number of shared connections; 0 or None means connections are never shared. PS: has no effect with pymysql or MySQLdb, whose threadsafety is 1, so _maxshared is forced to 0 and connections are never shared between threads
    blocking = True,  # whether to block and wait when no connection is available: True waits, False raises an error immediately
    maxusage = None,   # maximum number of times a single connection is reused; None means unlimited
    setsession = [],   # list of commands run before each session, e.g. ["set datestyle to ...", "set time zone ..."]
    ping = 0,           # when to ping the MySQL server to check it is alive: 0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host="127.0.0.1",
    port = 3306,
    user="root",
    password="",
    database = "learningsql",
    charset = "utf8"
)

def func():
    conn = pool.connection()
    cursor = conn.cursor()
    cursor.execute("select * from user where nid>1;")
    result = cursor.fetchone()
    print(result)
    time.sleep(5)  # keep the connection busy so the MySQL server-side thread count can be observed
    cursor.close()
    conn.close()
    
if __name__=="__main__":
    for i in range(5):
        t = threading.Thread(target=func,name="thread-%s"%i)
        t.start()

PooledDB usage

The sleep(5) above keeps each thread connected longer so the MySQL connection threads can be observed. The screenshots below were taken while the code was running and after it finished: during execution six threads were connected to the database (one of them being the mysql command-line client); afterwards only one thread remained connected, with five threads still cached and waiting.

(Run show status like "Threads%" to inspect the thread counters; a Python version of the same check is sketched below.)
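A small sketch of the same check from Python rather than the mysql client, assuming the same local server:

import pymysql

conn = pymysql.connect(host="127.0.0.1", port=3306, user="root", passwd="", db="learningsql", charset="utf8")
cur = conn.cursor()
cur.execute('show status like "Threads%";')
for name, value in cur.fetchall():
    print(name, value)   # Threads_cached, Threads_connected, Threads_created, Threads_running
cur.close()
conn.close()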

[Screenshots: output of show status like "Threads%" while the code is running and after it finishes]

Reposted from: https://www.cnblogs.com/silence-cho/p/10747776.html

