ElasticSearch聚合


前言

說完了ES的索引與檢索,接着再介紹一個ES高級功能API – 聚合(Aggregations),聚合功能為ES注入了統計分析的血統,使用戶在面對大數據提取統計指標時變得游刃有余。同樣的工作,你在Hadoop中可能需要寫mapreduce或Hive,在mongo中你必須得用大段的mapreduce腳本,而在ES中僅僅調用一個API就能實現了。

開始之前,提醒老司機們注意,ES原有的聚合功能Facets在新版本中將被正式被移除,抓緊時間用Aggregations替換Facets吧。Facets真的很慢!

1 關於Aggregations

Aggregations的部分特性類似於SQL語言中的group by,avg,sum等函數。但Aggregations API還提供了更加復雜的統計分析接口。

掌握Aggregations需要理解兩個概念:

  • 桶(Buckets):符合條件的文檔的集合,相當於SQL中的group by。比如,在users表中,按“地區”聚合,一個人將被分到北京桶或上海桶或其他桶里;按“性別”聚合,一個人將被分到男桶或女桶
  • 指標(Metrics):基於Buckets的基礎上進行統計分析,相當於SQL中的count,avg,sum等。比如,按“地區”聚合,計算每個地區的人數,平均年齡等

對照一條SQL來加深我們的理解:

SELECT COUNT(color) FROM table GROUP BY color

GROUP BY相當於做分桶的工作,COUNT是統計指標。

下面介紹一些常用的Aggregations API。

2 Metrics

2.1 AVG
2.2 Cardinality
2.3 Stats
2.4 Extended Stats
2.5 Percentiles
2.6 Percentile Ranks

3 Bucket


3.1 Filter
3.2 Range
3.3 Missing
3.4 Terms
3.5 Date Range
3.6 Global Aggregation
3.7 Histogram
3.8 Date Histogram
3.9 IPv4 range
3.10 Return only aggregation results

4 聚合緩存

ES中經常使用到的聚合結果集可以被緩存起來,以便更快速的系統響應。這些緩存的結果集和你掠過緩存直接查詢的結果是一樣的。因為,第一次聚合的條件與結果緩存起來后,ES會判斷你后續使用的聚合條件,如果聚合條件不變,並且檢索的數據塊未增更新,ES會自動返回緩存的結果。

注意聚合結果的緩存只針對size=0的請求(參考3.10章節),還有在聚合請求中使用了動態參數的比如Date Range中的now(參考3.5章節),ES同樣不會緩存結果,因為聚合條件是動態的,即使緩存了結果也沒用了。

 

先加入幾條index數據,如下:

復制代碼
curl -XPUT 'localhost:9200/testindex/orders/2?pretty' -d '{
    "zone_id": "1",
    "user_id": "100008",
    "try_deliver_times": 102,
    "trade_status": "TRADE_FINISHED",
    "trade_no": "xiaomi.21142736250938334726",
    "trade_currency": "CNY",
    "total_fee": 100,
    "status": "paid",
    "sdk_user_id": "69272363",
    "sdk": "xiaomi",
    "price": 1,
    "platform": "android",
    "paid_channel": "unknown",
    "paid_at": 1427370289,
    "market": "unknown",
    "location": "local",
    "last_try_deliver_at": 1427856948,
    "is_guest": 0,
    "id": "fa6044d2fddb15681ea2637335f3ae6b7f8e76fef53bd805108a032cb3eb54cd",
    "goods_name": "一小堆元寶",
    "goods_id": "ID_001",
    "goods_count": "1",
    "expires_in": 2592000,
    "delivered_at": 0,
    "debug_mode": true,
    "created_at": 1427362509,
    "cp_result": "exception encountered",
    "cp_order_id": "cp.order.id.test",
    "client_id": "9c98152c6b42c7cb3f41b53f18a0d868",
    "app_user_id": "fvu100006"
}'
復制代碼

1、單值聚合

  Sum求和,dsl參考如下:

復制代碼
[sfapp@cmos1 ekfile]$ curl  'http://10.202.11.117:9200/testindex/orders/_search?pretty' -d '
> {
>   "size": 0,
>   "aggs": {
>     "return_expires_in": {
>       "sum": {
>         "field": "expires_in"
>       }
>     }
>   }
> }'
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "return_expires_in" : {
      "value" : 5184000.0
    }
  }
}
[sfapp@cmos1 ekfile]$ 
復制代碼

返回expires_in之和,其中size=0 表示不需要返回參與查詢的文檔。

Min求最小值

復制代碼
[sfapp@cmos1 ekfile]$ curl  'http://10.202.11.117:9200/testindex/orders/_search?pretty' -d '
> {
>   "size": 0,
>   "aggs": {
>     "return_min_expires_in": {
>       "min": {
>         "field": "expires_in"
>       }
>     }
>   }
> }'
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "return_min_expires_in" : {
      "value" : 2592000.0
    }
  }
}
[sfapp@cmos1 ekfile]$
復制代碼

Max求最大值

復制代碼
[sfapp@cmos1 ekfile]$ curl  'http://10.202.11.117:9200/testindex/orders/_search?pretty' -d '
> {
>   "size": 0,
>   "aggs": {
>     "return_max_expires_in": {
>       "max": {
>         "field": "expires_in"
>       }
>     }
>   }
> }'
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "return_max_expires_in" : {
      "value" : 2592000.0
    }
  }
}
[sfapp@cmos1 ekfile]$ 
復制代碼

 

AVG求平均值

復制代碼
[sfapp@cmos1 ekfile]$ curl  'http://10.202.11.117:9200/testindex/orders/_search?pretty' -d '
> {
>   "size": 0,
>   "aggs": {
>     "return_avg_expires_in": {
>       "avg": {
>         "field": "expires_in"
>       }
>     }
>   }
> }'
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "return_avg_expires_in" : {
      "value" : 2592000.0
    }
  }
}
[sfapp@cmos1 ekfile]$ 
復制代碼

 

Cardinality 求基數(如下示例,查找性別的基數 M、F,共兩個)

復制代碼
復制代碼
{
  "size": 0,
  "aggs": {
    "return_cardinality": {
      "cardinality": {
        "field": "gender"
      }
    }
  }
}
復制代碼
復制代碼

結果為:

2、多值聚合

 

percentiles 求百分比

查看官方文檔時候,沒看懂,下面是自己測試時的例子,按照性別(F,M)查看工資范圍的百分比

