基於Python實現的Elasticsearch批量操作客戶端
by:授客 QQ:1033553122
1.代碼用途
Elasticsearch客戶端,目的在於實現批量操作,如下:
<1> 批量插入數據
<2> 批量更新文檔字段值
<3> 批量新增文檔字段值
<4> 批量刪除數據
<5> 批量復制數據
<6> 批量去除冗余數據
2.測試環境
Win7 64位
Python 3.3.2
Win elasticsearch-5.4.1
chardet-2.3.0
下載地址1:https://pypi.python.org/pypi/chardet/
下載地址2:http://pan.baidu.com/s/1nu7XzjN
3.使用方法
編輯配置文件conf/hostconfig
[DESTHOSTCONFIG]
host = 127.0.0.1
port = 9200
protocol = http
[SRCHOSTCONFIG]
host = 127.0.0.1
port = 9200
protocol = http
[README]
host = Elasticsearch所在服務器IP地址
port = Elasticsearch訪問端口
protocol = 暫且固定為http
說明:
[DESTHOSTCONFIG]: 該節點下配置需要執行批量插入,批量更新文檔,批量刪除,批量復制時的ES主機信息
[SRCHOSTCONFIG]:該節點下配置需要復制ES數據的數據源主機信息,即從該節點下的ES主機復制到[DESTHOSTCONFIG]下的主機,兩者可以是同一台主機
host = Elasticsearch所在服務器IP地址
port = Elasticsearch訪問端口
protocol = 暫且固定為http
編輯配置文件conf/runconfig.txt
[RUNCONFIG]
runtimes = 1
說明:
runtimes = 執行批量插入時,每組數據會被重復執行的次數,總插入記錄數=runtimes x 數據組數
編輯配置文件conf/esdataconfig_insertdata.txt
[INSERTDATA]
index= business_chance
type = customer_num1
{
"group_customer_code": "1",
"second_class": "服裝||手機||水果",
"customer_num": 100||200||300,
"province": "廣東省||福建省||雲南省",
"branch": "品牌1||品牌2"
}
end
{
"group_customer_code": "2",
"second_class": "服裝",
"customer_num": 400,
"province": "廣東省",
"branch": "品牌3"
}
end
type = customer_num2
{
"group_customer_code": "1",
"second_class": "服裝",
"customer_num": 600,
"province": "廣東省",
"branch": "品牌",
"rank":1
}
end
index= business_index
type = customer_type
{
"group_customer_code": "1",
"second_class": "服裝",
"customer_num": 600,
"province": "廣東省",
"branch": "品牌2",
"rank":1
}
end
說明:
[INSERTDATA] ------------->固定值
index= 索引名稱,不能為空
type = 類型名稱,不可為空
{
"group_customer_code": "1",
"second_class": "服裝||手機||水果",
"customer_num": 100||200||300,
"province": "廣東省||福建省||雲南省",
"branch": "品牌1||品牌2"
}
end
需要提交的一組數據,沒組數據遵守json格式,后面一定要跟“end” 表示數據范圍結束
"second_class": "服裝||手機||水果",
1)如果有多個參數值,以 || 分隔,運行時程序隨機選取一個
2)參數值如果是字符串類型,加以英文雙引號",否則不加雙引號
從上往下,
1)如果已填寫index,需要切換文檔類型,可直接另起一行,如下
type = customer_num2
表示接下來的數據組插入到該文檔類型,直到遇到其它索引、文檔類型
2)如果需要提交到其它新的索引,可直接另起一行,填寫新的索引和類型,如下
index= business_index
type = customer_type
表示接下來的數據組插入到新索引名稱下的新索引類型中
編輯配置文件conf/esdataconfig_updatefield.txt
[UPDATEFIELD]
index=business_chance
type = customer_num1
查詢=
{
"query": {
"match_phrase": {
"province": "廣東省"
}
},"size":150
}
end
{
"branch": "品牌99||品牌66",
"customer_num": 900||888
}
end
type = customer_num2
查詢=
{
"query": {
"match_all": {}
},
"size": 100
}
end
{
"branch": "品牌999",
"customer_num": 990
}
end
index= business_index
type = customer_type
查詢=
{
"query": {
"match_all": {}
},
"size": 100
}
end
{
"branch": "品牌666",
"customer_num": 666
}
end
說明:
[UPDATEFIELD] ------------>固定值
index= 需要更新記錄所在索引名稱,不可為空
type = 需要更新記錄所在文檔類型,不可為空
查詢={……} 僅更新滿足查詢條件的結果,不可為空
查詢=
{
"query": {
"match_phrase": {
"province": "廣東省"
}
},
"size":150
}
end
這里的邏輯是這樣的:先“查詢”,再對查詢出來的每條記錄進行更新
注意:
不使用size參數的話,ES默認僅僅會返回10條記錄,程序僅會對返回的記錄數進行更新,所以,如果需要更新的記錄數大於10條,需要通過"size"參數,顯示控制ES返回的記錄數,比如“需要更新的記錄數有150條,則size的值要設置大於等於150”(下同,不在贅述)
參數數據組
{
"branch": "品牌99||品牌66",
"customer_num": 900||888
}
end
同批量插入
1)如果有多個參數值,以 || 分隔,運行時程序隨機選取一個
2)參數值如果是字符串類型,加以英文雙引號",否則不加雙引號
從上往下,
1)如果已填寫index,需要切換文檔類型,可直接另起一行,如下
type = customer_num2
表示接下來的數據組更新,只更新歸屬該文檔類型的記錄,直到遇到其它索引、文檔類型
3)如果需要更新歸屬其它新索引的記錄,可直接另起一行,填寫新的索引和類型,如下
index= business_index
type = customer_type
表示接下來的數據組只更新新索引名稱下的新索引類型中的記錄,直到遇到其它索引、文檔類型
同批量插入,查詢,參數數據組,都必須跟 end,表示數據范圍結束
另外,需要注意的是:“查詢”,必須位於參數數組上方,索引類型下方
批量新增文檔字段:如果填寫的字段不存在,則會新增字段及對應值
編輯配置文件conf/esdataconfig_deletedata.txt
[DELETEDATA]
index= business_chance
type = customer_num1
查詢=
{
"query": {
"match_phrase": {
"province": "廣東省"
}
}
}
end
index= business_index
type = customer_type
{
"query": {
"match_phrase": {
"province": "廣東省"
}
}
}
end
說明:
[DELETEDATA] --------固定值
index= 要刪除記錄所在索引
type = 要刪除記錄所在類型
查詢={……} 僅更新滿足查詢條件的結果,不可為空
查詢=
{
"query": {
"match_phrase": {
"province": "廣東省"
}
}
}
end
這里的邏輯是這樣的:如先“查詢”,再對查詢出來的每條記錄(ES實際返回的記錄)進行刪除
其它說明同上
編輯配置文件conf/esdataconfig_deduplicatedata.txt
[DEDUPLICATEDATA]
index= business_index
type = customer_num2
查詢=
{
"query": {
"match_phrase": {
"province": "廣東省"
}
},
"size":100
}
end
type = customer_type
查詢=
{
"query": {
"match_all": {}
},
"size": 100
}
end
index= business_chance
type = customer_num1
查詢=
{
"query": {
"match_all": {}
},
"size": 100
}
end
注意:
這里的查詢不能為空,一定要填寫
這里的實現邏輯是這樣的:先查詢,然后刪除查詢出來的全部記錄,最后再把不重復的記錄寫回到ES中。
其它說明同上
編輯配置文件conf/esdataconfig_copydata.txt
[COPYDATA]
index= business_chance
type = customer_num1
查詢=
{
"query": {
"match_phrase": {
"province": "廣東省"
}
}
}
end
type = customer_num2
查詢=
{
"query": {
"match_phrase": {
"province": "廣東省"
}
}
}
end
格式基本同上述的批量更新文檔的配置,多少有點不一樣,需要注意如下:
1) 這里的index,type分別為數據源所在的索引和類型,即需要從該索引和類型中復制數據到目標索引和類型,不能為空
index= business_chance
type = customer_num1
2)條件= 配置需要“復制數據到”的目標索引,和目標類型,如下,以逗號分隔,一個條件僅僅支持一個目標index和type
條件 = index = business_index , type = customer_num2
end
條件和查詢都不能為空。
這里的實現邏輯是這樣的:對數據源所在的index, type通過“查詢”得到要復制的數據,然后根據“條件”設置的目標索引和類型名,復制到對應目標主機上的目標索引,目標類型中。
說明:重復復制,會生成重復數據
如果覺得麻煩,以上幾個數據配置的內容,可以寫在一個文件里,但是必須按格式填寫
cmd進入ESBatchOperator根目錄(main.py所在目錄)
python main.py
按提示,輸入數字編號 1、2、3、4、5,回車運行
源碼下載地址:基於Python實現的Elasticsearch批量操作客戶端