Elasticsearch 索引的全量/增量更新
當你的es 索引數據從mysql 全量導入之后,如何根據其他客戶端改變索引數據源帶來的變動來更新 es 索引數據呢。
首先用 Python 全量生成 Elasticsearch 和 ik 初始的分詞索引數據,增量更新索引實現如下:
服務端(Python+redis-sub)
# Python-redis 開啟監聽 'leon' 等待客戶端推送消息,來增量更新es文檔
#-*- coding:utf8 -*-
import sys
import redis
import json
import elasticsearch
import os
class Task(object):
def __init__(self):
es_servers = [{
"host": "server-host",
"port": "es-port"
}]
self.es_client = elasticsearch.Elasticsearch(hosts=es_servers)
pool = redis.ConnectionPool(host='redis-host', port=6379,db=0,password='user:passwd')
self.r = redis.Redis(connection_pool=pool)
self.ps = self.r.pubsub()
self.ps.subscribe('leon')
def listen_task(self):
for i in self.ps.listen():
# print i
if i['type'] == 'message':
data = json.loads(i['data'])
for key, value in data.iteritems():
# print key, 'corresponds to', data[key]
self.index = data['index']
self.doc_type = data['type']
cate = data['cate']
id = data['id']
if cate == 'update':
row_obj = data['params']
if cate == 'update':
self.update_by_id(id,row_obj)
elif cate == 'delete':
self.delete_by_id(id)
else:
self.create_by_id(id)
def update_by_id(self, id,row_obj):
"""row_obj 就是 包含了 _id 和 其他 要更新的字段的 kv [] 取id 和 剩下的根據給定的_id,更新ES文檔"""
res = self.es_client.update(index=self.index, doc_type=self.doc_type, body={"doc": row_obj}, id=id)
print res
def create_by_id(self, id):
"""id ; 創建新的ES文檔"""
create_by_id = "python /workspace/django-bash/elastic/autobash/info_es.py " + str(id)
res = os.popen(create_by_id).read()
print res
def delete_by_id(self, _id):
"""
根據給定的id,刪除文檔
暫時先不用可以根據查詢 條件 isdeleted = 0 來判斷
"""
self.es_client.delete(index=self.index, doc_type=self.doc_type, id=_id)
if __name__ == '__main__':
print 'listen task queue'
Task().listen_task()
創建 info single 的 Python 腳本
def crete_info_single(self,ID):
index = 'info'
type = 'full'
# 生成info的json
model = Model('ali', 'Info')
info_list = model.getAllInfoByID(ID)
all_len = len(info_list)
for i in range(0, all_len):
# 業務邏輯代碼·····
document = info_list[i]
request_timeout = 100
create_response = self.es.crete_index(index, type, document, request_timeout, ID)
客戶端(Php + redis-pub )
# php-redis sub 'leon' ,傳遞約定的格式,指定對 es 文檔的操作類型
# elk 軟刪除
$message = array(
"index" => "info",
"type"=>"full",
"id" => $info[0]['ID'],
"cate"=>'update',
"params"=> array(
"IsDeleted"=>1
)
);
$json_mess =json_encode($message);
$redis->publish('leon', $json_mess);
這樣,當客戶端更改了 mysql 時候,往redis leon 頻道 publish 一條對應的消息,服務端接收消息后,就會更新對應的 es 索引。
---------------------
作者:Npcccccc
來源:CSDN
原文:https://blog.csdn.net/qq_28018283/article/details/79277478
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
