本人是一個沒有做筆記習慣的低級程序員,但是我還是喜歡編程,最近發現自己的記憶力嚴重的下降,原來自己學過的東西過段差不多都忘記了,現在才想起用博客記錄一下,方便以后自己查看,同時也是為了能更好的提高自己的能力。最近在學習flask框架和mysql,所以再此總結一下。
一 flask框架mysql文件
通過看別的大佬的項目最常見的配置mysql就是
1)創建一個config.py文件
class BaseConfig(object): # 數據庫的配置 DIALCT = "mysql" DRITVER = "pymysql" HOST = '127.0.0.1' PORT = "3306" USERNAME = "root" PASSWORD = "123456" DBNAME = 'test_auto' SQLALCHEMY_DATABASE_URI = f"{DIALCT}+{DRITVER}://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}?charset=utf8" SQLALCHEMY_TRACK_MODIFICATIONS = True

2)導入config文件 通過app.config添加配置文件
from flask import Flask from config import BaseConfig from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) #添加配置文件 app.config.from_object(BaseConfig) #初始化擴展,傳入app 創建db db = SQLAlchemy(app)

3 )config.py配置文件
SQLALCHEMY_DATABASE_URI 以及SQLALCHEMY_TRACK_MODIFICATIONS注意不要寫錯 這些變量可以在\Python37\Lib\site-packages\flask_sqlalchemy\__init__.py目錄下查看SQLAlchemy的默認配置文件 和\Python37\Lib\site-packages\flask\app.py目錄下查看config的默認配置文件
app.config的默認配置文件
default_config = ImmutableDict( { "ENV": None, "DEBUG": None, "TESTING": False, "PROPAGATE_EXCEPTIONS": None, "PRESERVE_CONTEXT_ON_EXCEPTION": None, "SECRET_KEY": None, "PERMANENT_SESSION_LIFETIME": timedelta(days=31), "USE_X_SENDFILE": False, "SERVER_NAME": None, "APPLICATION_ROOT": "/", "SESSION_COOKIE_NAME": "session", "SESSION_COOKIE_DOMAIN": None, "SESSION_COOKIE_PATH": None, "SESSION_COOKIE_HTTPONLY": True, "SESSION_COOKIE_SECURE": False, "SESSION_COOKIE_SAMESITE": None, "SESSION_REFRESH_EACH_REQUEST": True, "MAX_CONTENT_LENGTH": None, "SEND_FILE_MAX_AGE_DEFAULT": None, "TRAP_BAD_REQUEST_ERRORS": None, "TRAP_HTTP_EXCEPTIONS": False, "EXPLAIN_TEMPLATE_LOADING": False, "PREFERRED_URL_SCHEME": "http", "JSON_AS_ASCII": True, "JSON_SORT_KEYS": True, "JSONIFY_PRETTYPRINT_REGULAR": False, "JSONIFY_MIMETYPE": "application/json", "TEMPLATES_AUTO_RELOAD": None, "MAX_COOKIE_SIZE": 4093, } )
sqlalchemy的默認配置文件
if ( 'SQLALCHEMY_DATABASE_URI' not in app.config and 'SQLALCHEMY_BINDS' not in app.config ): warnings.warn( 'Neither SQLALCHEMY_DATABASE_URI nor SQLALCHEMY_BINDS is set. ' 'Defaulting SQLALCHEMY_DATABASE_URI to "sqlite:///:memory:".' ) app.config.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:') app.config.setdefault('SQLALCHEMY_BINDS', None) app.config.setdefault('SQLALCHEMY_NATIVE_UNICODE', None) app.config.setdefault('SQLALCHEMY_ECHO', False) app.config.setdefault('SQLALCHEMY_RECORD_QUERIES', None) app.config.setdefault('SQLALCHEMY_POOL_SIZE', None) app.config.setdefault('SQLALCHEMY_POOL_TIMEOUT', None) app.config.setdefault('SQLALCHEMY_POOL_RECYCLE', None) app.config.setdefault('SQLALCHEMY_MAX_OVERFLOW', None) app.config.setdefault('SQLALCHEMY_COMMIT_ON_TEARDOWN', False) track_modifications = app.config.setdefault( 'SQLALCHEMY_TRACK_MODIFICATIONS', None )
4)創建數據查詢模型 並繼承db.Model