復制代碼
復制代碼
{
  "size": 0,
  "aggs": {
    "states": {
      "terms": {
        "field": "gender"
      },
      "aggs": {
        "banlances": {
          "percentile_ranks": {
            "field": "balance",
            "values": [
              20000,
              40000
            ]
          }
        }
      }
    }
  }
復制代碼
復制代碼

結果:

 

 

 

stats 統計

查看balance的統計情況:

復制代碼
復制代碼
{
  "size": 0,
  "aggs": {
    "balance_stats": {
      "stats": {
        "field": "balance"
      }
    }
  }
}
復制代碼
復制代碼

返回結果:

extended_stats 擴展統計

復制代碼
復制代碼
{
  "size": 0,
  "aggs": {
    "balance_stats": {
      "extended_stats": {
        "field": "balance"
      }
    }
  }
}
復制代碼
復制代碼

結果:

 

更加復雜的查詢,后續慢慢在實踐中道來。

Terms聚合

記錄有多少F,多少M

 

按 Ctrl+C 復制代碼
按 Ctrl+C 復制代碼

 返回結果如下:m記錄507條,f記錄493條

 

 

數據的不確定性

使用terms聚合,結果可能帶有一定的偏差與錯誤性。

比如:

我們想要獲取name字段中出現頻率最高的前5個。

此時,客戶端向ES發送聚合請求,主節點接收到請求后,會向每個獨立的分片發送該請求。
分片獨立的計算自己分片上的前5個name,然后返回。當所有的分片結果都返回后,在主節點進行結果的合並,再求出頻率最高的前5個,返回給客戶端。

這樣就會造成一定的誤差,比如最后返回的前5個中,有一個叫A的,有50個文檔;B有49。 但是由於每個分片獨立的保存信息,信息的分布也是不確定的。 有可能第一個分片中B的信息有2個,但是沒有排到前5,所以沒有在最后合並的結果中出現。 這就導致B的總數少計算了2,本來可能排到第一位,卻排到了A的后面。

size與shard_size

為了改善上面的問題,就可以使用size和shard_size參數。

  • size參數規定了最后返回的term個數(默認是10個)
  • shard_size參數規定了每個分片上返回的個數
  • 如果shard_size小於size,那么分片也會按照size指定的個數計算

通過這兩個參數,如果我們想要返回前5個,size=5;shard_size可以設置大於5,這樣每個分片返回的詞條信息就會增多,相應的誤差幾率也會減小。

 

order排序

 

order指定了最后返回結果的排序方式,默認是按照doc_count排序。

 

復制代碼
復制代碼
{
    "aggs" : {
        "genders" : {
            "terms" : {
                "field" : "gender",
                "order" : { "_count" : "asc" }
            }
        }
    }
}
復制代碼
復制代碼

 

 

也可以按照字典方式排序:

 

復制代碼
復制代碼
{
    "aggs" : {
        "genders" : {
            "terms" : {
                "field" : "gender",
                "order" : { "_term" : "asc" }
            }
        }
    }
}
復制代碼
復制代碼

 

 

當然也可以通過order指定一個單值聚合,來排序。

 

復制代碼
復制代碼
{
    "aggs" : {
        "genders" : {
            "terms" : {
                "field" : "gender",
                "order" : { "avg_balance" : "desc" }
            },
            "aggs" : {
                "avg_balance" : { "avg" : { "field" : "balance" } }
            }
        }
    }
}
復制代碼
復制代碼

 

 

同時也支持多值聚合,不過要指定使用的多值字段:

 

復制代碼
復制代碼
{
    "aggs" : {
        "genders" : {
            "terms" : {
                "field" : "gender",
                "order" : { "balance_stats.avg" : "desc" }
            },
            "aggs" : {
                "balance_stats" : { "stats" : { "field" : "balance" } }
            }
        }
    }
}
復制代碼
復制代碼

返回結果:

 

 

min_doc_count與shard_min_doc_count

聚合的字段可能存在一些頻率很低的詞條,如果這些詞條數目比例很大,那么就會造成很多不必要的計算。
因此可以通過設置min_doc_count和shard_min_doc_count來規定最小的文檔數目,只有滿足這個參數要求的個數的詞條才會被記錄返回。

通過名字就可以看出:

  • min_doc_count:規定了最終結果的篩選
  • shard_min_doc_count:規定了分片中計算返回時的篩選

script

桶聚合也支持腳本的使用:

復制代碼
復制代碼
{
    "aggs" : {
        "genders" : {
            "terms" : {
                "script" : "doc['gender'].value"
            }
        }
    }
}
 
復制代碼
復制代碼

 

以及外部腳本文件:

 

復制代碼
復制代碼
{
    "aggs" : {
        "genders" : {
            "terms" : {
                "script" : {
                    "file": "my_script",
                    "params": {
                        "field": "gender"
                    }
                }
            }
        }
    }
}
 
復制代碼
復制代碼

 

filter

filter字段提供了過濾的功能,使用兩種方式:include可以匹配出包含該值的文檔,exclude則排除包含該值的文檔。
例如:

{
    "aggs" : { "tags" : { "terms" : { "field" : "tags", "include" : ".*sport.*", "exclude" : "water_.*" } } } }

 

上面的例子中,最后的結果應該包含sport並且不包含water。
也支持數組的方式,定義包含與排除的信息:

 

{
    "aggs" : { "JapaneseCars" : { "terms" : { "field" : "make", "include" : ["mazda", "honda"] } }, "ActiveCarManufacturers" : { "terms" : { "field" : "make", "exclude" : ["rover", "jensen"] } } } }

 

多字段聚合

 

通常情況,terms聚合都是僅針對於一個字段的聚合。因為該聚合是需要把詞條放入一個哈希表中,如果多個字段就會造成n^2的內存消耗。

 

不過,對於多字段,ES也提供了下面兩種方式:

 

  • 1 使用腳本合並字段
  • 2 使用copy_to方法,合並兩個字段,創建出一個新的字段,對新字段執行單個字段的聚合。

 

collect模式

 

對於子聚合的計算,有兩種方式:

 

  • depth_first 直接進行子聚合的計算
  • breadth_first 先計算出當前聚合的結果,針對這個結果在對子聚合進行計算。

 

默認情況下ES會使用深度優先,不過可以手動設置成廣度優先,比如:

 

{
    "aggs" : { "actors" : { "terms" : { "field" : "actors", "size" : 10, "collect_mode" : "breadth_first" }, "aggs" : { "costars" : { "terms" : { "field" : "actors", "size" : 5 } } } } } }

 

缺省值Missing value

 

缺省值指定了缺省的字段的處理方式:

 

{
    "aggs" : { "tags" : { "terms" : { "field" : "tags", "missing": "N/A" } } } }


聚合的桶操作和度量的完整用法可以在 Elasticsearch 參考 中找到。本章中會涵蓋其中很多內容,但在閱讀完本章后查看它會有助於我們對它的整體能力有所了解。

所以讓我們先看一個例子。我們將會創建一些對汽車經銷商有用的聚合,數據是關於汽車交易的信息:車型、制造商、售價、何時被出售等。

首先我們批量索引一些數據:

POST /cars/transactions/_bulk
{ "index": {}}
{ "price" : 10000, "color" : "red", "make" : "honda", "sold" : "2014-10-28" }
{ "index": {}}
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }
{ "index": {}}
{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18" }
{ "index": {}}
{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02" }
{ "index": {}}
{ "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19" }
{ "index": {}}
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }
{ "index": {}}
{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01" }
{ "index": {}}
{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12" }

有了數據,開始構建我們的第一個聚合。汽車經銷商可能會想知道哪個顏色的汽車銷量最好,用聚合可以輕易得到結果,用 terms 桶操作:

GET /cars/transactions/_search
{
    "size" : 0,
    "aggs" : { 
        "popular_colors" : { 
            "terms" : { 
              "field" : "color"
            }
        }
    }
}

聚合操作被置於頂層參數 aggs 之下(如果你願意,完整形式 aggregations 同樣有效)。

然后,可以為聚合指定一個我們想要名稱,本例中是: popular_colors 。

最后,定義單個桶的類型 terms 。

聚合是在特定搜索結果背景下執行的, 這也就是說它只是查詢請求的另外一個頂層參數(例如,使用 /_search 端點)。 聚合可以與查詢結對,但我們會晚些在 限定聚合的范圍(Scoping Aggregations) 中來解決這個問題。

注意

可能會注意到我們將 size 設置成 0 。我們並不關心搜索結果的具體內容,所以將返回記錄數設置為 0 來提高查詢速度。 設置 size: 0 與 Elasticsearch 1.x 中使用 count 搜索類型等價。

然后我們為聚合定義一個名字,名字的選擇取決於使用者,響應的結果會以我們定義的名字為標簽,這樣應用就可以解析得到的結果。

隨后我們定義聚合本身,在本例中,我們定義了一個單 terms 桶。 這個 terms 桶會為每個碰到的唯一詞項動態創建新的桶。 因為我們告訴它使用 color 字段,所以 terms 桶會為每個顏色動態創建新桶。

讓我們運行聚合並查看結果:

{
...
   "hits": {
      "hits": [] 
   },
   "aggregations": {
      "popular_colors": { 
         "buckets": [
            {
               "key": "red", 
               "doc_count": 4 
            },
            {
               "key": "blue",
               "doc_count": 2
            },
            {
               "key": "green",
               "doc_count": 2
            }
         ]
      }
   }
}

因為我們設置了 size 參數,所以不會有 hits 搜索結果返回。

popular_colors 聚合是作為 aggregations 字段的一部分被返回的。

每個桶的 key 都與 color 字段里找到的唯一詞對應。它總會包含 doc_count 字段,告訴我們包含該詞項的文檔數量。

每個桶的數量代表該顏色的文檔數量。

響應包含多個桶,每個對應一個唯一顏色(例如:紅 或 綠)。每個桶也包括 聚合進 該桶的所有文檔的數量。例如,有四輛紅色的車。

前面的這個例子完全是實時執行的:一旦文檔可以被搜到,它就能被聚合。這也就意味着我們可以直接將聚合的結果源源不斷的傳入圖形庫,然后生成實時的儀表盤。 不久,你又銷售了一輛銀色的車,我們的圖形就會立即動態更新銀色車的統計信息。

瞧!這就是我們的第一個聚合!

java代碼實現:

①、實例化es

 

private static TransportClient client;
static {
try {
String esClusterName = "shopmall-es";
List<String> clusterNodes = Arrays.asList("http://172.16.32.69:9300","http://172.16.32.48:9300");
Settings settings = Settings.builder().put("cluster.name", esClusterName).build();
client = new PreBuiltTransportClient(settings);
for (String node : clusterNodes) {
URI host = URI.create(node);
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.getHost()), host.getPort()));
}
} catch (Exception e) {
log.error("elasticsearchUtils init exception", e);
}
}

