Python Elasticsearch批量操作客戶端


基於Python實現的Elasticsearch批量操作客戶端

by:授客 QQ1033553122

 

1. 代碼用途 1

2. 測試環境 1

3. 使用方法 1

3.1 配置ES服務器信息 1

3.2 配置ES操作數據 2

3.2.1 批量插入數據 2

3.2.2批量更新文檔字段值|新增字段值 4

3.2.3 批量刪除 7

3.2.4 批量去除冗余(重復)的數據 8

3.2.5 批量復制數據 9

3.3 運行程序 10

 

1.代碼用途

Elasticsearch客戶端,目的在於實現批量操作,如下:

<1> 批量插入數據

<2> 批量更新文檔字段值

<3> 批量新增文檔字段值

<4> 批量刪除數據

<5> 批量復制數據

<6> 批量去除冗余數據

2.測試環境

Win7 64

 

Python 3.3.2

 

Win elasticsearch-5.4.1

 

chardet-2.3.0

下載地址1https://pypi.python.org/pypi/chardet/

下載地址2http://pan.baidu.com/s/1nu7XzjN

3.使用方法

3.1 配置ES服務器信息

編輯配置文件conf/hostconfig

[DESTHOSTCONFIG]

host = 127.0.0.1

port = 9200

protocol = http

 

[SRCHOSTCONFIG]

host = 127.0.0.1

port = 9200

protocol = http

 

[README]

host = Elasticsearch所在服務器IP地址

port = Elasticsearch訪問端口

protocol = 暫且固定為http

 

說明:

[DESTHOSTCONFIG]: 該節點下配置需要執行批量插入,批量更新文檔,批量刪除,批量復制時的ES主機信息

 

[SRCHOSTCONFIG]:該節點下配置需要復制ES數據的數據源主機信息,即從該節點下的ES主機復制到[DESTHOSTCONFIG]下的主機,兩者可以是同一台主機

 

host = Elasticsearch所在服務器IP地址

port = Elasticsearch訪問端口

protocol = 暫且固定為http

 

3.2 配置ES操作數據

 

3.2.1 批量插入數據

編輯配置文件conf/runconfig.txt

[RUNCONFIG]
runtimes = 1

說明:

runtimes = 執行批量插入時,每組數據會被重復執行的次數,總插入記錄數=runtimes x 數據組數

 

 

編輯配置文件conf/esdataconfig_insertdata.txt

 

[INSERTDATA]

index= business_chance

type = customer_num1

 

{

   "group_customer_code": "1",

   "second_class": "服裝||手機||水果",

   "customer_num": 100||200||300,

   "province": "廣東省||福建省||雲南省",

   "branch": "品牌1||品牌2"

}

end

 

{

   "group_customer_code": "2",

   "second_class": "服裝",

   "customer_num": 400,

   "province": "廣東省",

   "branch": "品牌3"

}

end

 

type = customer_num2

 

{

   "group_customer_code": "1",

   "second_class": "服裝",

   "customer_num": 600,

   "province": "廣東省",

   "branch": "品牌",

   "rank":1

}

end

 

 

index= business_index

type = customer_type

 

{

   "group_customer_code": "1",

   "second_class": "服裝",

   "customer_num": 600,

   "province": "廣東省",

   "branch": "品牌2",

   "rank":1

}

end

 

說明:

[INSERTDATA] ------------->固定值

index= 索引名稱,不為空  

type = 類型名稱,不可為空

 

{

   "group_customer_code": "1",

   "second_class": "服裝||手機||水果",

   "customer_num": 100||200||300,

   "province": "廣東省||福建省||雲南省",

   "branch": "品牌1||品牌2"

}

end

 

需要提交的一組數據,沒組數據遵守json格式,后面一定要跟“end” 表示數據范圍結束

 

"second_class": "服裝||手機||水果", 

1)如果有多個參數值,以 || 分隔,運行時程序隨機選取一個

2)參數值如果是字符串類型,加以英文雙引號",否則不加雙引號

 

從上往下,

1)如果已填寫index,需要切換文檔類型,可直接另起一行,如下

type = customer_num2

表示接下來的數據組插入到該文檔類型,直到遇到其它索引、文檔類型

 

2)如果需要提交到其它新的索引,可直接另起一行,填寫新的索引和類型,如下

index= business_index

type = customer_type

表示接下來的數據組插入到新索引名稱下的新索引類型中

 

 

 

3.2.2批量更新文檔字段值|新增字段值

編輯配置文件conf/esdataconfig_updatefield.txt

[UPDATEFIELD]

index=business_chance

type = customer_num1

 

查詢=

{

   "query": {

      "match_phrase": {

         "province": "廣東省"

      }

   },"size":150

}

end

 

{

"branch": "品牌99||品牌66",

"customer_num": 900||888

}

end

 

 

type = customer_num2

 

查詢=

{

   "query": {

      "match_all": {}

   },

   "size": 100

}

end

 

{

"branch": "品牌999",

"customer_num": 990

}

end

 

index= business_index

type = customer_type

 

查詢=

{

   "query": {

      "match_all": {}

   },

   "size": 100

}

end

 

{

"branch": "品牌666",

"customer_num": 666

}

end

 

說明:

[UPDATEFIELD]   ------------>固定值

index= 需要更新記錄所在索引名稱,不可為空

type = 需要更新記錄所在文檔類型,不可為空

 

 

查詢={……} 僅更新滿足查詢條件的結果,不可為空

查詢=

{

   "query": {

      "match_phrase": {

         "province": "廣東省"

      }

   },

   "size":150

}

end

 

這里的邏輯是這樣的:先“查詢”,再對查詢出來的每條記錄進行更新

注意:

不使用size參數的話,ES默認僅僅會返回10條記錄,程序僅會對返回的記錄數進行更新,所以,如果需要更新的記錄數大於10條,需要通過"size"參數,顯示控制ES返回的記錄數,比如“需要更新的記錄數有150條,則size的值要設置大於等於150”(下同,不在贅述)

 

參數數據組

{

"branch": "品牌99||品牌66", 

"customer_num": 900||888

}

end

 

同批量插入

1)如果有多個參數值,以 || 分隔,運行時程序隨機選取一個

2)參數值如果是字符串類型,加以英文雙引號",否則不加雙引號

 

從上往下,

1)如果已填寫index,需要切換文檔類型,可直接另起一行,如下

type = customer_num2

表示接下來的數據組更新,只更新歸屬該文檔類型的記錄,直到遇到其它索引、文檔類型

 

3)如果需要更新歸屬其它新索引的記錄,可直接另起一行,填寫新的索引和類型,如下

index= business_index

type = customer_type

表示接下來的數據組只更新新索引名稱下的新索引類型中的記錄,直到遇到其它索引、文檔類型

 

同批量插入,查詢,參數數據組,都必須跟 end,表示數據范圍結束

 

另外,需要注意的是:“查詢”,必須位於參數數組上方,索引類型下方

 

批量新增文檔字段:如果填寫的字段不存在,則會新增字段及對應值

 

3.2.3 批量刪除

編輯配置文件conf/esdataconfig_deletedata.txt

 

[DELETEDATA]
index= business_chance
type = customer_num1

查詢=
{
   "query": {
      "match_phrase": {
         "province": "廣東省"
      }
   }

}
end

index= business_index
type = customer_type

{
   "query": {
      "match_phrase": {
         "province": "廣東省"
      }
   }
}

end

 

說明:

[DELETEDATA] --------固定值
index= 要刪除記錄所在索引
type = 要刪除記錄所在類型


查詢={……} 僅更新滿足查詢條件的結果,不可為空

查詢=

{

   "query": {

      "match_phrase": {

         "province": "廣東省"

      }

   }

}

end

 

這里的邏輯是這樣的:如先“查詢”,再對查詢出來的每條記錄(ES實際返回的記錄)進行刪除

 

其它說明同上

3.2.4 批量去除冗余(重復)的數據

編輯配置文件conf/esdataconfig_deduplicatedata.txt

[DEDUPLICATEDATA]

index= business_index

type = customer_num2

 

查詢=

{

   "query": {

      "match_phrase": {

         "province": "廣東省"

      }

   },

   "size":100

}

end

 

type = customer_type

查詢=

{

   "query": {

      "match_all": {}

   },

   "size": 100

}

end

 

index= business_chance

type = customer_num1

查詢=

{

   "query": {

      "match_all": {}

   },

   "size": 100

}

end

 

注意:

這里的查詢不能為空,一定要填寫

 

這里的實現邏輯是這樣的:先查詢,然后刪除查詢出來的全部記錄,最后再把不重復的記錄寫回到ES中。

 

其它說明同上

 

3.2.5 批量復制數據

編輯配置文件conf/esdataconfig_copydata.txt

 

[COPYDATA]

index= business_chance

type = customer_num1

 

查詢=

{

   "query": {

      "match_phrase": {

         "province": "廣東省"

      }

   }

}

end

 

type = customer_num2

查詢=

{

   "query": {

      "match_phrase": {

         "province": "廣東省"

      }

   }

}

end

 

格式基本同上述的批量更新文檔的配置,多少有點不一樣,需要注意如下:

1) 這里的index,type分別為數據源所在的索引和類型,即需要從該索引和類型中復制數據到目標索引和類型,不能為空

index= business_chance

type = customer_num1

 

 

2)條件= 配置需要“復制數據到”的目標索引,和目標類型,如下,以逗號分隔,一個條件僅僅支持一個目標indextype

 

條件 = index = business_index  , type = customer_num2

end

 

條件和查詢都不能為空。

 

這里的實現邏輯是這樣的:對數據源所在的index, type通過“查詢”得到要復制的數據,然后根據“條件”設置的目標索引和類型名,復制到對應目標主機上的目標索引,目標類型中。

說明:重復復制,會生成重復數據

 

如果覺得麻煩,以上幾個數據配置的內容,可以寫在一個文件里,但是必須按格式填寫

 

3.3 運行程序

cmd進入ESBatchOperator根目錄(main.py所在目錄)

python main.py

按提示,輸入數字編號 12345,回車運行

 

 

源碼下載地址:基於Python實現的Elasticsearch批量操作客戶端

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM