更詳細的操作介紹:https://www.imooc.com/article/22343
定義:
SQLAlchemy是Python編程語言下的一款ORM框架,該框架建立在數據庫API之上,使用關系對象映射進行數據庫操作,簡言之便是:將對象轉換成SQL,然后使用數據API執行SQL並獲取執行結果。
說明:
SQLAchemy 本身無法操作數據庫,其本質上是依賴pymysql.MySQLdb,mssql等第三方插件。
Dialect用於和數據庫API進行交流,根據配置文件的不同調用不同的數據庫API,從而實現對數據庫的操作。
安裝:
pip3 install SQLAlchemy
鏈接:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
#!/usr/bin/env python # -*- coding: utf-8 -*- # auth : pangguoping from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine # mysql+pymysql 鏈接方式://root:ycc962464 賬戶密碼@127.0.0.1:3306鏈接地址及端口/tests鏈接的數據庫名稱?charset=utf8mb4 字符集 engine = create_engine("mysql+pymysql://root:ycc962464@127.0.0.1:3306/tests?charset=utf8mb4", max_overflow=5) Base = declarative_base() # 創建單表 class User(Base): __tablename__ = 'user' id = Column(Integer, primary_key=True ) #主鍵 autoincrement=True 自增 name = Column(String(32)) extra = Column(String(16)) num = Column(Integer) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), #索引 #Index('my_index', my_table.c.data, mysql_length=10) length 索引長度 #Index('a_b_idx', my_table.c.a, my_table.c.b, mysql_length={'a': 4,'b': 9}) # Index('my_index', my_table.c.data, mysql_prefix='FULLTEXT') 指定索引前綴 # Index('my_index', my_table.c.data, mysql_using='hash') 指定索引類型 ) class Home(Base): __tablename__ = 'home' id = Column(Integer,primary_key = True) tab = Column(String(20)) values = Column(String(60)) # 一對多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) # unique 唯一 class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 多對多 class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) #外鍵 group_id = Column(Integer, ForeignKey('group.id')) class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) #定義初始化數據庫函數 def init_db(): Base.metadata.create_all(engine) #頂固刪除數據庫函數 def drop_db(): Base.metadata.drop_all(engine) # drop_db() # init_db() Session = sessionmaker(bind=engine) #創建session session = Session() # def search(name): # user = User.query.filter(User.name == name).first() # if user is None or user.name.strip == '': # print('用戶不存在') # else: # print(' 用戶 %s' % user.name) # 增 # obj = User(name="111guanyu", extra='hanjiang') #添加單條 # session.add(obj) #加入到隊列 # session.add_all([ #添加多條 # User(name="liubei", extra='leader'), # User(name="zhangfei", extra='xiaodi'), # ]) # session.commit() # 添加到sql #刪 # session.query(User).filter(User.id > 2).delete() # session.commit() #改 # # 更新user表中id大於2的name列為099 # session.query(User).filter(User.id > 2).update({"name" : "099"}) # # 更新user表中id大於2的name列,在原字符串后邊增加099 # session.query(User).filter(User.id > 2).update({User.name: User.name + "099"}, synchronize_session=False) # # 更新user表中id大於2的num列,使最終值在原來數值基礎上加1 # session.query(User).filter(User.id > 2).update({"num": User.num + 1}, synchronize_session="evaluate") # 數字相加,必須設置synchronize_session="evaluate" # session.commit() #查 # ret = session.query(User).all() # 查詢所有 # sql = session.query(User) # 查詢生成的sql # print(sql) # ret = session.query(User.name, User.extra).all() #查詢User表的name和extra列的所有數據 # ret = session.query(User).filter_by(name='alex').all() # 取全部name列為alex的數據 # ret = session.query(User).filter_by(name='alex').first() # 第一個匹配name列為alex的數據 # ret是一個對象列表。這個對象可以通過 “對象[索引].字段”來獲取對應的值 # ret = session.query(User).all() #查詢列表所有數據 # print(ret[0].name) #結果為元組,可通過下標的形式 # for i in ret: #循環元組, # print(i.name) #其他操作 # 條件 # ret = session.query(User).filter_by(name='alex').all() #查詢所有name = alex 內容 # ret = session.query(User).filter(User.id > 1, User.name == 'eric').all() # 且的關系 # ret = session.query(User).filter(User.id.between(1, 3), User.name == 'eric').all() # print(ret[0].name) # ret = session.query(User).filter(User.id.in_([1,3,4])).all() # print(ret) # ret = session.query(User).filter(~User.id.in_([1,3,4])).all() # ~表示非。就是not in的意思 # print(ret) # ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='eric'))).all() # 聯表查詢 # print(ret) # from sqlalchemy import and_, or_ # 且和or的關系 # ret = session.query(User).filter(and_(User.id > 3, User.name == 'eric')).all() # 條件以and方式排列 # # print(ret) # ret = session.query(User).filter(or_(User.id < 2, User.name == 'eric')).all() # 條件以or方式排列 # for i in ret: # print(i.id) # ret = session.query(User).filter( # or_( #這部分表示括號中的條件都以or的形式匹配 # User.id < 2, # 或者 or User.id < 2 # and_(User.name == 'eric', User.id > 3),# 表示括號中這部分進行and匹配 # User.extra != "" # )).all() # 通配符 ret = session.query(User).filter(User.name.like('e%')).all() # print(list(ret)) # ret = session.query(User).filter(~User.name.like('e%')).all() # 表示not like # # 限制 limit用法 ret = session.query(User)[0:3] # 等於limit ,具體功能需要自己測試 從幾行到幾行! for i in ret: lists =[i.id,i.name,i.extra,i.num] print(lists) # # 排序 # ret = session.query(User).order_by(User.name.desc()).all() # ret = session.query(User).order_by(User.name.desc(), User.id.asc()).all() # 按照name從大到小排列,如果name相同,按照id從小到大排列 # # 分組 # from sqlalchemy.sql import func # ret = session.query(User).group_by(User.extra).all() # ret = session.query( # func.max(User.id), # func.sum(User.id), # func.min(User.id)).group_by(User.name).all() # ret = session.query( # func.max(User.id), # func.sum(User.id), # func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all() # having對聚合的內容再次進行過濾 # 連表 # ret = session.query(User, Favor).filter(User.id == Favor.nid).all() # ret = session.query(Person).join(Favor).all() # # 默認是inner join # ret = session.query(Person).join(Favor, isouter=True).all() # isouter表示是left join # # 組合 # q1 = session.query(User.name).filter(User.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union(q2).all() #union默認會去重 # q1 = session.query(User.name).filter(User.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union_all(q2).all() # union_all不去重
sqlalchemy 常用數據類型:
第一種:Integer
我們在Person模型中新增一個age字段
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) age = Column(Integer)
第二種:String
新增一個name字段(括號中的20表示該字符串最大長度為20)
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) name = Column(String(20))
第三種:Float
什么情況下會用到Float類型?比如存儲體重、價格等.....
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) price = Column(Float)
嗯!!我明明寫的是123.456789,但是存儲到數據庫中卻變成了123.457,為什么會這樣呢?原因我之前說過:float單精度類型,單精度數據類型存儲到表中容易被丟失。既然我們知道了原因,哪如何解決呢??方法就是用接下來要講的定點類型(DECIMAL)。
第四種:DECIMAL
DECIMAL可以防止數據精度
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) price = Column(DECIMAL(7,3))
DECIMAL有兩個參數,第一個參數用於指定一共多少位數,第二個參數用於指定小數點后最多多少位數
例如:DECIMAL(4,2)表示一共存儲4位數字,小數點后最多有兩位
如果傳入不符合規則數值時會報如下錯誤:
第五種:Boolean
我們知道,1代表true,0代表false
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) delete = Column(Boolean)
第六種:Enum
什么情況下會用到枚舉類型呢?比如用戶填寫性別時,固定只能選男或者女,不可能不男不女,對吧!
Enum()括號中為枚舉列表,在這個里面可以羅列出可輸入的值!
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) sex = Column(Enum("男","女"))
第七種:Date
Date只能存儲指定的年月日,不能存儲時分秒
說到日期類型,相信大家都熟悉,比如某年某月某日生。嗯、下面咱們就談談這個Date類型。
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) create_time = Column(Date)
然后從datetime導入datetime這個包,將數據添加至數據庫
datetime()中的數值用於傳遞指定的年月日
from datetime import datetime p = Person(create_time = datetime(2018,8,8)) session.add(p) session.commit()
第八種:DateTime
DateTime存儲指定的年月日時分秒
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) create_time = Column(DateTime)
添加測試數據:
p = Person(create_time = datetime(2018,8,8,16,11,50))
session.add(p)
session.commit()
第九種:Time
Time只能存儲時分秒,不能存儲年月日
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) create_time = Column(Time)
插入測試數據,time()后面傳遞關鍵字參數,用於指定時分秒
from datetime import datetime,time p = Person(create_time=time(hour=12,minute=20,second=50)) session.add(p) session.commit()
第十種:Text
這個沒什么好講的啊,當字符串長度比較長時就可以使用Text類型
class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) content = Column(Text)
第十一種:LongText
由於Text的存儲長度有限,我們就可以使用LongText來存儲數據。
由於LongText類型在mysql數據庫才有,其它數據庫沒有該數據類型,在使用前,記得從mysql數據庫導入該數據類型
from sqlalchemy.dialects.mysql import LONGTEXT class Person(Base): __tablename__ = "person" id = Column(Integer , primary_key=True , autoincrement=True) content = Column(LONGTEXT)