public static void close() {
client.close();
}

②、度量聚合實現

 

 

/**
* Description:指標聚合查詢,COUNT(color) ,min ,max,sum等相當於指標

* @author wangweidong
* CreateTime: 2017年11月9日 上午10:43:18

* 數據格式:{ "price" : 10000, "color" : "red", "make" : "honda", "sold" : "2014-10-28" }

* 1、可能會注意到我們將 size 設置成 0 。我們並不關心搜索結果的具體內容,所以將返回記錄數設置為 0 來提高查詢速度。 設置 size: 0 與 Elasticsearch 1.x 中使用 count 搜索類型等價。

* 2、對text 字段上的腳本進行排序,聚合或訪問值時,出現Fielddata is disabled on text fields by default. Set fielddata=true on [color] in order to load fielddata in memory by uninverting the inverted index. Note that this can however use significant memory.
* Fielddata默認情況下禁用文本字段,因為Fielddata可以消耗大量的堆空間,特別是在加載高基數text字段時。一旦fielddata被加載到堆中,它將在該段的生命周期中保持在那里。此外,加載fielddata是一個昂貴的過程,可以導致用戶體驗延遲命中。
* 可以使用使用該my_field.keyword字段進行聚合,排序或腳本或者啟用fielddata(不建議使用)
*/
@Test
public void metricsAggregation() {
try {
String index = "cars";
String type = "transactions";

SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type);
// MaxAggregationBuilder field = AggregationBuilders.max("max_price").field("price");
//MinAggregationBuilder 統計最小值
// MinAggregationBuilder field = AggregationBuilders.min("min_price").field("price");
//SumAggregationBuilder 統計合計
// SumAggregationBuilder field = AggregationBuilders.sum("sum_price").field("price");
//StatsAggregationBuilder 統計聚合即一次性獲取最小值、最小值、平均值、求和、統計聚合的集合。
StatsAggregationBuilder field = AggregationBuilders.stats("stats_price").field("price");
searchRequestBuilder.addAggregation(field);
searchRequestBuilder.setSize(0);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();

System.out.println(searchResponse.toString());
Aggregations agg = searchResponse.getAggregations();
if(agg == null) {
return;
}

// Max max = agg.get("max_price");
// System.out.println(max.getValue());

// Min min = agg.get("min_price");
// System.out.println(min.getValue());

// Sum sum = agg.get("sum_price");
// System.out.println(sum.getValue());

Stats stats = agg.get("stats_price");
System.out.println("最大值:"+stats.getMax());
System.out.println("最小值:"+stats.getMin());
System.out.println("平均值:"+stats.getAvg());
System.out.println("合計:"+stats.getSum());
System.out.println("總條數:"+stats.getCount());

} catch (Exception e) {
e.printStackTrace();
}
}

③、桶聚合實現

 

/**
* Description:桶聚合查詢,GROUP BY相當於桶

* @author wangweidong
* CreateTime: 2017年11月9日 下午3:47:54
*
*/
@Test
public void bucketsAggregation() {
String index = "cars";
String type = "transactions";
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type);

TermsAggregationBuilder field = AggregationBuilders.terms("popular_colors").field("color.keyword");
searchRequestBuilder.addAggregation(field);
searchRequestBuilder.setSize(0);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();

System.out.println(searchResponse.toString());

Terms genders = searchResponse.getAggregations().get("popular_colors");
for (Terms.Bucket entry : genders.getBuckets()) {
Object key = entry.getKey(); // Term
Long count = entry.getDocCount(); // Doc count

System.out.println(key);
System.out.println(count);
}
}

文章參考:https://www.elastic.co/guide/cn/elasticsearch/guide/current/_aggregation_test_drive.html#_aggregation_test_drive

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM