存在则更新，不存在则插入（upsert）
import pandas as pd
import pymongo
from pymongo import IndexModel, ASCENDING
from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne
from pymongo.errors import BulkWriteError
import sys, os,time
# Put the parent of this file's directory (the project root) on sys.path so the
# project-local ``common`` package below is importable when run as a script.
base_path = os.path.abspath(__file__)
base_path = os.path.dirname(os.path.dirname(base_path))
sys.path.append(base_path)
from common.config import tentcent_mongodb_config as mongodb_config

# MongoDB connection settings read from the project config.
# NOTE(review): assumes mongodb_config is a dict-like mapping, so a missing
# key yields None — confirm against common.config.
port, ip, db_name = (mongodb_config.get(key) for key in ('port', 'ip', 'db_name'))
class MongoDBPipeline(object):
    """Keyed bulk writes ("update if it exists, insert otherwise") on one collection.

    Connection settings come from the module-level ``ip``, ``port`` and
    ``db_name`` values. The write methods key documents on a single field
    (``index_name``) and back it with a unique ascending index, so the server
    rejects a second document carrying the same key.
    """

    def __init__(self, table_name):
        """Open a client and bind ``self.collection`` to ``db_name[table_name]``.

        Args:
            table_name: name of the collection to operate on.
        """
        # Kept on the instance (rather than a throwaway local) so close() can
        # release the client's connection pool.
        self.client = pymongo.MongoClient(ip, port)
        self.collection = self.client[db_name][table_name]

    def close(self):
        """Close the MongoClient opened in ``__init__``."""
        self.client.close()

    def _ensure_unique_index(self, index_name):
        """Create a unique ascending index on ``index_name`` if it is missing.

        Re-creating an identical index is a no-op on the server. The server
        rejects the build if existing documents already share a value for
        ``index_name``.
        """
        idx = IndexModel([(index_name, ASCENDING)], unique=True)
        self.collection.create_indexes([idx])

    @staticmethod
    def _build_requests(docs, index_name, update, insert_unkeyed):
        """Translate ``docs`` into a list of ``bulk_write`` operations.

        A document whose ``index_name`` value is not None becomes an upsert
        keyed on that field. Any other document becomes a plain insert when
        ``insert_unkeyed`` is true, and is skipped otherwise.
        """
        # $set overwrites fields of an existing document; $setOnInsert only
        # writes fields when the upsert inserts a new document, never updates.
        update_key = "$set" if update else "$setOnInsert"
        requests = []
        for doc in docs:
            key = doc.get(index_name)
            if key is not None:
                requests.append(
                    UpdateOne({index_name: key}, {update_key: doc}, upsert=True)
                )
            elif insert_unkeyed:
                requests.append(InsertOne(doc))
        return requests

    def process_item(self, docs=None, index_name=None, update=True):
        """Upsert keyed docs and insert unkeyed ones in one unordered bulk write.

        Args:
            docs: iterable of dicts to write.
            index_name: field used as the unique upsert key.
            update: True -> ``$set`` (overwrite existing docs);
                False -> ``$setOnInsert`` (only fill in new docs).

        Returns:
            The raw bulk result dict (``BulkWriteResult.bulk_api_result``,
            the same shape the removed ``BulkOperationBuilder.execute()``
            returned), or None when ``docs`` is empty.

        Raises:
            pymongo.errors.BulkWriteError: if any operation fails. Because the
                write is unordered, the remaining operations are still attempted.
        """
        self._ensure_unique_index(index_name)
        if not docs:
            return None
        # Replaces pymongo.bulk.BulkOperationBuilder, which was removed in PyMongo 4.
        requests = self._build_requests(docs, index_name, update, insert_unkeyed=True)
        result = self.collection.bulk_write(requests, ordered=False)
        return result.bulk_api_result

    def update_mongo(self, docs=None, index_name=None, update=True):
        """Upsert every doc that carries ``index_name``; skip the rest.

        Does not create the unique index itself (``insert_process`` does).

        Args:
            docs: iterable of dicts to write; None or empty is a no-op.
            index_name: field used as the upsert key.
            update: True -> ``$set``; False -> ``$setOnInsert``.

        Returns:
            The ``BulkWriteResult``, or None when there was nothing to write.

        Raises:
            pymongo.errors.BulkWriteError: re-raised after logging a warning.
        """
        import logging

        if not docs:
            return None
        requests = self._build_requests(docs, index_name, update, insert_unkeyed=False)
        if not requests:
            # bulk_write([]) raises InvalidOperation, so bail out early.
            return None
        try:
            result = self.collection.bulk_write(requests, ordered=False)
        except BulkWriteError:
            logging.getLogger(__name__).warning("Updating compounds failed")
            raise
        print(result.bulk_api_result)
        print('Successful')
        return result

    def process_item_write(self, docs=None, index_name=None, update=True):
        """Like ``process_item`` but ordered, returning the ``BulkWriteResult``.

        Keyed docs are upserted with ``$set``/``$setOnInsert`` (per ``update``)
        and unkeyed docs are inserted, all in a single round-trip. The write
        is ordered, so processing stops at the first failing operation.

        Returns:
            The ``BulkWriteResult``, or None when ``docs`` is empty.
        """
        self._ensure_unique_index(index_name)
        if not docs:
            return None
        requests = self._build_requests(docs, index_name, update, insert_unkeyed=True)
        result = self.collection.bulk_write(requests)
        print(result.bulk_api_result)
        return result

    def insert_process(self, data, index_name):
        """Ensure the unique index on ``index_name`` exists, then upsert ``data``.

        Returns:
            Whatever ``update_mongo`` returns (a ``BulkWriteResult`` or None).
        """
        # The previous check relied on index_information() raising
        # OperationFailure. PyMongo swallows NamespaceNotFound there, so the
        # index was effectively never created. Creating it is idempotent anyway.
        self._ensure_unique_index(index_name)
        result_info = self.update_mongo(data, index_name)
        print("-" * 30)
        return result_info

    def read_mongodb(self, no_id=True):
        """Return every document in the collection as a list of dicts.

        Args:
            no_id: when true, the ``_id`` field is excluded via projection.
        """
        projection = {'_id': False} if no_id else None
        return list(self.collection.find({}, projection))
def main():
    """Smoke test: upsert one sample document keyed on ``code`` and report matches."""
    table_name = 'tmp_mongodb_v3'
    docs = [{"code": "SH600520"}]
    pipeline = MongoDBPipeline(table_name)
    result = pipeline.insert_process(docs, 'code')
    # insert_process may return None when there was nothing to write.
    if result is None:
        print('nothing written')
    else:
        print('nMatched', result.bulk_api_result['nMatched'])


if __name__ == "__main__":
    main()
参考：
- 批量写入操作 - PyMongo 3.11.3 文档
- Python Examples of pymongo.UpdateOne