ElastiSearch 2.1使用java api獲取TransportClient連接ES集群、插入文檔、進行搜索,以及在線/離線插件安裝的方法


2.0之后ES的java api用法有了很大變化。在此記錄一些。

java應用程序連接ES集群,筆者使用的是TransportClient,獲取TransportClient的代碼設計為單例模式(見getClient方法)。同時包含了設置自動提交文檔的代碼。注釋比較詳細,不再贅述。

下方另有提交文檔、提交搜索請求的代碼。

1、連接ES集群代碼如下: 

  1 package elasticsearch;
  2 
  3 import com.vividsolutions.jts.geom.GeometryFactory;
  4 import com.vividsolutions.jts.geom.MultiPolygon;
  5 import com.vividsolutions.jts.geom.Polygon;
  6 import com.vividsolutions.jts.io.ParseException;
  7 import com.vividsolutions.jts.io.WKTReader;
  8 import org.apache.commons.logging.Log;
  9 import org.apache.commons.logging.LogFactory;
 10 import org.elasticsearch.action.bulk.BulkProcessor;
 11 import org.elasticsearch.action.bulk.BulkRequest;
 12 import org.elasticsearch.action.bulk.BulkResponse;
 13 import org.elasticsearch.client.transport.TransportClient;
 14 import org.elasticsearch.common.settings.Settings;
 15 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 16 import org.elasticsearch.common.unit.ByteSizeUnit;
 17 import org.elasticsearch.common.unit.ByteSizeValue;
 18 import org.elasticsearch.common.unit.TimeValue;
 19 
 20 import java.net.InetAddress;
 21 import java.util.Date;
 22 
 23 /**
 24  * Created by ZhangDong on 2015/12/25.
 25  */
 26 public class EsClient {
 27     static Log log = LogFactory.getLog(EsClient.class);
 28 
 29     //    用於提供單例的TransportClient BulkProcessor
 30     static public TransportClient tclient = null;
 31     static BulkProcessor staticBulkProcessor = null;
 32 
 33 //【獲取TransportClient 的方法】
 34     public static TransportClient getClient() {
 35         try {
 36             if (tclient == null) {
 37                 String EsHosts = "10.10.2.1:9300,10.10.2.2:9300";
 38                 Settings settings = Settings.settingsBuilder()
 39                         .put("cluster.name", "wshare_es")//設置集群名稱
 40                         .put("tclient.transport.sniff", true).build();//自動嗅探整個集群的狀態,把集群中其它機器的ip地址加到客戶端中
 41 
 42                 tclient = TransportClient.builder().settings(settings).build();
 43                 String[] nodes = EsHosts.split(",");
 44                 for (String node : nodes) {
 45                     if (node.length() > 0) {//跳過為空的node(當開頭、結尾有逗號或多個連續逗號時會出現空node)
 46                         String[] hostPort = node.split(":");
 47                         tclient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
 48 
 49                     }
 50                 }
 51             }//if
 52         } catch (Exception e) {
 53             e.printStackTrace();
 54         }
 55         return tclient;
 56     }
 57      //【設置自動提交文檔】
 58     public static BulkProcessor getBulkProcessor() {
 59         //自動批量提交方式
 60         if (staticBulkProcessor == null) {
 61             try {
 62                 staticBulkProcessor = BulkProcessor.builder(getClient(),
 63                         new BulkProcessor.Listener() {
 64                             @Override
 65                             public void beforeBulk(long executionId, BulkRequest request) {
 66                                 //提交前調用
 67 //                                System.out.println(new Date().toString() + " before");
 68                             }
 69 
 70                             @Override
 71                             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 72                                 //提交結束后調用(無論成功或失敗)
 73 //                                System.out.println(new Date().toString() + " response.hasFailures=" + response.hasFailures());
 74                                 log.info( "提交" + response.getItems().length + "個文檔,用時"
 75                                         + response.getTookInMillis() + "MS" + (response.hasFailures() ? " 有文檔提交失敗!" : ""));
 76 //                                response.hasFailures();//是否有提交失敗
 77                             }
 78 
 79                             @Override
 80                             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
 81                                 //提交結束且失敗時調用
 82                                 log.error( " 有文檔提交失敗!after failure=" + failure);
 83                             }
 84                         })
 85                         
 86                         .setBulkActions(1000)//文檔數量達到1000時提交
 87                         .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//總文檔體積達到5MB時提交 //
 88                         .setFlushInterval(TimeValue.timeValueSeconds(5))//每5S提交一次(無論文檔數量、體積是否達到閾值)
 89                         .setConcurrentRequests(1)//加1后為可並行的提交請求數,即設為0代表只可1個請求並行,設為1為2個並行
 90                         .build();
 91 //                staticBulkProcessor.awaitClose(10, TimeUnit.MINUTES);//關閉,如有未提交完成的文檔則等待完成,最多等待10分鍾
 92             } catch (Exception e) {//關閉時拋出異常
 93                 e.printStackTrace();
 94             }
 95         }//if
 96 
 97 
 98 
 99 
100 
101         return staticBulkProcessor;
102     }
103 }

 2、插入文檔的代碼(自動批量提交方式,注釋中另有手動批量提交、單個文檔提交的方式)

 1 package elasticsearch;
 2 
 3 import org.apache.commons.logging.Log;
 4 import org.apache.commons.logging.LogFactory;
 5 import org.elasticsearch.action.index.IndexRequest;
 6 
 7 
 8 /**
 9  * Created by ZhangDong on 2015/12/25.
10  */
11 public class EsInsert2 {
12     static Log log = LogFactory.getLog(EsInsert2.class);
13     public static void add(String json) {
14                 try {  //EsClient.getBulkProcessor()是位於上方EsClient類中的方法
15                     EsClient.getBulkProcessor().add(new IndexRequest("設置的index name", "設置的type name","要插入的文檔的ID").source(json));//添加文檔,以便自動提交
16                 } catch (Exception e) {
17                     log.error("add文檔時出現異常:e=" + e + " json=" + json);
18                 }
19     }
20 }
21 //手動 批量更新
22 //        BulkRequestBuilder bulkRequest = tclient.prepareBulk();
23 //        for(int i=500;i<1000;i++){
24 //            //業務對象
25 //            String json = "";
26 //            IndexRequestBuilder indexRequest = tclient.prepareIndex("twitter", "tweet")
27 //                    //指定不重復的ID
28 //                    .setSource(json).setId(String.valueOf(i));
29 //            //添加到builder中
30 //            bulkRequest.add(indexRequest);
31 //        }
32 //
33 //        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
34 //        if (bulkResponse.hasFailures()) {
35 //            // process failures by iterating through each bulk response item
36 //            System.out.println(bulkResponse.buildFailureMessage());
37 //        }
38 
39 //單個文檔提交
40 //        String json = "{\"relationship\":{},\"tags\":[\"camera\",\"video\"]}";
41 //        IndexResponse response = getClient().prepareIndex("dots", "scan", JSON.parseObject(json).getString("rid")).setSource(json).get();
42 //        return response.toString();

 

3、進行搜索的代碼,其中有適用於復雜搜索邏輯的BoolQuery用法,以及關鍵詞高亮的配置、在某個字段精確搜索、全文搜索、匹配全部文檔、搜索同時返回聚類信息的用法:

 1 package service;
 2 
 3 import elasticsearch.EsClient;
 4 import org.apache.commons.logging.Log;
 5 import org.apache.commons.logging.LogFactory;
 6 import org.elasticsearch.action.search.SearchRequestBuilder;
 7 import org.elasticsearch.action.search.SearchResponse;
 8 import org.elasticsearch.index.query.*;
 9 import org.elasticsearch.search.aggregations.AggregationBuilders;
10 import org.springframework.stereotype.Service;
11 
12 /**
13  * Created by ZhangDong on 2016/1/5.
14  */
15 @Service
16 public class SearchService2 {
17 
18     Log log = LogFactory.getLog(getClass());
19     public SearchResponse getSimpleSearchResponse( int page, int pagesize){
20 
21         BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();
22         mustQuery.must(QueryBuilders.matchAllQuery()); // 添加第1條must的條件 此處為匹配所有文檔
23 
24         mustQuery.must(QueryBuilders.matchPhraseQuery("title", "時間簡史"));//添加第2條must的條件 title字段必須為【時間簡史】
25         // ↑ 放入篩選條件(termQuery為精確搜索,大小寫敏感且不支持*) 實驗發現matchPhraseQuery可對中文精確匹配term
26 
27         mustQuery.must(QueryBuilders.matchQuery("auther", "霍金")); // 添加第3條must的條件
28 
29         QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("物理")//.escape(true)//escape 轉義 設為true,避免搜索[]、結尾為!的關鍵詞時異常 但無法搜索*
30                 .defaultOperator(QueryStringQueryBuilder.Operator.AND);//不同關鍵詞之間使用and關系
31         mustQuery.must(queryBuilder);//添加第4條must的條件 關鍵詞全文搜索篩選條件
32 
33         SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch("index name").setTypes("type name")
34                 .setQuery(mustQuery)
35                 .addHighlightedField("*")/*星號表示在所有字段都高亮*/.setHighlighterRequireFieldMatch(false)//配置高亮顯示搜索結果
36                 .setHighlighterPreTags("<高亮前綴標簽>").setHighlighterPostTags("<高亮后綴標簽>");//配置高亮顯示搜索結果
37 
38                 searchRequestBuilder = searchRequestBuilder.addAggregation(AggregationBuilders.terms("agg1(聚類返回時根據此key獲取聚類結果)")
39                         .size(1000)/*返回1000條聚類結果*/.field("要在文檔中聚類的字段,如果是嵌套的則用點連接父子字段,如【person.company.name】"));
40 
41         SearchResponse searchResponse = searchRequestBuilder.setFrom((page - 1) * pagesize)//分頁起始位置(跳過開始的n個)
42                 .setSize(pagesize)//本次返回的文檔數量
43                 .execute().actionGet();//執行搜索
44 
45         log.info("response="+searchResponse);
46         return searchResponse;
47     }
48 }

 

4、ES中使用delete-by-query插件,DSL方式按條件刪除數據的方法:

ES2.1中,默認的文檔刪除方式只有按ID刪除方法:

curl -XDELETE 'localhost:9200/customer/external/2?pretty'

(參考:Deleting Documents | Elasticsearch Reference [2.1] | Elastic https://www.elastic.co/guide/en/elasticsearch/reference/2.1/_deleting_documents.html

按條件刪除需要安裝delete-by-query插件,在線安裝方式可使用命令

plugin install delete-by-query

隨后會從https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/delete-by-query/2.1.0/delete-by-query-2.1.0.zip處下載插件安裝包。但是本人使用的某個ES環境是離線的,需要手動下載上述URL對應的ZIP,放置於elasticsearch-2.1.0文件夾下,與bin、config等文件夾同級,同時還要下載 https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/delete-by-query/2.1.0/delete-by-query-2.1.0.zip.md5 校驗文件放於同一位置(XXX.sha1應該也可以),使用以下命令離線安裝:

bin/plugin install file:delete-by-query-2.1.0.zip

其中delete-by-query-2.1.0.zip是相對路徑,絕對路徑應該也可以,隨后便安裝成功了。

安裝成功后查看,發現其實就是解壓delete-by-query-2.1.0.zip的內容放置於elasticsearch-2.1.0/plugins/delete-by-query 文件夾下,猜測手動解壓也可以使用。

注意:如果是ES集群,需要對每個節點都安裝這個插件,而且每個節點安裝后要重啟ES。

使用DSL方式按條件刪除文檔的方法:

DELETE方式,請求
http://localhost:9200/index_name/type_name/_query
http payload內容:
{
  "query":{
    "match_all":{}
  }
}
上述query為匹配全部文檔。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM