1、Elasticsearch是基於Lucene開發的一個分布式全文檢索框架,向Elasticsearch中存儲和從Elasticsearch中查詢,格式是json。
索引index,相當於數據庫中的database。
類型type相當於數據庫中的table。
主鍵id相當於數據庫中記錄的主鍵,是唯一的。
向Elasticsearch中存儲數據,其實就是向es中的index下面的type中存儲json類型的數據。
2、Elasticsearch是RestFul風格的api,通過http的請求形式(注意,參數是url拼接還是請求的json形式哦),發送請求,對Elasticsearch進行操作。
查詢,請求方式應該是get。刪除,請求方式應該是delete。添加,請求方式應該是put/post。修改,請求方式應該是put/post。
RESTFul接口url的格式:http://ip:port/<index>/<type>/<[id]>。其中index、type是必須提供的。id是可以選擇的,不提供es會自動生成,index、type將信息進行分層,利於管理。
3、如何使用java連接Elasticsearch。由於使用的是maven項目,pom.xml的依賴如下所示:
1 <project xmlns="http://maven.apache.org/POM/4.0.0" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 4 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 <groupId>com.bie</groupId> 7 <artifactId>elasticsearch-hello</artifactId> 8 <version>0.0.1-SNAPSHOT</version> 9 10 <properties> 11 <maven.compiler.source>1.8</maven.compiler.source> 12 <maven.compiler.target>1.8</maven.compiler.target> 13 <encoding>UTF-8</encoding> 14 </properties> 15 16 <dependencies> 17 <!-- elasticsearch的客戶端 --> 18 <dependency> 19 <groupId>org.elasticsearch.client</groupId> 20 <artifactId>transport</artifactId> 21 <version>5.4.3</version> 22 </dependency> 23 <!-- elasticsearch依賴2.x的log4j --> 24 <dependency> 25 <groupId>org.apache.logging.log4j</groupId> 26 <artifactId>log4j-api</artifactId> 27 <version>2.8.2</version> 28 </dependency> 29 <dependency> 30 <groupId>org.apache.logging.log4j</groupId> 31 <artifactId>log4j-core</artifactId> 32 <version>2.8.2</version> 33 </dependency> 34 <!-- junit單元測試 --> 35 <dependency> 36 <groupId>junit</groupId> 37 <artifactId>junit</artifactId> 38 <version>4.12</version> 39 </dependency> 40 </dependencies> 41 42 43 </project>
使用查詢的方式,先簡單測試一下是否連通es集群,和對比查詢的數據是否一致。
1 package com.bie.elasticsearch; 2 3 import java.net.InetAddress; 4 5 import org.elasticsearch.action.get.GetResponse; 6 import org.elasticsearch.client.transport.TransportClient; 7 import org.elasticsearch.common.settings.Settings; 8 import org.elasticsearch.common.transport.InetSocketTransportAddress; 9 import org.elasticsearch.transport.client.PreBuiltTransportClient; 10 11 /** 12 * 13 * @author biehl 14 * 15 */ 16 public class HelloElasticsearch { 17 18 public static void main(String[] args) { 19 try { 20 // 設置集群名稱biehl01,Settings設置es的集群名稱,使用的設計模式,鏈式設計模式、build設計模式。 21 Settings settings = Settings.builder().put("cluster.name", "biehl01").build(); 22 // 讀取es集群中的數據,創建client。 23 @SuppressWarnings("resource") 24 TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses( 25 // 用java訪問ES用的端口是9300。es的9200是restful的請求端口號 26 // 由於我使用的是偽集群,所以就配置了一台機器,如果是集群方式,將競選主節點的加進來即可。 27 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 28 // 9300), 29 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 30 // 9300), 31 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300)); 32 // 搜索數據(.actionGet()方法是同步的,沒有返回就等待) 33 // 方式是先去索引里面查詢出索引數據,再去文檔里面查詢出數據。 34 GetResponse response = client.prepareGet("news", "fulltext", "2").execute().actionGet(); 35 // 輸出結果 36 System.out.println(response); 37 // 關閉client 38 client.close(); 39 } catch (Exception e) { 40 e.printStackTrace(); 41 } 42 43 } 44 45 }
查詢的結果如下所示:
4、如何使用java api創建索引Index、類型Type、以及指定字段,是否創建索引,是否存儲,是否即分詞,又建立索引(analyzed)、是否建索引不分詞(not_analyzed)等等。
1 package com.bie.elasticsearch; 2 3 import java.io.IOException; 4 import java.net.InetAddress; 5 import java.util.HashMap; 6 7 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; 8 import org.elasticsearch.client.AdminClient; 9 import org.elasticsearch.client.IndicesAdminClient; 10 import org.elasticsearch.client.transport.TransportClient; 11 import org.elasticsearch.common.settings.Settings; 12 import org.elasticsearch.common.transport.InetSocketTransportAddress; 13 import org.elasticsearch.common.xcontent.XContentBuilder; 14 import org.elasticsearch.common.xcontent.XContentFactory; 15 import org.elasticsearch.transport.client.PreBuiltTransportClient; 16 import org.junit.Before; 17 import org.junit.Test; 18 19 /** 20 * 21 * @author biehl 22 * 23 */ 24 public class AdminAPI { 25 26 private TransportClient client = null; 27 28 // 在所有的測試方法之前執行 29 @SuppressWarnings("resource") 30 @Before 31 public void init() throws Exception { 32 // 設置集群名稱biehl01 33 Settings settings = Settings.builder().put("cluster.name", "biehl01") 34 // 自動感知的功能(可以通過當前指定的節點獲取所有es節點的信息) 35 .put("client.transport.sniff", true).build(); 36 // 創建client 37 client = new PreBuiltTransportClient(settings).addTransportAddresses( 38 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 39 // 9300), 40 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 41 // 9300), 42 // 建議指定2個及其以上的節點。 43 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300)); 44 } 45 46 /** 47 * 48 * AdminClient創建索引,並配置一些參數,用來指定一些映射關系等等 49 * 50 * 這里創建一個索引Index,並且指定分區、副本的數量 51 * 52 */ 53 @Test 54 public void createIndexWithSettings() { 55 // 獲取Admin的API 56 AdminClient admin = client.admin(); 57 // 使用Admin API對索引進行操作 58 IndicesAdminClient indices = admin.indices(); 59 // 准備創建索引 60 indices.prepareCreate("food") 61 // 配置索引參數 62 .setSettings( 63 // 參數配置器 64 Settings.builder()// 指定索引分區的數量。shards分區 65 .put("index.number_of_shards", 5) 66 // 指定索引副本的數量(注意:不包括本身,如果設置數據存儲副本為1,實際上數據存儲了2份) 67 // replicas副本 68 .put("index.number_of_replicas", 1)) 69 // 真正執行 70 .get(); 71 } 72 73 /** 74 * 你可以通過dynamic設置來控制這一行為,它能夠接受以下的選項: true:默認值。 75 * 76 * 動態添加字段 false:忽略新字段 77 * 78 * strict:如果碰到陌生字段,拋出異常 79 * 80 * 給索引添加mapping信息(給表添加schema信息) 81 * 82 * @throws IOException 83 */ 84 @Test 85 public void elasticsearchSettingsMappings() throws IOException { 86 // 1:settings 87 HashMap<String, Object> settings_map = new HashMap<String, Object>(2); 88 // shards分區的數量4 89 settings_map.put("number_of_shards", 4); 90 // 副本的數量1 91 settings_map.put("number_of_replicas", 1); 92 93 // 2:mappings(映射、schema) 94 // field("dynamic", "true")含義是動態字段 95 XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("dynamic", "true") 96 // 設置type中的屬性 97 .startObject("properties") 98 // id屬性 99 .startObject("id") 100 // 類型是integer 101 .field("type", "integer") 102 // 不分詞,但是建索引 103 .field("index", "not_analyzed") 104 // 在文檔中存儲 105 .field("store", "yes").endObject() 106 // name屬性 107 .startObject("name") 108 // string類型 109 .field("type", "string") 110 // 在文檔中存儲 111 .field("store", "yes") 112 // 建立索引 113 .field("index", "analyzed") 114 // 使用ik_smart進行分詞 115 .field("analyzer", "ik_smart").endObject().endObject().endObject(); 116 117 CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("computer"); 118 // 管理索引(user_info)然后關聯type(user) 119 prepareCreate.setSettings(settings_map).addMapping("xiaomi", builder).get(); 120 } 121 122 /** 123 * index這個屬性,no代表不建索引 124 * 125 * not_analyzed,建索引不分詞 126 * 127 * analyzed 即分詞,又建立索引 128 * 129 * expected [no],[not_analyzed] or [analyzed]。即可以選擇三者任意一個值 130 * 131 * @throws IOException 132 */ 133 134 @Test 135 public void elasticsearchSettingsPlayerMappings() throws IOException { 136 // 1:settings 137 HashMap<String, Object> settings_map = new HashMap<String, Object>(2); 138 // 分區的數量4 139 settings_map.put("number_of_shards", 4); 140 // 副本的數量1 141 settings_map.put("number_of_replicas", 1); 142 143 // 2:mappings 144 XContentBuilder builder = XContentFactory.jsonBuilder().startObject()// 145 .field("dynamic", "true").startObject("properties") 146 // 在文檔中存儲、 147 .startObject("id").field("type", "integer").field("store", "yes").endObject() 148 // 不分詞,但是建索引、 149 .startObject("name").field("type", "string").field("index", "not_analyzed").endObject() 150 // 151 .startObject("age").field("type", "integer").endObject() 152 // 153 .startObject("salary").field("type", "integer").endObject() 154 // 不分詞,但是建索引、 155 .startObject("team").field("type", "string").field("index", "not_analyzed").endObject() 156 // 不分詞,但是建索引、 157 .startObject("position").field("type", "string").field("index", "not_analyzed").endObject() 158 // 即分詞,又建立索引、 159 .startObject("description").field("type", "string").field("store", "no").field("index", "analyzed") 160 .field("analyzer", "ik_smart").endObject() 161 // 即分詞,又建立索引、在文檔中存儲、 162 .startObject("addr").field("type", "string").field("store", "yes").field("index", "analyzed") 163 .field("analyzer", "ik_smart").endObject() 164 165 .endObject() 166 167 .endObject(); 168 169 CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player"); 170 prepareCreate.setSettings(settings_map).addMapping("basketball", builder).get(); 171 } 172 }
5、使用java api操作Elasticsearch的增刪改查以及復雜查詢(聚合查詢,可以進行分組統計數量,分組統計最大值,分組統計平均值,等等統計)。
1 package com.bie.elasticsearch; 2 3 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 4 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; 5 6 import java.io.IOException; 7 import java.net.InetAddress; 8 import java.util.Date; 9 import java.util.Iterator; 10 import java.util.Map; 11 import java.util.Set; 12 13 import org.elasticsearch.action.ActionListener; 14 import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; 15 import org.elasticsearch.action.delete.DeleteResponse; 16 import org.elasticsearch.action.get.GetResponse; 17 import org.elasticsearch.action.get.MultiGetItemResponse; 18 import org.elasticsearch.action.get.MultiGetResponse; 19 import org.elasticsearch.action.index.IndexResponse; 20 import org.elasticsearch.action.search.SearchRequestBuilder; 21 import org.elasticsearch.action.search.SearchResponse; 22 import org.elasticsearch.action.update.UpdateRequest; 23 import org.elasticsearch.action.update.UpdateResponse; 24 import org.elasticsearch.client.transport.TransportClient; 25 import org.elasticsearch.common.settings.Settings; 26 import org.elasticsearch.common.transport.InetSocketTransportAddress; 27 import org.elasticsearch.index.query.QueryBuilder; 28 import org.elasticsearch.index.query.QueryBuilders; 29 import org.elasticsearch.index.reindex.DeleteByQueryAction; 30 import org.elasticsearch.search.aggregations.Aggregation; 31 import org.elasticsearch.search.aggregations.AggregationBuilders; 32 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; 33 import org.elasticsearch.search.aggregations.bucket.terms.Terms; 34 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; 35 import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; 36 import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg; 37 import org.elasticsearch.search.aggregations.metrics.max.InternalMax; 38 import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; 39 import org.elasticsearch.search.aggregations.metrics.sum.InternalSum; 40 import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; 41 import org.elasticsearch.transport.client.PreBuiltTransportClient; 42 import org.junit.Before; 43 import org.junit.Test; 44 45 /** 46 * 47 * @author biehl 48 * 49 */ 50 public class ElasticsearchCRUD { 51 52 private TransportClient client = null; 53 54 @SuppressWarnings("resource") 55 @Before 56 public void init() throws Exception { 57 // 設置集群名稱biehl01 58 Settings settings = Settings.builder().put("cluster.name", "biehl01") 59 // 自動感知的功能(可以通過當前指定的節點獲取所有es節點的信息) 60 .put("client.transport.sniff", true).build(); 61 // 創建client 62 client = new PreBuiltTransportClient(settings).addTransportAddresses( 63 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 64 // 9300), 65 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 66 // 9300), 67 // 建議指定2個及其以上的節點。 68 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300)); 69 } 70 71 /** 72 * 創建一個Index索引、Type類型、以及id。 73 * 74 * 然后插入類型里面的數據。 75 * 76 * @throws IOException 77 */ 78 @Test 79 public void elasticsearchCreate() throws IOException { 80 IndexResponse response = client.prepareIndex("people", "student", "3") 81 .setSource(jsonBuilder().startObject().field("username", "王五五").field("sex", "男") 82 .field("birthday", new Date()).field("age", 21).field("message", "trying out Elasticsearch") 83 .endObject()) 84 .get(); 85 System.out.println(response.toString()); 86 } 87 88 /** 89 * 查找一條索引Index里面的類型Type里面的id的所有信息 90 * 91 * @throws IOException 92 */ 93 @Test 94 public void elasticsearchGet() throws IOException { 95 GetResponse response = client.prepareGet("people", "student", "1").get(); 96 System.out.println(response.getSourceAsString()); 97 } 98 99 /** 100 * 查找多條 101 * 102 * 索引Index里面的類型Type里面的多個id的所有信息 103 * 104 * @throws IOException 105 */ 106 @Test 107 public void elasticsearchMultiGet() throws IOException { 108 // 查詢出多個索引Index多個類型Type的多個id的所有信息 109 MultiGetResponse multiGetItemResponses = client.prepareMultiGet().add("people", "student", "1") 110 .add("people", "student", "2", "3").add("people", "teacher", "1").add("news", "fulltext", "1").get(); 111 // 將查詢出的結果遍歷輸出 112 for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 113 // 將每一個查詢出的結果遍歷輸出 114 GetResponse response = itemResponse.getResponse(); 115 // 判斷如果存在就進行遍歷輸出 116 if (response.isExists()) { 117 String json = response.getSourceAsString(); 118 System.out.println(json); 119 } 120 } 121 } 122 123 /** 124 * 修改指定的索引Index里面的類型Type的id的信息 125 * 126 * @throws Exception 127 */ 128 @Test 129 public void elasticsearchUpdate() throws Exception { 130 // 創建一個更新的請求對象 131 UpdateRequest updateRequest = new UpdateRequest(); 132 // 指定索引Index 133 updateRequest.index("people"); 134 // 指定類型Type 135 updateRequest.type("student"); 136 // 指定id的值 137 updateRequest.id("3"); 138 // 設置修改的字段信息 139 updateRequest.doc(jsonBuilder().startObject().field("username", "王五五").endObject()); 140 // 開始進行修改,並且返回響應信息 141 UpdateResponse updateResponse = client.update(updateRequest).get(); 142 // 打印輸出響應的信息 143 System.out.println(updateResponse.toString()); 144 } 145 146 /** 147 * 刪除指定的索引Index里面的類型Type的id的信息 148 */ 149 @Test 150 public void elasticsearchDelete() { 151 // 指定刪除的id信息,並且給出響應結果 152 // prepareDelete(String index, String type, String id); 153 DeleteResponse response = client.prepareDelete("people", "student", "4").get(); 154 // 打印輸出的響應信息 155 System.out.println(response); 156 } 157 158 /** 159 * 根據查詢條件進行刪除數據 160 * 161 * 162 */ 163 @Test 164 public void elasticsearchDeleteByQuery() { 165 BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) 166 // 指定查詢條件,matchQuery是name的值text里面包括了這個內容就進行刪除。默認使用標准分詞器。 167 .filter(QueryBuilders.matchQuery("username", "王五五")) 168 // 指定索引名稱 169 .source("people").get(); 170 // 獲取到刪除的個數 171 long deleted = response.getDeleted(); 172 // 打印輸出刪除的個數 173 System.out.println(deleted); 174 } 175 176 /** 177 * 異步刪除 178 * 179 * 監聽,如果真正刪除以后進行回調,打印輸出刪除確認的消息。 180 */ 181 @Test 182 public void elasticsearchDeleteByQueryAsync() { 183 DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(QueryBuilders.matchQuery("sex", "男")) 184 .source("people").execute(new ActionListener<BulkByScrollResponse>() { 185 186 // 刪除以后的方法回調 187 @Override 188 public void onResponse(BulkByScrollResponse response) { 189 // 返回刪除的個數 190 long deleted = response.getDeleted(); 191 System.out.println("數據刪除完畢!"); 192 // 打印刪除的個數 193 System.out.println("數據刪除的個數: " + deleted); 194 } 195 196 @Override 197 public void onFailure(Exception e) { 198 // 失敗打印異常信息 199 e.printStackTrace(); 200 } 201 }); 202 203 // 先打印輸出,正常執行完畢。再執行異步監聽刪除數據。 204 try { 205 System.out.println("異步刪除操作!"); 206 // 休眠10秒鍾,避免主線程里面結束,子線程無法進行結果輸出 207 Thread.sleep(10000); 208 } catch (Exception e) { 209 e.printStackTrace(); 210 } 211 } 212 213 /** 214 * 215 * 按照范圍進行查找。 216 * 217 */ 218 @Test 219 public void elasticsearchRange() { 220 // includeLower(true).includeUpper(false)含義是包含前面,不包含后面的 221 // [21, 24) 222 QueryBuilder qb = rangeQuery("age").from(21).to(24).includeLower(true).includeUpper(false); 223 // 將查詢條件傳遞進去,並將查詢結果進行返回。 224 SearchResponse response = client.prepareSearch("people").setQuery(qb).get(); 225 System.out.println(response); 226 } 227 228 /** 229 * 230 * 向指定索引index里面的類型Type的id的信息 231 * 232 * @throws IOException 233 */ 234 @Test 235 public void elasticsearchAddPlayer() throws IOException { 236 // 237 IndexResponse response = client.prepareIndex("player", "basketball", "4") 238 239 .setSource(jsonBuilder().startObject() 240 241 .field("name", "安其拉") 242 243 .field("age", 28) 244 245 .field("salary", 99000) 246 247 .field("team", "啦啦隊 team") 248 249 .field("position", "打中鋒") 250 251 .field("description", "跪族藍孩") 252 253 .endObject()) 254 .get(); 255 256 System.out.println(response); 257 } 258 259 /** 260 * 261 * 262 * select team, count(*) as team_count from player group by team; 263 * 264 * team_counts是別名稱。 265 */ 266 @Test 267 public void elasticsearchAgg1() { 268 // 指定索引和type 269 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 270 // 按team分組然后聚合,但是並沒有指定聚合函數。 271 // team_count是別名稱 272 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_count").field("team"); 273 // 添加聚合器 274 builder.addAggregation(teamAgg); 275 // 觸發 276 SearchResponse response = builder.execute().actionGet(); 277 // System.out.println(response); 278 // 將返回的結果放入到一個map中 279 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 280 // 遍歷打印輸出 281 Set<String> keys = aggMap.keySet(); 282 for (String key : keys) { 283 System.out.println("key: " + key); 284 } 285 286 System.out.println(""); 287 288 // //取出聚合屬性 289 StringTerms terms = (StringTerms) aggMap.get("team_count"); 290 291 // //依次迭代出分組聚合數據 292 for (Terms.Bucket bucket : terms.getBuckets()) { 293 // 分組的名字 294 String team = (String) bucket.getKey(); 295 // count,分組后一個組有多少數據 296 long count = bucket.getDocCount(); 297 System.out.println(team + ": " + count); 298 } 299 300 System.out.println(""); 301 302 // 使用Iterator進行遍歷迭代 303 Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator(); 304 while (teamBucketIt.hasNext()) { 305 Terms.Bucket bucket = teamBucketIt.next(); 306 // 獲取到分組后每組的組名稱 307 String team = (String) bucket.getKey(); 308 // 獲取到分組后的每組數量 309 long count = bucket.getDocCount(); 310 // 打印輸出 311 System.out.println(team + ": " + count); 312 } 313 } 314 315 /** 316 * 317 * select 318 * 319 * team, position, count(*) as pos_count 320 * 321 * from 322 * 323 * player 324 * 325 * group by 326 * 327 * team,position; 328 * 329 * 330 */ 331 @Test 332 public void elasticsearchAgg2() { 333 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 334 // 指定別名和分組的字段 335 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team"); 336 TermsAggregationBuilder posAgg = AggregationBuilders.terms("pos_count").field("position"); 337 // 添加兩個聚合構建器。先按照team分組,再按照position分組。 338 builder.addAggregation(teamAgg.subAggregation(posAgg)); 339 // 執行查詢 340 SearchResponse response = builder.execute().actionGet(); 341 // 將查詢結果放入map中 342 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 343 // 根據屬性名到map中查找 344 StringTerms teams = (StringTerms) aggMap.get("team_name"); 345 // 循環查找結果 346 for (Terms.Bucket teamBucket : teams.getBuckets()) { 347 // 先按球隊進行分組 348 String team = (String) teamBucket.getKey(); 349 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); 350 StringTerms positions = (StringTerms) subAggMap.get("pos_count"); 351 // 因為一個球隊有很多位置,那么還要依次拿出位置信息 352 for (Terms.Bucket posBucket : positions.getBuckets()) { 353 // 拿到位置的名字 354 String pos = (String) posBucket.getKey(); 355 // 拿出該位置的數量 356 long docCount = posBucket.getDocCount(); 357 // 打印球隊,位置,人數 358 System.out.println(team + ": " + pos + ": " + docCount); 359 } 360 } 361 362 } 363 364 /** 365 * select team, max(age) as max_age from player group by team; 366 */ 367 @Test 368 public void elasticsearchAgg3() { 369 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 370 // 指定安球隊進行分組 371 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team"); 372 // 指定分組求最大值 373 MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age"); 374 // 分組后求最大值 375 builder.addAggregation(teamAgg.subAggregation(maxAgg)); 376 // 查詢 377 SearchResponse response = builder.execute().actionGet(); 378 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 379 // 根據team屬性,獲取map中的內容 380 StringTerms teams = (StringTerms) aggMap.get("team_name"); 381 for (Terms.Bucket teamBucket : teams.getBuckets()) { 382 // 分組的屬性名 383 String team = (String) teamBucket.getKey(); 384 // 在將聚合后取最大值的內容取出來放到map中 385 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); 386 // 取分組后的最大值 387 InternalMax ages = (InternalMax) subAggMap.get("max_age"); 388 // 獲取到年齡的值 389 double max = ages.getValue(); 390 // 打印輸出值 391 System.out.println(team + ": " + max); 392 } 393 } 394 395 /** 396 * select team, avg(age) as avg_age, sum(salary) as total_salary from player 397 * group by team; 398 */ 399 @Test 400 public void elasticsearchAgg4() { 401 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 402 // 指定分組字段 403 TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team"); 404 // 指定聚合函數是求平均數據 405 AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age"); 406 // 指定另外一個聚合函數是求和 407 SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary"); 408 // 分組的聚合器關聯了兩個聚合函數 409 builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg)); 410 // 查詢 411 SearchResponse response = builder.execute().actionGet(); 412 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 413 // 按分組的名字取出數據 414 StringTerms teams = (StringTerms) aggMap.get("team_name"); 415 for (Terms.Bucket teamBucket : teams.getBuckets()) { 416 // 獲取球隊名字 417 String team = (String) teamBucket.getKey(); 418 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); 419 // 根據別名取出平均年齡 420 InternalAvg avgAge = (InternalAvg) subAggMap.get("avg_age"); 421 // 根據別名取出薪水總和 422 InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary"); 423 double avgAgeValue = avgAge.getValue(); 424 double totalSalaryValue = totalSalary.getValue(); 425 System.out.println(team + ": " + avgAgeValue + ": " + totalSalaryValue); 426 } 427 } 428 429 /** 430 * select team, sum(salary) as total_salary from player group by team order by 431 * total_salary desc; 432 */ 433 @Test 434 public void elasticsearchAgg5() { 435 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 436 // 按team進行分組,然后指定排序規則 437 TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team") 438 .order(Terms.Order.aggregation("total_salary ", true)); 439 // 指定一個聚合函數是求和 440 SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary"); 441 // 添加兩個聚合構建器。先按照team分組,再按照salary求和。 442 builder.addAggregation(termsAgg.subAggregation(sumAgg)); 443 // 查詢 444 SearchResponse response = builder.execute().actionGet(); 445 // 將查詢結果放入map中 446 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 447 // 從查詢結果中獲取到team_name的信息 448 StringTerms teams = (StringTerms) aggMap.get("team_name"); 449 // 開始遍歷獲取到的信息 450 for (Terms.Bucket teamBucket : teams.getBuckets()) { 451 // 獲取到key的值 452 String team = (String) teamBucket.getKey(); 453 // 獲取到求和的值 454 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); 455 // 獲取到求和的值的信息 456 InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary"); 457 // 獲取到求和的值 458 double totalSalaryValue = totalSalary.getValue(); 459 // 打印輸出信息 460 System.out.println(team + " " + totalSalaryValue); 461 } 462 } 463 464 }
執行效果,自己可以分別進行測試。由於測試都寫了說明,這里就不一一進行測試打印效果了。請自行練習使用即可。
作者:別先生
博客園:https://www.cnblogs.com/biehongli/
如果您想及時得到個人撰寫文章以及著作的消息推送,可以掃描上方二維碼,關注個人公眾號哦。