使用SQLAlchemy操作MySQL以及執行原生的sql語句


場景應用

老大我讓爬取內部網站獲取數據,插入到新建的表中,並每天進行爬取更新數據(后面做了定時任務)。然后根據該表統計每日的新增數量/更新數量進行制圖制表,向上級匯報。

思路構建

選用sqlalchemy+mysqlconnector,連接數據庫,創建表,對指定表進行CRUD

from sqlalchemy import exists, Column, Integer, String, ForeignKey, DateTime, Text, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from conf.parseConfig import parseConf

# 從配置文件中獲取數據庫信息
host = parseConf.get_conf('MySQLInfo', 'host')
port = parseConf.get_conf('MySQLInfo', 'port')
dbname = parseConf.get_conf('MySQLInfo', 'dbname')
usernm = parseConf.get_conf('MySQLInfo', 'usernm')
passwd = parseConf.get_conf('MySQLInfo', 'passwd')
# 連接數據庫
engine_str = "mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm, passwd, host, port, dbname)
# 創建的數據庫引擎
engine = create_engine(engine_str, encoding='utf-8')

#創建session類型
DBSession = sessionmaker(bind=engine)
# 創建session對象,進行增刪改查:
session = DBSession()

# 實例化官宣模型 - Base 就是 ORM 模型
Base = declarative_base()


# 創建服務單表 繼承Base基類
class ServiceOrder(Base):
    __tablename__ = 'serviceOrderTable'
    serviceOrderId = Column(String(32), primary_key=True, comment='服務單ID')
    serviceDesc = Column(String(512), comment='服務說明')
    transferTimes = Column(String(32), comment='轉派次數')
    # 創建更新時間,對數據的更新進行記錄
    updateTime = Column(DateTime, server_default=func.now(), onupdate=func.now())


def init_db():
    Base.metadata.create_all(engine)


def drop_db():
    Base.metadata.drop_all(engine)


if __name__ == "__main__":
    # 每次執行時 會判斷表的存在性 對於數據庫中不存在的表進行創建 已存在的表則可以直接進行增刪改查
    init_db()

    ### 首先講一下使用sqlalchemy執行原生的sql語句###
    # 方式一:
    res = session.execute('select * from ServiceOrder')  # res是獲取的對象
    all_res_list = res.fetchall()  # all_res_list具體的結果 是列表
    print(all_res_list ) # 結果: [('數據提取',), ('非數據提取',)]
    # 方式二:
    conn = engine.connect()
    res = conn.execute('select * from ServiceOrder') 
    all_res_list = res.fetchall()

    ### 使用創建好的session對象進行增刪改查 ###
    # 插入單條數據
    # 創建新service0rder對象
    new_serviceorder = ServiceOrder(serviceOrderId='001', serviceDesc='ack', transferTimes='9')
    # 添加到session
    session.add(new_serviceorder)
    # 提交即保存到數據庫
    session.commit()

    # 插入多條數據
    serviceorder_list = [ServiceOrder(serviceOrderId='002', serviceDesc='好的', transferTimes='9'),ServiceOrder(serviceOrderId='003', serviceDesc='起床', transferTimes='9')]
    session.add_all(serviceorder_list)
    session.commit()
    # session.close()

    # 查詢
    # 查詢是否存在 結果是布爾值
    it_exists = session.query(
        exists().where(ServiceOrder.serviceOrderId == '002')
    ).scalar()
    # 創建Query查詢,filter是where條件
    # 調用one() first()返回唯一行,如果調用all()則已列表的形式返回所有行:
    server_order = session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId == '003').first()
    print(server_order.serviceDesc)
    serciceorders = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == '好的').all()

    # 改 更新數據
    # 數據更新,將值為Mack的serviceDesc修改為Danny
    update_obj = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').update({"serviceDesc": "Danny"})
    # 或則
    update_objp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').first()
    update_objp.serviceDesc = 'Danny'
    session.commit()

    # 刪除
    update_objk = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').delete()
    # 或則
    update_objkp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').one()
    update_objkp.delete()
    session.commit()
    session.close()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM