Python 多進程、多線程環境下,使用 SQLAlchemy 對數據庫進行操作
創建引擎 & 獲取數據庫會話:
使用類的方式,在對象方法中創建數據庫引擎(類本身使用單例模式,確保只創建一個對象;方法裡先判斷引擎是否已經存在,確保只創建一個數據庫引擎)
# Global MySQL access point (singleton that owns the engine).
class MysqlGlobal(object):
    """Singleton that lazily builds one SQLAlchemy engine and hands out sessions.

    Every ``MysqlGlobal()`` call returns the same object. The engine and the
    session factory are stored on the class, so they are shared by all
    callers in the current process.

    NOTE(review): the lazy initialisation below takes no lock, so two threads
    racing through the very first call could each build an engine — confirm
    whether first use always happens before worker threads start.
    """

    __instance = None         # the single MysqlGlobal object
    __engine = None           # the single engine, built by gen_engine()
    __session_factory = None  # sessionmaker bound to the engine, built once

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls.__instance is None:
            cls.__instance = object.__new__(cls)
        return cls.__instance

    def gen_engine(self):
        """Return the shared engine, creating it on the first call.

        The connection URL is assembled from the module-level ``MYSQL_*`` and
        ``DB_*`` settings.
        """
        if MysqlGlobal.__engine is None:
            url = "mysql+{driver}://{username}:{password}@{server}/{database}?charset={charset}".format(
                driver=MYSQL_DRIVER,
                username=MYSQL_USERNAME,
                password=MYSQL_PASSWORD,
                server=MYSQL_SERVER,
                database=DB_NAME,
                charset=DB_CHARSET,
            )
            engine = create_engine(
                url,
                pool_size=100,
                max_overflow=100,
                # NOTE(review): 7200 was the earlier value; a 2-second recycle
                # looks very aggressive — confirm it is intentional.
                # pool_recycle=7200,
                pool_recycle=2,
                echo=False,
            )
            engine.execute("SET NAMES {charset};".format(charset=DB_CHARSET))
            MysqlGlobal.__engine = engine
        return MysqlGlobal.__engine

    @property
    def mysql_session(self):
        """Return a new session bound to the shared engine.

        The ``sessionmaker`` factory is created once and reused; only the
        session itself is new on each access.
        """
        if MysqlGlobal.__session_factory is None:
            MysqlGlobal.__session_factory = sessionmaker(bind=self.gen_engine())
        return MysqlGlobal.__session_factory()
數據表模型類
# Declarative base for the ORM models, bound to the shared engine.
MapBase = declarative_base(bind=MysqlGlobal().gen_engine())


class WfCarInfo(MapBase):
    """ORM model mapped to the ``wf_carinfo`` table."""

    __tablename__ = "wf_carinfo"

    # Values stored in the ``status`` column.
    untreated_status = 0
    treated_status = 1

    # A primary key cannot be NULL, so the contradictory ``nullable=True``
    # has been dropped from this column.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Processing state: 0 = pending, 1 = processed.
    status = Column(Integer, nullable=True, default=untreated_status)
    # create_time = Column(DateTime, nullable=True, default=func.now())  # row creation time
    # update_time = Column(DateTime, nullable=True, default=func.now(), onupdate=func.now())  # row last-update time
……(其餘內容省略)
數據庫會話閉包裝飾器
包裝了數據庫會話 session
def mysql_session(method):
    """Decorator that supplies a database session to *method*.

    A new session is obtained from ``MysqlGlobal`` on every call and passed
    as the last positional argument, after the caller's own positional
    arguments and before any keyword arguments. Callers of the decorated
    function therefore never pass a session themselves.

    If *method* raises, the session is rolled back before the exception is
    re-raised, so a failed call does not leave its transaction open.

    The session is not closed here.
    NOTE(review): ``insert_one`` returns an ORM object that still belongs to
    the session, so closing it here could break callers that read the result;
    confirm before adding ``session.close()``.
    """
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        session = MysqlGlobal().mysql_session
        try:
            return method(*args, session, **kwargs)
        except Exception:
            # Undo whatever the failed call left pending, then let the
            # original error propagate unchanged.
            session.rollback()
            raise
    return wrapper
裝飾器裝飾模型類的類方法
這樣在外面調用類方法進行數據庫操作的時候,就不需要傳數據庫會話過來(session 參數)
@classmethod
@mysql_session
def query_all(cls, session):
    """Return every row of the mapped table as a list of dicts.

    ``session`` is supplied by the ``mysql_session`` decorator. Each row is
    converted with its ``to_dict()`` method, which is not shown in this
    excerpt.
    """
    rows = session.query(cls).all()
    return [row.to_dict() for row in rows]
@classmethod
@mysql_session
def query_all_by_status(cls, status, session):
    """Return the rows whose ``status`` column equals *status*, as dicts.

    ``session`` is supplied by the ``mysql_session`` decorator. Each row is
    converted with its ``to_dict()`` method, which is not shown in this
    excerpt.
    """
    matching = session.query(cls).filter(cls.status == status)
    return [row.to_dict() for row in matching.all()]
@classmethod
@mysql_session
def query_all_by_index(cls, current_index, next_index, session):
    """Return rows with ``current_index < id <= next_index`` as dicts.

    The lower bound is exclusive and the upper bound inclusive.
    ``session`` is supplied by the ``mysql_session`` decorator.
    """
    in_window = and_(cls.id > current_index, cls.id <= next_index)
    rows = session.query(cls).filter(in_window).all()
    return [row.to_dict() for row in rows]
@classmethod
@mysql_session
def insert_one(cls, row, session):
    """Insert one record built from the mapping *row* and commit it.

    Returns the newly created model instance, or ``None`` without touching
    the database when *row* is empty or ``None``.
    ``session`` is supplied by the ``mysql_session`` decorator.
    """
    if row:
        record = cls(**row)
        session.add(record)
        session.commit()
        return record
    return None
@classmethod
@mysql_session
def update_status(cls, record_id, status, session):
    """Set ``status`` on the rows whose ``recordId`` equals *record_id*.

    Always returns 1, regardless of how many rows matched.
    ``session`` is supplied by the ``mysql_session`` decorator.
    NOTE(review): ``recordId`` is not among the columns shown in this
    excerpt — presumably it is declared in the omitted part of the model.
    """
    targets = session.query(cls).filter(cls.recordId == record_id)
    targets.update({"status": status})
    session.commit()
    return 1
@classmethod
@mysql_session
def delete_treated_data(cls, session):
    """Delete every row whose ``status`` equals ``cls.treated_status``.

    Returns the number of rows that were deleted.
    ``session`` is supplied by the ``mysql_session`` decorator.
    """
    treated_rows = session.query(cls).filter(cls.status == cls.treated_status).all()
    # Rows are loaded and removed one at a time through the session rather
    # than with a single bulk ``Query.delete()`` (the original kept that
    # variant commented out). Presumably this is so per-object ORM handling
    # applies — confirm before switching to the bulk form.
    for treated_row in treated_rows:
        session.delete(treated_row)
    session.commit()
    return len(treated_rows)
附加(數據庫初始化)
# Database initialisation.
def init_db_data():
    """Create the tables for the models registered on ``MapBase``.

    No engine is passed to ``create_all()``, so it relies on the engine that
    ``MapBase`` was bound to when it was declared. Success is reported both
    through ``logging`` and on standard output.
    """
    success_message = "init mysql_db success"
    MapBase.metadata.create_all()
    logging.info(success_message)
    print(success_message)
end ~
