# coding:utf-8 """ mongo操作工具 """ from pymongo import MongoClient MONGO_HOST, MONGO_PORT, MONGO_DB, MONGO_TABLE = '127.0.0.1', '27017', 'test_db', 'teat_tb' class MongoUtils: """ 鏈接mongoDB,進行各種操作 """ def __init__(self, host=MONGO_HOST, port=MONGO_PORT, db_name=MONGO_DB): """ 初始化對象,鏈接數據庫 :param host: mongo數據庫所在服務器地址 :param port: mongo數據庫端口 :param db_name: 數據庫的名稱 :return: 無返回值 """ try: self.client = None self.client = MongoClient(host, port) self.database = self.client.get_database(db_name) self.collection = None except Exception as e: self.close_conn() print('init mongo bar failed: %s' % e) def change_collection(self, table_name=MONGO_TABLE): """切換表""" self.collection = self.database.get_collection(table_name) def count_info(self, table_name=MONGO_TABLE, filter_dict=None): """ 查找表記錄條數,默認返回0 :param table_name: str 表名 :param filter_dict: dict 過濾條件 :return: int 表記錄條數 """ tab_size = 0 try: self.collection = self.database.get_collection(table_name) tab_size = self.collection.find(filter_dict).count() return tab_size except Exception as e: print('get table size failed: %s' % e) finally: return tab_size def update_info(self, filter_dict, update_dict, insert=False, multi=False): """ 更新表記錄,默認返回false :param filter_dict: dict 過濾條件,如{'campaignId':{'$in':[1,2,3]}} :param update_dict: dict 更新的字段,如{'$set':{status_key:0,'campaign.status':1},{'$unset':'campaign.name':'test_camp'}} :param insert: bool 如果需要更新的記錄不存在是否插入 :param multi: bool 是否更新所有符合條件的記錄, False則只更新一條,True則更新所有 :return: bool 是否更新成功 """ result = False try: self.collection.update(filter_dict, update_dict, insert, multi) result = True print("[INFO] update success!") except Exception as e: print('update failed: %s' % e) finally: return result def insert_info(self, insert_date): """ 更新表記錄,默認返回false :param insert_date: dict 插入的數據,如{'campaignId':{'$in':[1,2,3]}} :return: bool 是否更新成功 """ result = False try: self.collection.insert(insert_date) result = True print("insert success!") except Exception as e: print('insert failed: %s' % e) finally: return result def delete_info(self, filter_date): """ 更新表記錄,默認返回false :param filter_date: dict 刪除數據的條件,如{'campaignId':{'$in':[1,2,3]}} :return: bool 是否更新成功 """ result = False try: self.collection.remove(filter_date) result = True print("remove success!") except Exception as e: print('remove failed: %s' % e) finally: return result def find_one_info(self, filter_dict, return_dict): """ 查找一條表記錄,默認返回空字典 :param filter_dict: dict 過濾條件如{'campaignId':123} :param return_dict: dict 返回的字段如{'campaign.status':1,'updated':1,'_id':0} :return: dict 查找到的數據 """ result = {} try: result = self.collection.find_one(filter_dict, return_dict) except Exception as e: print('find data failed: %s' % e) finally: return result def find_multi_info(self, filter_dict, return_dict, limit_size=0, skip_index=0): """ 查找多條表記錄,默認返回空數組 :param filter_dict: dict filter_dict: 過濾條件如{'campaignId':123} :param return_dict: dict 返回的字段如{'campaign.status':1,'updated':1,'_id':0} :param limit_size: int 限定返回的數據條數 :param skip_index: int 游標位移 :return: list 查詢到的記錄組成的列表,每個元素是一個字典 """ result = [] try: if not limit_size: if not skip_index: result = self.collection.find(filter_dict, return_dict) else: result = self.collection.find(filter_dict, return_dict).skip(skip_index) else: if not skip_index: result = self.collection.find(filter_dict, return_dict).limit(limit_size) else: result = self.collection.find(filter_dict, return_dict).skip(skip_index).limit(limit_size) except Exception as e: print('find data failed: %s' % e) finally: return result def close_conn(self): """ 關閉數據庫鏈接 :return: 無返回值 """ if self.client: self.client.close()