二,直接使用SQLAlchemy連接mysql,不用通過flask框架
1),SQLAlchemy和pymsql的安裝
在使用SQLAlchemy連接mysql前需要先給Python安裝MySQL驅動,由於MySQL不支持Python3,所以可以同pymsql與SQLAlchemy進行交互
pip install pymysql
pip install sqlalchemy
2),連接數據庫 連接數據庫的引擎參數形式
engine = create_engine("數據庫類型+數據庫驅動://數據庫用戶名:數據庫密碼@IP地址:端口號/數據庫?編碼...", 其它參數)
注意:charset是utf8而不是utf-8,不能帶- 不然會包異常
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test_auto?charset=utf8",echo=True)
echo=True, # 設置為True,程序運行時反饋執行過程中的關鍵對象,包括 ORM 構建的 sql 語句
pool_size=5, # 數據庫連接池初始化的容量,保持的連接數,初始化時,並不產生連接。只有慢慢需要連接時,才會產生連接
max_overflow=10, # 連接池最大溢出容量,該容量+初始容量=最大容量。超出會堵塞等待,等待時間為timeout參數值默認30
pool_timeout=30, #從連接池里獲取連接,如果此時無空閑的連接。且連接數已經到達了pool_size+max_overflow。此時獲取連接的進程會等待pool_timeout秒。如果超過這個時間,還沒有獲得將會拋出異常。
sqlalchemy默認30秒
pool_recycle=7200 # 重連周期
create_engine()返回的是Engine的一個實例,代表了操作數據庫的核心接口,處理數據庫和數據庫的API,可以直接使用engine.execute()或者engine.connect()來直接建立一個DBAPI的連接,但是如果我們要使用ORM, 就不能直接使用engine,初次調用create_engine()並不會真正連接數據庫,只有在真正執行一條命令的時候才會嘗試建立連接,目的是節省資源
連接oracle時
#db_confug:是字典 db_url = 'oracle+cx_oracle://{oracle_user}:{oracle_password}@{oracle_host}:{oracle_port}/{oracle_sid}'.format(**db_confug) create_engine(db_url) #如果在連接的時候報oracle ORA-12505, TNS:listener does not currently know of SID #解決方法是 db_url = "oracle+cx_oracle://{username}:{password}@{host}:{port}/?service_name={service}".format(**db_cfg)
3),映射說明
在使用ORM時 主要有兩個配置過程;一是數據庫表的信息描述處理;二是將類映射到這些表上。它們在SQLAlchemy中一起完成,被稱為Declarative,
它們在SQLAlchemy中一起完成,被稱為Declarative。使用Declarative參與的ORM映射的類需要被定義為一個指定基類的子類,這個基類含有ORM映射中相關類和表的信息。這樣的基類稱為declarative base class。這個基類可以通過declarative_base來創建。
from sqlalchemy.ext.declarative import declarative_base Base = declarative_base()
4)建立映射關系
數據庫與 Python 對象的映射主要在體現三個方面,其中前兩者是比較常見的映射
- 數據庫表 (table)映射為 Python 的類 (class),稱為 model
- 表的字段 (field) 映射為 Column
- 表的記錄 (record)以類的實例 (instance) 來表示
在sqlalchemy中表字段的常見類型如下:
Interger:整型,映射到數據庫中是int類型
Float:浮點類型,float
Double; 雙精度類型
String; 字符串類型 需要指定字符串的長度
Boolean; 布爾類型
Decimal: 定點類型,專門為解決浮點類型精度丟失的問題而設定。Decimal需要傳入兩個參數,第一個參數標記該字段能存儲多少位數,第二個參數表示小數點后有又多少個小數位。
Enum:枚舉類型;
Date:日期類型,年月日;
DateTime: 時間類型,年月日時分毫秒;
Time:時間類型,時分秒;
Text:長字符串,可存儲6萬多個字符,text;
LongText:長文本類型,longtext.
在指定表字段的映射為Column時 不但要指定表字段的數據類型,往往還需要指定哪些字段是主
primary_key #是否為主鍵 unique #是否唯一 index #如果為True,為該列創建索引,提高查詢效率 nullable #是否允許為空 default #默認值 name #在數據表中的字段映射 autoincrement #是否自動增長 onupdate #更新時執行的函數 comment #字段描述
5) 創建類
根據以上字段的信息了解 可以自定義一個類
- __tablename__指定表名(要注意大小寫)
- Column類指定對應的字段,必須指定
from sqlalchemy import create_engine,Column,Integer,String,Enum from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test_auto?charset=utf8",echo=True) #學生 class Student(Base): #繼承Base __tablename__ = "student" #表名
id = Column(Integer, primary_key=True,comment="學生ID") #為主鍵 name = Column(String(60), default=None, nullable=False, comment="學生姓名") #學生名 nullable能否為空 False不能為空 True可以為空 phone = Column(String(11), default=None, nullable=True,comment="學生電話") #電話 可以為空 默認是None gender = Column(Enum("男","女"), default="男", nullable=False, comment="性別") #性別 Enum枚舉 不能為空
#這個為可選項,只是增加對於表的描述,便於以后測試,也可以描述的更加詳細
def __repr__(self): return "<User(name='%s', phone='%s', gender='%s')>" % (self.name,self.phone, self.gender)
if __name__ == '__main__':
Base.metadata.create_all(engine) # 通過基類與數據庫進行交互創建表結構,此時表內還沒有數據
# Base.metadata.drop_all(engine) #刪除數據庫的表
在實際編碼的時候,常見的方式是先在數據庫中建表,然后再用代碼操作數據庫。上面這種聲明式定義映射模型,對 Column 的聲明是很枯燥的。如果表的字段很多,這種枯燥的代碼編寫也是很痛苦的事情。
解決辦法有兩個:
方法一:安裝 sqlacodegen 庫 (pip install sqlacodegen),然后通過下面的命令,基於數據庫中的表自動生成 model 映射的代碼。sqlacodegen 用法如下:
sqlacodegen --tables 表名 --outfile=路徑名稱 database url
database url 是與sqlalchemy的相同
sqlacodegen --table teacher --outfile=teacher.py mysql+pymysql://root:123456@127.0.0.1:3306/test_auto?charset=utf8

方法二,在構建 model 的時候,使用 autoload = True,sqlalchemy 依據數據庫表的字段結構,自動加載 model 的 Column。使用這種方法時,在構建 model 之前,Base 類要與 engine 進行綁定。下面的代碼演示了 autoload 模式編寫 model 映射的方法:
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql.schema import Table engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test_auto?charset=utf8",echo=True) Base = declarative_base() metadata = Base.metadata metadata.bind = engine class Course(Base): __tablename__= Table("course", metadata, autoload=True )
6)創建會話
- 在一個會話中操作數據庫,會話建立在連接上,連接被引擎管理。當第一次使用數據庫時,從引擎維護的連接池中獲取一個連接使用。
- session對象多線程不安全。所以不同線程應該使用不用的session對象。Session類和engine有一個就行
SQLAlchemy的Session是用於管理數據庫操作的一個像容器一樣的工廠對象。Session工廠對象中提供query(), add(), add_all(), commit(), delete(), flush(), rollback(), close()等方法
6.1單線程創建session
from sqlalchemy import create_engine,Column,Integer,String,Enum from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test_auto?charset=utf8",echo=True) Base = declarative_base() session_factory = sessionmaker(bind=engine) Session = session_factory()
6.2多線程利用數據庫連接池創建session
#數據庫模塊model.py from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test_auto?charset=utf8",echo=True,
max_overflow=10, # 超過連接池大小外最多創建的連接
pool_size=5, # 連接池大小
pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯
pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置)
)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
####業務模塊thread.py import threading from model import Session class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(20)) fullname = Column(String(20)) password = Column(String(20)) age = Column(Integer) class MyThread(threading.Thread): def __init__(self, threadName): super(MyThread, self).__init__() self.name = threading.current_thread().name def run(self): session = Session() #每個線程都可以直接使用數據庫模塊定義的Session,執行時,每一個session都相當於一個connection session.query(User).all() user = User(name="hawk-%s"%self.name, fullname="xxxx",password="xxxx",age=10) session.add(user) time.sleep(1) if self.name == "thread-9": session.commit() Session.remove() if __name__ == "__main__": arr = [] for i in xrange(10): arr.append(MyThread('thread-%s' % i)) for i in arr: i.start() for i in arr: i.join()
7) sqlalchemy對數據庫的基本操作
7.1創建一個表
Base.metadata.create_all(engine) # 通過基類與數據庫進行交互創建表結構,此時表內還沒有數據 # Base.metadata.drop_all(engine) #刪除數據庫的表
7.2添加 add():增加一個對象
new_stu = Student(name='張三', gender="男", phone='15814725896') Session.add(new_stu) # 添加一個 Session.commit() # 需要調用commit()方法提交事務

7.3添加 add_all():可迭代對象,元素是對象
new_stu1 = Student(name='李四', gender="女", phone='15814725897') new_stu2 = Student(name='王二', gender="男", phone='15814725898') new_stu3 = Student(name='往二', gender="女", phone='15814725899') Session.add_all([new_stu1,new_stu2,new_stu3]) #一次性添加一批 Session.commit() # 需要調用commit()方法提交事務

#查找 后添加first(), all()等返回對應的查詢對象
7.4 query() 不進行條件查詢
Session.query(Student).all() #查詢所有 返回的對象是list列表
Session.query(Student).first() #查詢第一個


7.5 query()進行條件篩選
Query 對象提供了 filter() 方法和 filter_by() 方法用於數據篩選。filter_by() 適用於簡單的基於關鍵字參數的篩選。 filter() 適用於復雜條件的表達
Session.query(Student).filter(Student.id<=3).all()
Session.query(Student).filter(Student.name=='張三').first()
Session.query(Student).filter_by(name='張三').first()

如果我們要找出所有id< =3 的數據只能用 filter() 方法,因為 filter_by 只支持關鍵字參數,不能實現
如果有兩個限制條件是AND關系,可以直接使用兩次filter()處理
Session.query(Student).filter(Student.name == '張三').filter(Student.gender == '男').first() #或者是 Session.query(Student).filter_by(name='張三').filter(Student.gender=='男').first()

當然也有其他的展示方法
#filter operator : == #相等 != #不相等 like(’%關鍵字%’) #通配符搜索 ilike() #通配符搜索(不敏感) .in_([‘張三’, ‘張三1’, ‘張三2’]) #存在於 ~User.name.in_([‘張三’, ‘張三1’, ‘張三2’]) #不存在於 ==None #為空 .filter(Student.name == ‘張三’, User.gender == ‘女’) #and or_(Student.name == ‘張三’, Student.name == ‘張三2’) #OR #檢索返回的列表,以及列表的標量 : all() #返回所有 first() #返回第一行 one() #檢查是不是只有一行結果 如果存在則返回改數據如果不存在則報異常 one_or_none() #檢查是不是一行或者沒有結果 如果存在則返回該數據 若不存在則返回None
大致常見條件的表達,因為比較直觀
def test_filter_le(self): Stu = Session.query(Student).filter(Student.id <= 3).all() print(Stu) def test_filter_ne(self): Stu = Session.query(Student).filter(Student.id != 2).all() print(Stu) def test_filter_like(self): Stu = Session.query(Student).filter(Student.id.like('%9')).all() print(Stu) def test_filter_in(self): Stu = Session.query(Student).filter(Student.EDUCATION.in_(['Bachelor', 'Master'])).all() print(Stu) def test_filter_notin(self): Stu = Session.query(Student).filter(~Student.EDUCATION.in_(['Bachelor', 'Master'])).all() print(Stu) def test_filter_isnull(self): Stu = Session.query(Student).filter(Student.MARITAL_STAT == None).all() print(Stu) def test_filter_isnotnull(self): Stu = Session.query(Student).filter(Student.MARITAL_STAT != None).all() print(Stu) def test_filter_and(self): Stu = Session.query(Student).filter(Student.GENDER=='Female', Student.EDUCATION=='Bachelor').all() print(Stu) def test_filter_and2(self): Stu = Session.query(Student).filter(and_(Student.GENDER=='Female', Student.EDUCATION=='Bachelor')).all() print(Stu) def test_filter_or(self): Stu = Session.query(Student).filter(or_(Student.MARITAL_STAT=='Single', Student.NR_OF_CHILDREN==0)).all() print(Stu)
7.6更新 (注意要重新commit()提交)
stu = Session.query(Student).filter(Student.name == '張三').first() #第一步 查詢出要修改的數據 print(f"修改之前={stu}") stu.gender = "女" #第2步 修改字段值 Session.add(stu) #第3步然后重新添加 Session.commit() #第4步最后再重新提交
#或者
Session.query(Student).filter(Student.name == '張三').update({"gender":"女"})
Session.commit() #最后再重新提交

7.8 刪除
stu = Session.query(Student).filter(Student.name == '張三').first() #查詢數據 Session.delete(stu) #刪除數據 Session.commit() #再次提交
#或者
stu = Session.query(Student).filter(Student.name == '張三').delete() #查詢數據並刪除
print(stu) #輸出的是刪除的個數
Session.commit() #再次提交

8) 執行原生的SQL語句
從SQLAlchemy中導出text方法,可以通過text(SQL語句)嵌入使用SQL語句。
-
方法一:使用Session 進行執行原生的sql語句
from sqlalchemy import text
#查詢 stu = Session.query(Student).filter(text('name="張三"')).first() print(f'查詢數據:{stu}')
#返回字典的類型的數組
sql = "select * from student WHERE name='張三'"
res_rows = Session.execute(text(sql)).fetchall()
result = [dict(zip(result.keys(), result)) for result in res_rows]
print(result)
#執行結果:
[{'id': 1, 'name': '張三', 'phone': '15814725896', 'gender': '男'}]
注意:對於使用原生SQL查詢出來的結果是一個list,
首先,使用一個變量接收你以上的查詢結果。
其次,在這個list中包含着一個或多個tuple,其實這並不是標准的Python tuple,而是一個特殊的類型"<class 'sqlalchemy.util._collections.result'>",
這是一個 AbstractKeyedTuple 對象,它擁有一個 keys() 方法。
最后,我們可以通過這個方法將查詢結果轉換為字典,需要傳到前端展示只需要將其裝換為json格式即可。
示例:data = [dict(zip(result.keys(), result)) for result in results]
# 添加 cursor = session.execute('insert into users(name) values(:value)', params={"value": 'abc'}) session.commit() print(cursor.lastrowid)

-
方法二通過Engine對象執行原生的SQL語句
sql = "select * from student WHERE name='張三'" with engine.connect() as conn: stu = conn.execute(sql) result= stu.fetchall() print(f'執行原生SQ結果={result}')
conn.close()

9)建立表與表之間的關系(一對多,多對一)

一個學生可以有多張成績單,可以通過獲取學生的信息 ,查詢到成績單表的信息 此時用ForeignKey 去關聯少的一方;同是還需要使用relationship函數反向關聯多的一方,實現通過獲取獲取成績單的數g,然后調用backref的值獲取學生表信息
from sqlalchemy import create_engine,Column,Integer,String,Enum from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship from sqlalchemy import ForeignKey # engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test_auto?charset=utf8",echo=True) engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test_auto?charset=utf8") Base = declarative_base() Session_factory = sessionmaker(bind=engine) Session = Session_factory() #學生 class Student(Base): #繼承Base __tablename__ = "student" id = Column(Integer, primary_key=True,comment="學生ID") #主鍵 name = Column(String(60), default=None, nullable=False, comment="學生姓名") #學生名 nullable能否為空 phone = Column(String(20), default=None, nullable=True,comment="學生電話") #電話 可以為空 默認是None gender = Column(Enum("男","女"), default="男", nullable=False, comment="性別") # Enum枚舉 不能為空 #relationship("被關聯着的類名",backref="任意字段") #backref 標示可通過relationship反向查找,即通過Grade.back_stu查詢Student表數據 grade = relationship("Grade",backref="back_stu") #是Student的數據可以通過grade 訪問到Grade類的所有屬性值 def __repr__(self): return "<User(name='%s', phone='%s', gender='%s')>" % (self.name,self.phone, self.gender) #成績表 class Grade(Base): __tablename__ = "grade" id = Column(Integer, primary_key=True, comment="成績ID") # 主鍵 grade = Column(String(60), default=None, nullable=False, comment="成績") #成績 #ForeignKey("__tablename__.主鍵") student_id = Column(Integer, ForeignKey("student.id"), nullable=False,comment="學生ID") # 建立外鍵關系,理解為該表外鍵student_id關聯student表的id字段 def __repr__(self): return "<Grade(grade='%s', student_id='%s')>" % (self.grade,self.student_id) # 從一 訪問 多 通過獲取學生Student信息 並查詢到成績表Grade的所有屬性值 stu = Session.query(Student).filter(Student.id==2).first() print(f'從一 訪問 多={stu.grade}') #通過Student屬性grade獲取到成績表Grade的信息 # 從多 訪問 一 通過獲取成績表Grade的信息 並查詢到學生Student的所有屬性值 g = Session.query(Grade).filter(Grade.grade=='99').first() print(f'從多 訪問 一={g.back_stu}') #通過返回結果 g調用backref的值 獲取相應的學生Student所有屬性值

10)sqlalchemy批量插入數據(性能問題)
#方式1:每插入一條數據進行一次commit() first_time = datetime.utcnow() for i in range(10000): user = User(username=username + str(i), password=password) db.session.add(user) db.session.commit() second_time = datetime.utcnow() print((second_time - first_time).total_seconds()) # 耗時:38.14347s #方式2: 數據插入完后一起commit()提交 second_time = datetime.utcnow() db.session.bulk_save_objects( [User(username=username + str(i), password=password) for i in range(10000) ] ) db.session.commit() third_time = datetime.utcnow() print((third_time - second_time).total_seconds()) # 耗時;2.121589s #方式3:耗時db.session.bulk_insert_mappings() third_time = datetime.utcnow() db.session.bulk_insert_mappings( User, [dict(username="NAME INSERT " + str(i), password=password) for i in range(10000)] ) db.session.commit() fourth_time = datetime.utcnow() print((fourth_time - third_time).total_seconds()) # 耗時:1.13548s #方式4: fourth_time = datetime.utcnow() db.session.execute( User.__table__.insert(), [{"username": 'Execute NAME ' + str(i), "password": password} for i in range(10000)] ) db.session.commit() five_time = datetime.utcnow() print((five_time - fourth_time).total_seconds()) # 耗時:0.888822s
感謝https://www.cnblogs.com/wt7018/p/11617878.html
感謝https://blog.csdn.net/js010111/article/details/119844734的分享
感謝https://blog.csdn.net/weixin_40006963/article/details/113461425的分享
