2.0之后ES的java api用法有了很大變化。在此記錄一些。
java應用程序連接ES集群,筆者使用的是TransportClient,獲取TransportClient的代碼設計為單例模式(見getClient方法)。同時包含了設置自動提交文檔的代碼。注釋比較詳細,不再贅述。
下方另有提交文檔、提交搜索請求的代碼。
1、連接ES集群代碼如下:
1 package elasticsearch; 2 3 import com.vividsolutions.jts.geom.GeometryFactory; 4 import com.vividsolutions.jts.geom.MultiPolygon; 5 import com.vividsolutions.jts.geom.Polygon; 6 import com.vividsolutions.jts.io.ParseException; 7 import com.vividsolutions.jts.io.WKTReader; 8 import org.apache.commons.logging.Log; 9 import org.apache.commons.logging.LogFactory; 10 import org.elasticsearch.action.bulk.BulkProcessor; 11 import org.elasticsearch.action.bulk.BulkRequest; 12 import org.elasticsearch.action.bulk.BulkResponse; 13 import org.elasticsearch.client.transport.TransportClient; 14 import org.elasticsearch.common.settings.Settings; 15 import org.elasticsearch.common.transport.InetSocketTransportAddress; 16 import org.elasticsearch.common.unit.ByteSizeUnit; 17 import org.elasticsearch.common.unit.ByteSizeValue; 18 import org.elasticsearch.common.unit.TimeValue; 19 20 import java.net.InetAddress; 21 import java.util.Date; 22 23 /** 24 * Created by ZhangDong on 2015/12/25. 25 */ 26 public class EsClient { 27 static Log log = LogFactory.getLog(EsClient.class); 28 29 // 用於提供單例的TransportClient BulkProcessor 30 static public TransportClient tclient = null; 31 static BulkProcessor staticBulkProcessor = null; 32 33 //【獲取TransportClient 的方法】 34 public static TransportClient getClient() { 35 try { 36 if (tclient == null) { 37 String EsHosts = "10.10.2.1:9300,10.10.2.2:9300"; 38 Settings settings = Settings.settingsBuilder() 39 .put("cluster.name", "wshare_es")//設置集群名稱 40 .put("tclient.transport.sniff", true).build();//自動嗅探整個集群的狀態,把集群中其它機器的ip地址加到客戶端中 41 42 tclient = TransportClient.builder().settings(settings).build(); 43 String[] nodes = EsHosts.split(","); 44 for (String node : nodes) { 45 if (node.length() > 0) {//跳過為空的node(當開頭、結尾有逗號或多個連續逗號時會出現空node) 46 String[] hostPort = node.split(":"); 47 tclient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); 48 49 } 50 } 51 }//if 52 } catch (Exception e) { 53 e.printStackTrace(); 54 } 55 return tclient; 56 } 57 //【設置自動提交文檔】 58 public static BulkProcessor getBulkProcessor() { 59 //自動批量提交方式 60 if (staticBulkProcessor == null) { 61 try { 62 staticBulkProcessor = BulkProcessor.builder(getClient(), 63 new BulkProcessor.Listener() { 64 @Override 65 public void beforeBulk(long executionId, BulkRequest request) { 66 //提交前調用 67 // System.out.println(new Date().toString() + " before"); 68 } 69 70 @Override 71 public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 72 //提交結束后調用(無論成功或失敗) 73 // System.out.println(new Date().toString() + " response.hasFailures=" + response.hasFailures()); 74 log.info( "提交" + response.getItems().length + "個文檔,用時" 75 + response.getTookInMillis() + "MS" + (response.hasFailures() ? " 有文檔提交失敗!" : "")); 76 // response.hasFailures();//是否有提交失敗 77 } 78 79 @Override 80 public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 81 //提交結束且失敗時調用 82 log.error( " 有文檔提交失敗!after failure=" + failure); 83 } 84 }) 85 86 .setBulkActions(1000)//文檔數量達到1000時提交 87 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//總文檔體積達到5MB時提交 // 88 .setFlushInterval(TimeValue.timeValueSeconds(5))//每5S提交一次(無論文檔數量、體積是否達到閾值) 89 .setConcurrentRequests(1)//加1后為可並行的提交請求數,即設為0代表只可1個請求並行,設為1為2個並行 90 .build(); 91 // staticBulkProcessor.awaitClose(10, TimeUnit.MINUTES);//關閉,如有未提交完成的文檔則等待完成,最多等待10分鍾 92 } catch (Exception e) {//關閉時拋出異常 93 e.printStackTrace(); 94 } 95 }//if 96 97 98 99 100 101 return staticBulkProcessor; 102 } 103 }
2、插入文檔的代碼(自動批量提交方式,注釋中另有手動批量提交、單個文檔提交的方式):
1 package elasticsearch; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.elasticsearch.action.index.IndexRequest; 6 7 8 /** 9 * Created by ZhangDong on 2015/12/25. 10 */ 11 public class EsInsert2 { 12 static Log log = LogFactory.getLog(EsInsert2.class); 13 public static void add(String json) { 14 try { //EsClient.getBulkProcessor()是位於上方EsClient類中的方法 15 EsClient.getBulkProcessor().add(new IndexRequest("設置的index name", "設置的type name","要插入的文檔的ID").source(json));//添加文檔,以便自動提交 16 } catch (Exception e) { 17 log.error("add文檔時出現異常:e=" + e + " json=" + json); 18 } 19 } 20 } 21 //手動 批量更新 22 // BulkRequestBuilder bulkRequest = tclient.prepareBulk(); 23 // for(int i=500;i<1000;i++){ 24 // //業務對象 25 // String json = ""; 26 // IndexRequestBuilder indexRequest = tclient.prepareIndex("twitter", "tweet") 27 // //指定不重復的ID 28 // .setSource(json).setId(String.valueOf(i)); 29 // //添加到builder中 30 // bulkRequest.add(indexRequest); 31 // } 32 // 33 // BulkResponse bulkResponse = bulkRequest.execute().actionGet(); 34 // if (bulkResponse.hasFailures()) { 35 // // process failures by iterating through each bulk response item 36 // System.out.println(bulkResponse.buildFailureMessage()); 37 // } 38 39 //單個文檔提交 40 // String json = "{\"relationship\":{},\"tags\":[\"camera\",\"video\"]}"; 41 // IndexResponse response = getClient().prepareIndex("dots", "scan", JSON.parseObject(json).getString("rid")).setSource(json).get(); 42 // return response.toString();
3、進行搜索的代碼,其中有適用於復雜搜索邏輯的BoolQuery用法,以及關鍵詞高亮的配置、在某個字段精確搜索、全文搜索、匹配全部文檔、搜索同時返回聚類信息的用法:
1 package service; 2 3 import elasticsearch.EsClient; 4 import org.apache.commons.logging.Log; 5 import org.apache.commons.logging.LogFactory; 6 import org.elasticsearch.action.search.SearchRequestBuilder; 7 import org.elasticsearch.action.search.SearchResponse; 8 import org.elasticsearch.index.query.*; 9 import org.elasticsearch.search.aggregations.AggregationBuilders; 10 import org.springframework.stereotype.Service; 11 12 /** 13 * Created by ZhangDong on 2016/1/5. 14 */ 15 @Service 16 public class SearchService2 { 17 18 Log log = LogFactory.getLog(getClass()); 19 public SearchResponse getSimpleSearchResponse( int page, int pagesize){ 20 21 BoolQueryBuilder mustQuery = QueryBuilders.boolQuery(); 22 mustQuery.must(QueryBuilders.matchAllQuery()); // 添加第1條must的條件 此處為匹配所有文檔 23 24 mustQuery.must(QueryBuilders.matchPhraseQuery("title", "時間簡史"));//添加第2條must的條件 title字段必須為【時間簡史】 25 // ↑ 放入篩選條件(termQuery為精確搜索,大小寫敏感且不支持*) 實驗發現matchPhraseQuery可對中文精確匹配term 26 27 mustQuery.must(QueryBuilders.matchQuery("auther", "霍金")); // 添加第3條must的條件 28 29 QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("物理")//.escape(true)//escape 轉義 設為true,避免搜索[]、結尾為!的關鍵詞時異常 但無法搜索* 30 .defaultOperator(QueryStringQueryBuilder.Operator.AND);//不同關鍵詞之間使用and關系 31 mustQuery.must(queryBuilder);//添加第4條must的條件 關鍵詞全文搜索篩選條件 32 33 SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch("index name").setTypes("type name") 34 .setQuery(mustQuery) 35 .addHighlightedField("*")/*星號表示在所有字段都高亮*/.setHighlighterRequireFieldMatch(false)//配置高亮顯示搜索結果 36 .setHighlighterPreTags("<高亮前綴標簽>").setHighlighterPostTags("<高亮后綴標簽>");//配置高亮顯示搜索結果 37 38 searchRequestBuilder = searchRequestBuilder.addAggregation(AggregationBuilders.terms("agg1(聚類返回時根據此key獲取聚類結果)") 39 .size(1000)/*返回1000條聚類結果*/.field("要在文檔中聚類的字段,如果是嵌套的則用點連接父子字段,如【person.company.name】")); 40 41 SearchResponse searchResponse = searchRequestBuilder.setFrom((page - 1) * pagesize)//分頁起始位置(跳過開始的n個) 42 .setSize(pagesize)//本次返回的文檔數量 43 .execute().actionGet();//執行搜索 44 45 log.info("response="+searchResponse); 46 return searchResponse; 47 } 48 }
4、ES中使用delete-by-query插件,DSL方式按條件刪除數據的方法:
ES2.1中,默認的文檔刪除方式只有按ID刪除方法:
curl -XDELETE 'localhost:9200/customer/external/2?pretty'
按條件刪除需要安裝delete-by-query插件,在線安裝方式可使用命令
plugin install delete-by-query
隨后會從https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/delete-by-query/2.1.0/delete-by-query-2.1.0.zip處下載插件安裝包。但是本人使用的某個ES環境是離線的,需要手動下載上述URL對應的ZIP,放置於elasticsearch-2.1.0文件夾下,與bin、config等文件夾同級,同時還要下載 https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/delete-by-query/2.1.0/delete-by-query-2.1.0.zip.md5 校驗文件放於同一位置(XXX.sha1應該也可以),使用以下命令離線安裝:
bin/plugin install file:delete-by-query-2.1.0.zip
其中delete-by-query-2.1.0.zip是相對路徑,絕對路徑應該也可以,隨后便安裝成功了。
安裝成功后查看,發現其實就是解壓delete-by-query-2.1.0.zip的內容放置於elasticsearch-2.1.0/plugins/delete-by-query 文件夾下,猜測手動解壓也可以使用。
注意:如果是ES集群,需要對每個節點都安裝這個插件,而且每個節點安裝后要重啟ES。
使用DSL方式按條件刪除文檔的方法:
DELETE方式,請求 http://localhost:9200/index_name/type_name/_query http payload內容: { "query":{ "match_all":{} } } 上述query為匹配全部文檔。