使用Java操作Elasticsearch(Elasticsearch的java api使用)


1、Elasticsearch是基於Lucene開發的一個分布式全文檢索框架,向Elasticsearch中存儲和從Elasticsearch中查詢,格式是json。

索引index,相當於數據庫中的database。

類型type相當於數據庫中的table。

主鍵id相當於數據庫中記錄的主鍵,是唯一的。

向Elasticsearch中存儲數據,其實就是向es中的index下面的type中存儲json類型的數據。

2、Elasticsearch是RestFul風格的api,通過http的請求形式(注意,參數是url拼接還是請求的json形式哦),發送請求,對Elasticsearch進行操作。
查詢,請求方式應該是get。刪除,請求方式應該是delete。添加,請求方式應該是put/post。修改,請求方式應該是put/post。
RESTFul接口url的格式:http://ip:port/<index>/<type>/<[id]>。其中index、type是必須提供的。id是可以選擇的,不提供es會自動生成,index、type將信息進行分層,利於管理。

3、如何使用java連接Elasticsearch。由於使用的是maven項目,pom.xml的依賴如下所示:

 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 4     http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6     <groupId>com.bie</groupId>
 7     <artifactId>elasticsearch-hello</artifactId>
 8     <version>0.0.1-SNAPSHOT</version>
 9 
10     <properties>
11         <maven.compiler.source>1.8</maven.compiler.source>
12         <maven.compiler.target>1.8</maven.compiler.target>
13         <encoding>UTF-8</encoding>
14     </properties>
15 
16     <dependencies>
17         <!-- elasticsearch的客戶端 -->
18         <dependency>
19             <groupId>org.elasticsearch.client</groupId>
20             <artifactId>transport</artifactId>
21             <version>5.4.3</version>
22         </dependency>
23         <!-- elasticsearch依賴2.x的log4j -->
24         <dependency>
25             <groupId>org.apache.logging.log4j</groupId>
26             <artifactId>log4j-api</artifactId>
27             <version>2.8.2</version>
28         </dependency>
29         <dependency>
30             <groupId>org.apache.logging.log4j</groupId>
31             <artifactId>log4j-core</artifactId>
32             <version>2.8.2</version>
33         </dependency>
34         <!-- junit單元測試 -->
35         <dependency>
36             <groupId>junit</groupId>
37             <artifactId>junit</artifactId>
38             <version>4.12</version>
39         </dependency>
40     </dependencies>
41 
42 
43 </project>

使用查詢的方式,先簡單測試一下是否連通es集群,和對比查詢的數據是否一致。

 1 package com.bie.elasticsearch;
 2 
 3 import java.net.InetAddress;
 4 
 5 import org.elasticsearch.action.get.GetResponse;
 6 import org.elasticsearch.client.transport.TransportClient;
 7 import org.elasticsearch.common.settings.Settings;
 8 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 9 import org.elasticsearch.transport.client.PreBuiltTransportClient;
10 
11 /**
12  * 
13  * @author biehl
14  *
15  */
16 public class HelloElasticsearch {
17 
18     public static void main(String[] args) {
19         try {
20             // 設置集群名稱biehl01,Settings設置es的集群名稱,使用的設計模式,鏈式設計模式、build設計模式。
21             Settings settings = Settings.builder().put("cluster.name", "biehl01").build();
22             // 讀取es集群中的數據,創建client。
23             @SuppressWarnings("resource")
24             TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses(
25                     // 用java訪問ES用的端口是9300。es的9200是restful的請求端口號
26                     // 由於我使用的是偽集群,所以就配置了一台機器,如果是集群方式,將競選主節點的加進來即可。
27                     // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
28                     // 9300),
29                     // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
30                     // 9300),
31                     new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300));
32             // 搜索數據(.actionGet()方法是同步的,沒有返回就等待)
33             // 方式是先去索引里面查詢出索引數據,再去文檔里面查詢出數據。
34             GetResponse response = client.prepareGet("news", "fulltext", "2").execute().actionGet();
35             // 輸出結果
36             System.out.println(response);
37             // 關閉client
38             client.close();
39         } catch (Exception e) {
40             e.printStackTrace();
41         }
42 
43     }
44 
45 }

查詢的結果如下所示:

4、如何使用java api創建索引Index、類型Type、以及指定字段,是否創建索引,是否存儲,是否即分詞,又建立索引(analyzed)、是否建索引不分詞(not_analyzed)等等。

  1 package com.bie.elasticsearch;
  2 
  3 import java.io.IOException;
  4 import java.net.InetAddress;
  5 import java.util.HashMap;
  6 
  7 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
  8 import org.elasticsearch.client.AdminClient;
  9 import org.elasticsearch.client.IndicesAdminClient;
 10 import org.elasticsearch.client.transport.TransportClient;
 11 import org.elasticsearch.common.settings.Settings;
 12 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 13 import org.elasticsearch.common.xcontent.XContentBuilder;
 14 import org.elasticsearch.common.xcontent.XContentFactory;
 15 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 16 import org.junit.Before;
 17 import org.junit.Test;
 18 
 19 /**
 20  * 
 21  * @author biehl
 22  *
 23  */
 24 public class AdminAPI {
 25 
 26     private TransportClient client = null;
 27 
 28     // 在所有的測試方法之前執行
 29     @SuppressWarnings("resource")
 30     @Before
 31     public void init() throws Exception {
 32         // 設置集群名稱biehl01
 33         Settings settings = Settings.builder().put("cluster.name", "biehl01")
 34                 // 自動感知的功能(可以通過當前指定的節點獲取所有es節點的信息)
 35                 .put("client.transport.sniff", true).build();
 36         // 創建client
 37         client = new PreBuiltTransportClient(settings).addTransportAddresses(
 38                 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 39                 // 9300),
 40                 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 41                 // 9300),
 42                 // 建議指定2個及其以上的節點。
 43                 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300));
 44     }
 45 
 46     /**
 47      * 
 48      * AdminClient創建索引,並配置一些參數,用來指定一些映射關系等等
 49      * 
 50      * 這里創建一個索引Index,並且指定分區、副本的數量
 51      * 
 52      */
 53     @Test
 54     public void createIndexWithSettings() {
 55         // 獲取Admin的API
 56         AdminClient admin = client.admin();
 57         // 使用Admin API對索引進行操作
 58         IndicesAdminClient indices = admin.indices();
 59         // 准備創建索引
 60         indices.prepareCreate("food")
 61                 // 配置索引參數
 62                 .setSettings(
 63                         // 參數配置器
 64                         Settings.builder()// 指定索引分區的數量。shards分區
 65                                 .put("index.number_of_shards", 5)
 66                                 // 指定索引副本的數量(注意:不包括本身,如果設置數據存儲副本為1,實際上數據存儲了2份)
 67                                 // replicas副本
 68                                 .put("index.number_of_replicas", 1))
 69                 // 真正執行
 70                 .get();
 71     }
 72 
 73     /**
 74      * 你可以通過dynamic設置來控制這一行為,它能夠接受以下的選項: true:默認值。
 75      * 
 76      * 動態添加字段 false:忽略新字段
 77      * 
 78      * strict:如果碰到陌生字段,拋出異常
 79      * 
 80      * 給索引添加mapping信息(給表添加schema信息)
 81      * 
 82      * @throws IOException
 83      */
 84     @Test
 85     public void elasticsearchSettingsMappings() throws IOException {
 86         // 1:settings
 87         HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
 88         // shards分區的數量4
 89         settings_map.put("number_of_shards", 4);
 90         // 副本的數量1
 91         settings_map.put("number_of_replicas", 1);
 92 
 93         // 2:mappings(映射、schema)
 94         // field("dynamic", "true")含義是動態字段
 95         XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("dynamic", "true")
 96                 // 設置type中的屬性
 97                 .startObject("properties")
 98                 // id屬性
 99                 .startObject("id")
100                 // 類型是integer
101                 .field("type", "integer")
102                 // 不分詞,但是建索引
103                 .field("index", "not_analyzed")
104                 // 在文檔中存儲
105                 .field("store", "yes").endObject()
106                 // name屬性
107                 .startObject("name")
108                 // string類型
109                 .field("type", "string")
110                 // 在文檔中存儲
111                 .field("store", "yes")
112                 // 建立索引
113                 .field("index", "analyzed")
114                 // 使用ik_smart進行分詞
115                 .field("analyzer", "ik_smart").endObject().endObject().endObject();
116 
117         CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("computer");
118         // 管理索引(user_info)然后關聯type(user)
119         prepareCreate.setSettings(settings_map).addMapping("xiaomi", builder).get();
120     }
121 
122     /**
123      * index這個屬性,no代表不建索引
124      * 
125      * not_analyzed,建索引不分詞
126      * 
127      * analyzed 即分詞,又建立索引
128      * 
129      * expected [no],[not_analyzed] or [analyzed]。即可以選擇三者任意一個值
130      * 
131      * @throws IOException
132      */
133 
134     @Test
135     public void elasticsearchSettingsPlayerMappings() throws IOException {
136         // 1:settings
137         HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
138         // 分區的數量4
139         settings_map.put("number_of_shards", 4);
140         // 副本的數量1
141         settings_map.put("number_of_replicas", 1);
142 
143         // 2:mappings
144         XContentBuilder builder = XContentFactory.jsonBuilder().startObject()//
145                 .field("dynamic", "true").startObject("properties")
146                 // 在文檔中存儲、
147                 .startObject("id").field("type", "integer").field("store", "yes").endObject()
148                 // 不分詞,但是建索引、
149                 .startObject("name").field("type", "string").field("index", "not_analyzed").endObject()
150                 //
151                 .startObject("age").field("type", "integer").endObject()
152                 //
153                 .startObject("salary").field("type", "integer").endObject()
154                 // 不分詞,但是建索引、
155                 .startObject("team").field("type", "string").field("index", "not_analyzed").endObject()
156                 // 不分詞,但是建索引、
157                 .startObject("position").field("type", "string").field("index", "not_analyzed").endObject()
158                 // 即分詞,又建立索引、
159                 .startObject("description").field("type", "string").field("store", "no").field("index", "analyzed")
160                 .field("analyzer", "ik_smart").endObject()
161                 // 即分詞,又建立索引、在文檔中存儲、
162                 .startObject("addr").field("type", "string").field("store", "yes").field("index", "analyzed")
163                 .field("analyzer", "ik_smart").endObject()
164 
165                 .endObject()
166 
167                 .endObject();
168 
169         CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player");
170         prepareCreate.setSettings(settings_map).addMapping("basketball", builder).get();
171     }
172 }

5、使用java api操作Elasticsearch的增刪改查以及復雜查詢(聚合查詢,可以進行分組統計數量,分組統計最大值,分組統計平均值,等等統計)。

  1 package com.bie.elasticsearch;
  2 
  3 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
  4 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
  5 
  6 import java.io.IOException;
  7 import java.net.InetAddress;
  8 import java.util.Date;
  9 import java.util.Iterator;
 10 import java.util.Map;
 11 import java.util.Set;
 12 
 13 import org.elasticsearch.action.ActionListener;
 14 import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
 15 import org.elasticsearch.action.delete.DeleteResponse;
 16 import org.elasticsearch.action.get.GetResponse;
 17 import org.elasticsearch.action.get.MultiGetItemResponse;
 18 import org.elasticsearch.action.get.MultiGetResponse;
 19 import org.elasticsearch.action.index.IndexResponse;
 20 import org.elasticsearch.action.search.SearchRequestBuilder;
 21 import org.elasticsearch.action.search.SearchResponse;
 22 import org.elasticsearch.action.update.UpdateRequest;
 23 import org.elasticsearch.action.update.UpdateResponse;
 24 import org.elasticsearch.client.transport.TransportClient;
 25 import org.elasticsearch.common.settings.Settings;
 26 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 27 import org.elasticsearch.index.query.QueryBuilder;
 28 import org.elasticsearch.index.query.QueryBuilders;
 29 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 30 import org.elasticsearch.search.aggregations.Aggregation;
 31 import org.elasticsearch.search.aggregations.AggregationBuilders;
 32 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 33 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 34 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 35 import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
 36 import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
 37 import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
 38 import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
 39 import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
 40 import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
 41 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 42 import org.junit.Before;
 43 import org.junit.Test;
 44 
 45 /**
 46  * 
 47  * @author biehl
 48  *
 49  */
 50 public class ElasticsearchCRUD {
 51 
 52     private TransportClient client = null;
 53 
 54     @SuppressWarnings("resource")
 55     @Before
 56     public void init() throws Exception {
 57         // 設置集群名稱biehl01
 58         Settings settings = Settings.builder().put("cluster.name", "biehl01")
 59                 // 自動感知的功能(可以通過當前指定的節點獲取所有es節點的信息)
 60                 .put("client.transport.sniff", true).build();
 61         // 創建client
 62         client = new PreBuiltTransportClient(settings).addTransportAddresses(
 63                 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 64                 // 9300),
 65                 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 66                 // 9300),
 67                 // 建議指定2個及其以上的節點。
 68                 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300));
 69     }
 70 
 71     /**
 72      * 創建一個Index索引、Type類型、以及id。
 73      * 
 74      * 然后插入類型里面的數據。
 75      * 
 76      * @throws IOException
 77      */
 78     @Test
 79     public void elasticsearchCreate() throws IOException {
 80         IndexResponse response = client.prepareIndex("people", "student", "3")
 81                 .setSource(jsonBuilder().startObject().field("username", "王五五").field("sex", "")
 82                         .field("birthday", new Date()).field("age", 21).field("message", "trying out Elasticsearch")
 83                         .endObject())
 84                 .get();
 85         System.out.println(response.toString());
 86     }
 87 
 88     /**
 89      * 查找一條索引Index里面的類型Type里面的id的所有信息
 90      * 
 91      * @throws IOException
 92      */
 93     @Test
 94     public void elasticsearchGet() throws IOException {
 95         GetResponse response = client.prepareGet("people", "student", "1").get();
 96         System.out.println(response.getSourceAsString());
 97     }
 98 
 99     /**
100      * 查找多條
101      * 
102      * 索引Index里面的類型Type里面的多個id的所有信息
103      * 
104      * @throws IOException
105      */
106     @Test
107     public void elasticsearchMultiGet() throws IOException {
108         // 查詢出多個索引Index多個類型Type的多個id的所有信息
109         MultiGetResponse multiGetItemResponses = client.prepareMultiGet().add("people", "student", "1")
110                 .add("people", "student", "2", "3").add("people", "teacher", "1").add("news", "fulltext", "1").get();
111         // 將查詢出的結果遍歷輸出
112         for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
113             // 將每一個查詢出的結果遍歷輸出
114             GetResponse response = itemResponse.getResponse();
115             // 判斷如果存在就進行遍歷輸出
116             if (response.isExists()) {
117                 String json = response.getSourceAsString();
118                 System.out.println(json);
119             }
120         }
121     }
122 
123     /**
124      * 修改指定的索引Index里面的類型Type的id的信息
125      * 
126      * @throws Exception
127      */
128     @Test
129     public void elasticsearchUpdate() throws Exception {
130         // 創建一個更新的請求對象
131         UpdateRequest updateRequest = new UpdateRequest();
132         // 指定索引Index
133         updateRequest.index("people");
134         // 指定類型Type
135         updateRequest.type("student");
136         // 指定id的值
137         updateRequest.id("3");
138         // 設置修改的字段信息
139         updateRequest.doc(jsonBuilder().startObject().field("username", "王五五").endObject());
140         // 開始進行修改,並且返回響應信息
141         UpdateResponse updateResponse = client.update(updateRequest).get();
142         // 打印輸出響應的信息
143         System.out.println(updateResponse.toString());
144     }
145 
146     /**
147      * 刪除指定的索引Index里面的類型Type的id的信息
148      */
149     @Test
150     public void elasticsearchDelete() {
151         // 指定刪除的id信息,並且給出響應結果
152         // prepareDelete(String index, String type, String id);
153         DeleteResponse response = client.prepareDelete("people", "student", "4").get();
154         // 打印輸出的響應信息
155         System.out.println(response);
156     }
157 
158     /**
159      * 根據查詢條件進行刪除數據
160      * 
161      * 
162      */
163     @Test
164     public void elasticsearchDeleteByQuery() {
165         BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
166                 // 指定查詢條件,matchQuery是name的值text里面包括了這個內容就進行刪除。默認使用標准分詞器。
167                 .filter(QueryBuilders.matchQuery("username", "王五五"))
168                 // 指定索引名稱
169                 .source("people").get();
170         // 獲取到刪除的個數
171         long deleted = response.getDeleted();
172         // 打印輸出刪除的個數
173         System.out.println(deleted);
174     }
175 
176     /**
177      * 異步刪除
178      * 
179      * 監聽,如果真正刪除以后進行回調,打印輸出刪除確認的消息。
180      */
181     @Test
182     public void elasticsearchDeleteByQueryAsync() {
183         DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(QueryBuilders.matchQuery("sex", ""))
184                 .source("people").execute(new ActionListener<BulkByScrollResponse>() {
185 
186                     // 刪除以后的方法回調
187                     @Override
188                     public void onResponse(BulkByScrollResponse response) {
189                         // 返回刪除的個數
190                         long deleted = response.getDeleted();
191                         System.out.println("數據刪除完畢!");
192                         // 打印刪除的個數
193                         System.out.println("數據刪除的個數: " + deleted);
194                     }
195 
196                     @Override
197                     public void onFailure(Exception e) {
198                         // 失敗打印異常信息
199                         e.printStackTrace();
200                     }
201                 });
202 
203         // 先打印輸出,正常執行完畢。再執行異步監聽刪除數據。
204         try {
205             System.out.println("異步刪除操作!");
206             // 休眠10秒鍾,避免主線程里面結束,子線程無法進行結果輸出
207             Thread.sleep(10000);
208         } catch (Exception e) {
209             e.printStackTrace();
210         }
211     }
212 
213     /**
214      * 
215      * 按照范圍進行查找。
216      * 
217      */
218     @Test
219     public void elasticsearchRange() {
220         // includeLower(true).includeUpper(false)含義是包含前面,不包含后面的
221         // [21, 24)
222         QueryBuilder qb = rangeQuery("age").from(21).to(24).includeLower(true).includeUpper(false);
223         // 將查詢條件傳遞進去,並將查詢結果進行返回。
224         SearchResponse response = client.prepareSearch("people").setQuery(qb).get();
225         System.out.println(response);
226     }
227 
228     /**
229      * 
230      * 向指定索引index里面的類型Type的id的信息
231      * 
232      * @throws IOException
233      */
234     @Test
235     public void elasticsearchAddPlayer() throws IOException {
236         //
237         IndexResponse response = client.prepareIndex("player", "basketball", "4")
238 
239                 .setSource(jsonBuilder().startObject()
240 
241                         .field("name", "安其拉")
242 
243                         .field("age", 28)
244 
245                         .field("salary", 99000)
246 
247                         .field("team", "啦啦隊 team")
248 
249                         .field("position", "打中鋒")
250 
251                         .field("description", "跪族藍孩")
252 
253                         .endObject())
254                 .get();
255 
256         System.out.println(response);
257     }
258 
259     /**
260      * 
261      *
262      * select team, count(*) as team_count from player group by team;
263      * 
264      * team_counts是別名稱。
265      */
266     @Test
267     public void elasticsearchAgg1() {
268         // 指定索引和type
269         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
270         // 按team分組然后聚合,但是並沒有指定聚合函數。
271         // team_count是別名稱
272         TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_count").field("team");
273         // 添加聚合器
274         builder.addAggregation(teamAgg);
275         // 觸發
276         SearchResponse response = builder.execute().actionGet();
277         // System.out.println(response);
278         // 將返回的結果放入到一個map中
279         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
280         // 遍歷打印輸出
281         Set<String> keys = aggMap.keySet();
282         for (String key : keys) {
283             System.out.println("key: " + key);
284         }
285 
286         System.out.println("");
287 
288         // //取出聚合屬性
289         StringTerms terms = (StringTerms) aggMap.get("team_count");
290 
291         // //依次迭代出分組聚合數據
292         for (Terms.Bucket bucket : terms.getBuckets()) {
293             // 分組的名字
294             String team = (String) bucket.getKey();
295             // count,分組后一個組有多少數據
296             long count = bucket.getDocCount();
297             System.out.println(team + ": " + count);
298         }
299 
300         System.out.println("");
301 
302         // 使用Iterator進行遍歷迭代
303         Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
304         while (teamBucketIt.hasNext()) {
305             Terms.Bucket bucket = teamBucketIt.next();
306             // 獲取到分組后每組的組名稱
307             String team = (String) bucket.getKey();
308             // 獲取到分組后的每組數量
309             long count = bucket.getDocCount();
310             // 打印輸出
311             System.out.println(team + ": " + count);
312         }
313     }
314 
315     /**
316      * 
317      * select
318      * 
319      * team, position, count(*) as pos_count
320      * 
321      * from
322      * 
323      * player
324      * 
325      * group by
326      * 
327      * team,position;
328      * 
329      * 
330      */
331     @Test
332     public void elasticsearchAgg2() {
333         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
334         // 指定別名和分組的字段
335         TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
336         TermsAggregationBuilder posAgg = AggregationBuilders.terms("pos_count").field("position");
337         // 添加兩個聚合構建器。先按照team分組,再按照position分組。
338         builder.addAggregation(teamAgg.subAggregation(posAgg));
339         // 執行查詢
340         SearchResponse response = builder.execute().actionGet();
341         // 將查詢結果放入map中
342         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
343         // 根據屬性名到map中查找
344         StringTerms teams = (StringTerms) aggMap.get("team_name");
345         // 循環查找結果
346         for (Terms.Bucket teamBucket : teams.getBuckets()) {
347             // 先按球隊進行分組
348             String team = (String) teamBucket.getKey();
349             Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
350             StringTerms positions = (StringTerms) subAggMap.get("pos_count");
351             // 因為一個球隊有很多位置,那么還要依次拿出位置信息
352             for (Terms.Bucket posBucket : positions.getBuckets()) {
353                 // 拿到位置的名字
354                 String pos = (String) posBucket.getKey();
355                 // 拿出該位置的數量
356                 long docCount = posBucket.getDocCount();
357                 // 打印球隊,位置,人數
358                 System.out.println(team + ": " + pos + ": " + docCount);
359             }
360         }
361 
362     }
363 
364     /**
365      * select team, max(age) as max_age from player group by team;
366      */
367     @Test
368     public void elasticsearchAgg3() {
369         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
370         // 指定安球隊進行分組
371         TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
372         // 指定分組求最大值
373         MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
374         // 分組后求最大值
375         builder.addAggregation(teamAgg.subAggregation(maxAgg));
376         // 查詢
377         SearchResponse response = builder.execute().actionGet();
378         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
379         // 根據team屬性,獲取map中的內容
380         StringTerms teams = (StringTerms) aggMap.get("team_name");
381         for (Terms.Bucket teamBucket : teams.getBuckets()) {
382             // 分組的屬性名
383             String team = (String) teamBucket.getKey();
384             // 在將聚合后取最大值的內容取出來放到map中
385             Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
386             // 取分組后的最大值
387             InternalMax ages = (InternalMax) subAggMap.get("max_age");
388             // 獲取到年齡的值
389             double max = ages.getValue();
390             // 打印輸出值
391             System.out.println(team + ": " + max);
392         }
393     }
394 
395     /**
396      * select team, avg(age) as avg_age, sum(salary) as total_salary from player
397      * group by team;
398      */
399     @Test
400     public void elasticsearchAgg4() {
401         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
402         // 指定分組字段
403         TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
404         // 指定聚合函數是求平均數據
405         AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
406         // 指定另外一個聚合函數是求和
407         SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
408         // 分組的聚合器關聯了兩個聚合函數
409         builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
410         // 查詢
411         SearchResponse response = builder.execute().actionGet();
412         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
413         // 按分組的名字取出數據
414         StringTerms teams = (StringTerms) aggMap.get("team_name");
415         for (Terms.Bucket teamBucket : teams.getBuckets()) {
416             // 獲取球隊名字
417             String team = (String) teamBucket.getKey();
418             Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
419             // 根據別名取出平均年齡
420             InternalAvg avgAge = (InternalAvg) subAggMap.get("avg_age");
421             // 根據別名取出薪水總和
422             InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary");
423             double avgAgeValue = avgAge.getValue();
424             double totalSalaryValue = totalSalary.getValue();
425             System.out.println(team + ": " + avgAgeValue + ": " + totalSalaryValue);
426         }
427     }
428 
429     /**
430      * select team, sum(salary) as total_salary from player group by team order by
431      * total_salary desc;
432      */
433     @Test
434     public void elasticsearchAgg5() {
435         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
436         // 按team進行分組,然后指定排序規則
437         TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team")
438                 .order(Terms.Order.aggregation("total_salary ", true));
439         // 指定一個聚合函數是求和
440         SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
441         // 添加兩個聚合構建器。先按照team分組,再按照salary求和。
442         builder.addAggregation(termsAgg.subAggregation(sumAgg));
443         // 查詢
444         SearchResponse response = builder.execute().actionGet();
445         // 將查詢結果放入map中
446         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
447         // 從查詢結果中獲取到team_name的信息
448         StringTerms teams = (StringTerms) aggMap.get("team_name");
449         // 開始遍歷獲取到的信息
450         for (Terms.Bucket teamBucket : teams.getBuckets()) {
451             // 獲取到key的值
452             String team = (String) teamBucket.getKey();
453             // 獲取到求和的值
454             Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
455             // 獲取到求和的值的信息
456             InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary");
457             // 獲取到求和的值
458             double totalSalaryValue = totalSalary.getValue();
459             // 打印輸出信息
460             System.out.println(team + " " + totalSalaryValue);
461         }
462     }
463 
464 }

執行效果,自己可以分別進行測試。由於測試都寫了說明,這里就不一一進行測試打印效果了。請自行練習使用即可。

作者:別先生

博客園:https://www.cnblogs.com/biehongli/

如果您想及時得到個人撰寫文章以及著作的消息推送,可以掃描上方二維碼,關注個人公眾號哦。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM