ES系統作為集群,環境搭建非常方便簡單。 現在在我們的應用中,如何對這個集群進行操作呢?
我們利用ES系統,通常都是下面的架構:
在這里,客戶端的請求通過LB進行負載均衡,因為操作任何一個ES的實例,最終在ES集群系統中內容都是一樣的,所以,考慮各個ES節點的負載問題以及容災問題,上述的架構是當下最流行的方式。
下面,將要介紹ES JAVA REST API方式操作ES集群。
若是通過maven項目操作,相對比較簡單的,只需要在pom.xml下面加入下面的配置
1 <dependency> 2 <groupId>org.elasticsearch.client</groupId> 3 <artifactId>rest</artifactId> 4 <version>5.0.0-rc1</version> 5 </dependency>
但是,我們公司網絡讓人蛋疼,maven倉庫基本連接不上,所以,我們不得不采用Dynamic Web Project的方式,所以,需要的ES客戶端jar包需要自己下載了。大家可以在maven的網站上一個個的下載,主要是依賴下載,可以在rest的下載地址下面,找到這個包的依賴文件,其實,就是httpclient的一些jar包。
1 commons-codec 2 commons-logging 3 httpclient 4 httpcore 5 httpasyncclient 6 httpcore-nio
這些包,都不大,下載后放在自己的工程lib下面即可。
我在項目TKSearch里面創建了一個測試用的controller文件SearchProducerController,然后在service下面創建一個接口文件IESRestService.java,並做相關實現ESRestService.java. 代碼如下:
IESRestService.java
1 /** 2 * @author "shihuc" 3 * @date 2016年10月26日 4 */ 5 package com.tk.es.search.service; 6 7 import org.apache.http.HttpEntity; 8 9 /** 10 * @author chengsh05 11 * 12 */ 13 public interface IESRestService { 14 15 /** 16 * 同步方式向ES集群寫入數據 17 * 18 * @param index 19 * @param type 20 * @param id 21 * @param entity 22 * @return 23 */ 24 public boolean putSync(String index, String type, String id, HttpEntity entity); 25 26 /** 27 * 異步的方式向ES寫入數據 28 * 29 * @param index 30 * @param type 31 * @param id 32 * @param entity 33 * @return 34 */ 35 public void putAsync(String index, String type, String id, HttpEntity entity); 36 37 }
ESRestService.java
1 /** 2 * @author "shihuc" 3 * @date 2016年10月26日 4 */ 5 package com.tk.es.search.service.impl; 6 7 import java.io.IOException; 8 import java.util.Collections; 9 10 import javax.annotation.PostConstruct; 11 import javax.annotation.PreDestroy; 12 13 import org.apache.http.HttpEntity; 14 import org.apache.http.HttpHost; 15 import org.apache.http.entity.ContentType; 16 import org.apache.http.entity.StringEntity; 17 import org.elasticsearch.client.Response; 18 import org.elasticsearch.client.ResponseListener; 19 import org.elasticsearch.client.RestClient; 20 import org.springframework.stereotype.Service; 21 22 import com.tk.es.search.service.IESRestService; 23 24 /** 25 * @author chengsh05 26 * 27 */ 28 @Service 29 public class ESRestService implements IESRestService{ 30 31 private RestClient restClient = null; 32 33 /* (non-Javadoc) 34 * @see com.tk.es.search.service.IESRestService#putSync(java.lang.String, java.lang.String, java.lang.String, org.apache.http.HttpEntity) 35 */ 36 @Override 37 public boolean putSync(String index, String type, String id, HttpEntity entity) { 38 Response indexResponse = null; 39 try { 40 indexResponse = restClient.performRequest( 41 "PUT", 42 "/" + index + "/" + type + "/" + id, 43 Collections.<String, String>emptyMap(), 44 entity); 45 } catch (IOException e) { 46 e.printStackTrace(); 47 } 48 return (indexResponse != null); 49 } 50 51 @PostConstruct 52 public void init(){ 53 restClient = RestClient.builder(new HttpHost("10.90.7.2", 9201, "http")).build(); #這里builder的參數,可以是很多個HttpHost的數組,若采用博文開篇的架構圖的話,這里的HttpHost就是LB的的地址 54 } 55 56 @PreDestroy 57 public void destroy(){ 58 if(restClient != null){ 59 try { 60 restClient.close(); 61 } catch (IOException e) { 62 e.printStackTrace(); 63 } 64 } 65 } 66 67 /* (non-Javadoc) 68 * @see com.tk.es.search.service.IESRestService#putAsync(java.lang.String, java.lang.String, java.lang.String, org.apache.http.HttpEntity) 69 */ 70 @Override 71 public void putAsync(String index, String type, String id, HttpEntity entity) { 72 73 restClient.performRequestAsync( 74 "PUT", #HTTP的方法,可以是PUT,POST,DELETE,HEAD,GET等 75 "/" + index + "/" + type + "/" + id, #endpoint, 這個就是指數據在ES中的位置,由index,type以及id確定 76 Collections.<String, String>emptyMap(), #是一個map 77 entity, #指的是操作數,即目標數據,這個例子里面表示要存入ES的數據對象 78 new ResponseListener() { #異步操作的監聽器,在這里,注冊listener,對操作成功或者失敗進行后續的處理,比如在這里向前端反饋執行后的結果狀態 79 @Override 80 public void onSuccess(Response response) { 81 System.out.println(response); 82 } 83 84 @Override 85 public void onFailure(Exception exception) { 86 System.out.println("failure in async scenrio"); 87 } 88 }); 89 90 } 91 92 }
SearchProducerController.java
1 /** 2 * @author "shihuc" 3 * @date 2016年10月26日 4 */ 5 package com.tk.es.search.controller; 6 7 import java.util.HashMap; 8 9 import javax.annotation.Resource; 10 import javax.servlet.http.HttpServletRequest; 11 import javax.servlet.http.HttpServletResponse; 12 13 import org.apache.http.HttpEntity; 14 import org.apache.http.entity.ContentType; 15 import org.apache.http.entity.StringEntity; 16 import org.springframework.stereotype.Controller; 17 import org.springframework.web.bind.annotation.RequestMapping; 18 import org.springframework.web.bind.annotation.ResponseBody; 19 20 import com.google.gson.Gson; 21 import com.tk.es.search.service.impl.ESRestService; 22 23 /** 24 * @author chengsh05 25 * 26 */ 27 @Controller 28 public class SearchProducerController { 29 30 @Resource 31 private ESRestService esRestService; 32 33 @RequestMapping(value="/articleSync") 34 @ResponseBody 35 public String prepareArticleSync(HttpServletRequest req, HttpServletResponse rsp){ 36 HttpEntity entity = new StringEntity( 37 "{\n" + 38 " \"user\" : \"kimchy\",\n" + 39 " \"post_date\" : \"2009-11-15T14:12:12\",\n" + 40 " \"message\" : \"trying out Elasticsearch\"\n" + 41 "}", ContentType.APPLICATION_JSON); 42 43 esRestService.putSync("rest_index", "client", "1", entity); 44 Gson gson = new Gson(); 45 HashMap<String, String> res = new HashMap<String, String>(); 46 res.put("result", "successful"); 47 return gson.toJson(res); 48 } 49 50 @RequestMapping(value="/articleAsync") 51 @ResponseBody 52 public String prepareArticleAsync(HttpServletRequest req, HttpServletResponse rsp){ 53 HttpEntity entity = new StringEntity( 54 "{\n" + 55 " \"user\" : \"shihuc\",\n" + 56 " \"post_date\" : \"2016-10-26T13:30:12\",\n" + 57 " \"message\" : \"Demo REST Client to operate Elasticsearch\"\n" + 58 "}", ContentType.APPLICATION_JSON); 59 60 esRestService.putAsync("rest_index", "client", "2", entity); 61 Gson gson = new Gson(); 62 HashMap<String, String> res = new HashMap<String, String>(); 63 res.put("result", "successful"); 64 return gson.toJson(res); 65 } 66 }
啟動項目,在地址欄輸入http://10.90.9.20:8080/TKSearch/articleSync將會以同步的方式在ES集群中創建一條記錄。
輸入http://10.90.9.20:8080/TKSearch/articleAsync,將會以異步的方式創建一個記錄。
將RestClient以一個Service進行包裝,在Spring啟動的時候,注入bean過程中進行初始化,在bean銷毀前進行連接的關閉操作。利用Spring支持的兩個注解@PostConstruct和@PreDestroy完成連接的建立和銷毀,結構干凈簡單。
到此,客戶端以Java REST Client的方式操作ES集群的demo就演示結束。 后續將介紹Elasticsearch Java API的方式操作ES集群的實施過程。