Elasticsearch【JAVA REST Client】客戶端操作


ES系統作為集群,環境搭建非常方便簡單。 現在在我們的應用中,如何對這個集群進行操作呢?

 

我們利用ES系統,通常都是下面的架構:

 

在這里,客戶端的請求通過LB進行負載均衡,因為操作任何一個ES的實例,最終在ES集群系統中內容都是一樣的,所以,考慮各個ES節點的負載問題以及容災問題,上述的架構是當下最流行的方式。

 

下面,將要介紹ES JAVA REST API方式操作ES集群。

若是通過maven項目操作,相對比較簡單的,只需要在pom.xml下面加入下面的配置

1 <dependency>
2     <groupId>org.elasticsearch.client</groupId>
3     <artifactId>rest</artifactId>
4     <version>5.0.0-rc1</version>
5 </dependency>

 

但是,我們公司網絡讓人蛋疼,maven倉庫基本連接不上,所以,我們不得不采用Dynamic Web Project的方式,所以,需要的ES客戶端jar包需要自己下載了。大家可以在maven的網站上一個個的下載,主要是依賴下載,可以在rest的下載地址下面,找到這個包的依賴文件,其實,就是httpclient的一些jar包。

1 commons-codec
2 commons-logging
3 httpclient
4 httpcore
5 httpasyncclient
6 httpcore-nio

這些包,都不大,下載后放在自己的工程lib下面即可。

 

我在項目TKSearch里面創建了一個測試用的controller文件SearchProducerController,然后在service下面創建一個接口文件IESRestService.java,並做相關實現ESRestService.java. 代碼如下:

IESRestService.java

 1 /**
 2  * @author "shihuc"
 3  * @date   2016年10月26日
 4  */
 5 package com.tk.es.search.service;
 6 
 7 import org.apache.http.HttpEntity;
 8 
 9 /**
10  * @author chengsh05
11  *
12  */
13 public interface IESRestService {
14 
15     /**
16      * 同步方式向ES集群寫入數據
17      * 
18      * @param index
19      * @param type
20      * @param id
21      * @param entity
22      * @return
23      */
24     public boolean putSync(String index, String type, String id, HttpEntity entity);
25     
26     /**
27      * 異步的方式向ES寫入數據
28      * 
29      * @param index
30      * @param type
31      * @param id
32      * @param entity
33      * @return
34      */
35     public void putAsync(String index, String type, String id, HttpEntity entity);
36         
37 }

 

ESRestService.java

 1 /**
 2  * @author "shihuc"
 3  * @date   2016年10月26日
 4  */
 5 package com.tk.es.search.service.impl;
 6 
 7 import java.io.IOException;
 8 import java.util.Collections;
 9 
10 import javax.annotation.PostConstruct;
11 import javax.annotation.PreDestroy;
12 
13 import org.apache.http.HttpEntity;
14 import org.apache.http.HttpHost;
15 import org.apache.http.entity.ContentType;
16 import org.apache.http.entity.StringEntity;
17 import org.elasticsearch.client.Response;
18 import org.elasticsearch.client.ResponseListener;
19 import org.elasticsearch.client.RestClient;
20 import org.springframework.stereotype.Service;
21 
22 import com.tk.es.search.service.IESRestService;
23 
24 /**
25  * @author chengsh05
26  *
27  */
28 @Service
29 public class ESRestService implements IESRestService{
30 
31     private RestClient restClient = null;
32     
33     /* (non-Javadoc)
34      * @see com.tk.es.search.service.IESRestService#putSync(java.lang.String, java.lang.String, java.lang.String, org.apache.http.HttpEntity)
35      */
36     @Override
37     public boolean putSync(String index, String type, String id, HttpEntity entity) {        
38         Response indexResponse = null;
39         try {
40             indexResponse = restClient.performRequest(
41                     "PUT",
42                     "/" + index + "/" + type + "/" + id,
43                     Collections.<String, String>emptyMap(),
44                     entity);
45         } catch (IOException e) {
46             e.printStackTrace();
47         }
48         return (indexResponse != null);
49     }
50     
51     @PostConstruct
52     public void init(){
53         restClient = RestClient.builder(new HttpHost("10.90.7.2", 9201, "http")).build();     #這里builder的參數,可以是很多個HttpHost的數組,若采用博文開篇的架構圖的話,這里的HttpHost就是LB的的地址 54     }
55     
56     @PreDestroy
57     public void destroy(){
58         if(restClient != null){
59             try {
60                 restClient.close();
61             } catch (IOException e) {                
62                 e.printStackTrace();
63             }
64         }
65     }
66 
67     /* (non-Javadoc)
68      * @see com.tk.es.search.service.IESRestService#putAsync(java.lang.String, java.lang.String, java.lang.String, org.apache.http.HttpEntity)
69      */
70     @Override
71     public void putAsync(String index, String type, String id, HttpEntity entity) {
72 
73         restClient.performRequestAsync(
74                 "PUT",                        #HTTP的方法,可以是PUT,POST,DELETE,HEAD,GET等 75                 "/" + index + "/" + type + "/" + id,       #endpoint, 這個就是指數據在ES中的位置,由index,type以及id確定 76                 Collections.<String, String>emptyMap(),      #是一個map 77                 entity,                        #指的是操作數,即目標數據,這個例子里面表示要存入ES的數據對象 78                 new ResponseListener() {              #異步操作的監聽器,在這里,注冊listener,對操作成功或者失敗進行后續的處理,比如在這里向前端反饋執行后的結果狀態 79                     @Override
80                     public void onSuccess(Response response) {
81                         System.out.println(response);                            
82                     }
83 
84                     @Override
85                     public void onFailure(Exception exception) {
86                         System.out.println("failure in async scenrio");
87                     }
88                });
89         
90     }
91 
92 }

 

SearchProducerController.java

 1 /**
 2  * @author "shihuc"
 3  * @date   2016年10月26日
 4  */
 5 package com.tk.es.search.controller;
 6 
 7 import java.util.HashMap;
 8 
 9 import javax.annotation.Resource;
10 import javax.servlet.http.HttpServletRequest;
11 import javax.servlet.http.HttpServletResponse;
12 
13 import org.apache.http.HttpEntity;
14 import org.apache.http.entity.ContentType;
15 import org.apache.http.entity.StringEntity;
16 import org.springframework.stereotype.Controller;
17 import org.springframework.web.bind.annotation.RequestMapping;
18 import org.springframework.web.bind.annotation.ResponseBody;
19 
20 import com.google.gson.Gson;
21 import com.tk.es.search.service.impl.ESRestService;
22 
23 /**
24  * @author chengsh05
25  *
26  */
27 @Controller
28 public class SearchProducerController {
29     
30     @Resource
31     private ESRestService esRestService;
32     
33     @RequestMapping(value="/articleSync")
34     @ResponseBody
35     public String prepareArticleSync(HttpServletRequest req, HttpServletResponse rsp){
36         HttpEntity entity = new StringEntity(
37                 "{\n" +
38                 "    \"user\" : \"kimchy\",\n" +
39                 "    \"post_date\" : \"2009-11-15T14:12:12\",\n" +
40                 "    \"message\" : \"trying out Elasticsearch\"\n" +
41                 "}", ContentType.APPLICATION_JSON);
42         
43         esRestService.putSync("rest_index", "client", "1", entity);
44         Gson gson = new Gson();
45         HashMap<String, String> res = new HashMap<String, String>();
46         res.put("result", "successful");
47         return gson.toJson(res);
48     }
49     
50     @RequestMapping(value="/articleAsync")
51     @ResponseBody
52     public String prepareArticleAsync(HttpServletRequest req, HttpServletResponse rsp){
53         HttpEntity entity = new StringEntity(
54                 "{\n" +
55                 "    \"user\" : \"shihuc\",\n" +
56                 "    \"post_date\" : \"2016-10-26T13:30:12\",\n" +
57                 "    \"message\" : \"Demo REST Client to operate Elasticsearch\"\n" +
58                 "}", ContentType.APPLICATION_JSON);
59         
60         esRestService.putAsync("rest_index", "client", "2", entity);
61         Gson gson = new Gson();
62         HashMap<String, String> res = new HashMap<String, String>();
63         res.put("result", "successful");
64         return gson.toJson(res);
65     }
66 }

 

啟動項目,在地址欄輸入http://10.90.9.20:8080/TKSearch/articleSync將會以同步的方式在ES集群中創建一條記錄。

輸入http://10.90.9.20:8080/TKSearch/articleAsync,將會以異步的方式創建一個記錄。

 

將RestClient以一個Service進行包裝,在Spring啟動的時候,注入bean過程中進行初始化,在bean銷毀前進行連接的關閉操作。利用Spring支持的兩個注解@PostConstruct和@PreDestroy完成連接的建立和銷毀,結構干凈簡單。

到此,客戶端以Java REST Client的方式操作ES集群的demo就演示結束。 后續將介紹Elasticsearch Java API的方式操作ES集群的實施過程。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM