使用 ElasticSearch Aggregations 進行統計分析


https://blog.csdn.net/zxjiayou1314/article/details/53837719/

 https://segmentfault.com/a/1190000017846551?utm_source=tag-newest

https://www.cnblogs.com/wangnanhui/articles/9638925.html

ElasticSearch 的特點隨處可見:基於 Lucene 的分布式搜索引擎,友好的 RESTful API……

大部分文章都圍繞 ELK Stack 和全文搜索展開,本文試圖用一個小案例來展示 ElasticSearch Aggregations 在統計分析的強大之處。

表單長這樣

image

需求:對回收的問卷進行統計,統計方式可能有:

  • 看每周/天/小時回收量(可以做成可視化的柱狀圖,人人都愛 Dashboard)
  • 以上需求加一個時間范圍(例如最近90天)
  • 在問題 1 中選擇 A 答案的用戶,其他答案的占比
  • 問題 1 選擇了 A 答案和問題 2 中選擇了 B 答案的用戶的其他回答占比

前兩個需求都是對文檔的根字段進行查詢,后面的都是對子文檔的字段進行搜索

可視化用了 Chart.js 和 Twitter Bootstrap;膠水語言么,自然是世界上最好的語(P)言(H)啦(P),安裝和啟動過程什么的太簡單就跳過了。

1. 初次見面

就像新人學習如何使用 Postgres 那樣,步驟如下:

  1. 創建一個 index(index 既是名詞,又是動詞,這里是名詞)
  2. 定義 mapping (相當於 schema)
  3. 使用 bulk 導入數據
  4. 查詢(ElasticSearch 的強大之處可在這里體現)

創建 index 和 定義 mapping

在 ElasticSearch 使用 index 的成本相當低,以下代碼在創建 index 時也同時指定了 mapping

代碼只展示關鍵部分(反正你們也不會去運行)

$client = Elasticsearch\ClientBuilder::create()->build();
$params = [
    'index' => 'your_awesome_data',
    'body' => [
        'mappings' => [
            'ur_radio_answers' => [
                'properties' => [
                    'answer_id' => [ #這里是字段名
                        'type' => 'string', #字段類型(不指定也行,elasticsearch 自己會猜)
                        'index' => 'not_analyzed' #告訴 elasticsearch,本字段不需要被分詞,需要完整的讀寫)
                    ],
                    'user_id' => ['type' => 'string', 'index' => 'not_analyzed'],
                    'questions' => [
                        'type' => 'nested',
                        'properties' => [
                            'page_id' => [
                                'type' => 'string',
                                'index' => 'not_analyzed'
                            ],
                            'question_id' => ['type' => 'string', 'index' => 'not_analyzed'],
                            'question' => ['type' => 'string', 'index' => 'not_analyzed'],
                            'option' => ['type' => 'string', 'index' => 'not_analyzed']
                        ]
                    ],
                    'start_at' => [
                        'type' => 'date',
                        'format' => 'yyyy-MM-dd HH:mm:ss'
                    ],
                    'ended_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss']
                ]
            ]
        ]
    ]
];

$client->indices()->create($params);

使用 bulk API 導入數據

這部分代碼沒啥好看,只要知道在批量導入數據的時候用 bulk API 就行了

bulk 是批量插入文檔的 API,一般是將幾千個 Document 一起插入(因為每插入一次就是一個 HTTP 請求)

$client = Elasticsearch\ClientBuilder::create()->build();
$connect = new mysqli('localhost', 'root', 'STUPIDPASSWORD', 'db');

$max = 823880;
$cursor = 1000;

while ($cursor < $max) {
    $result = $connect->query("select * from raw_answer_265033 where wd_oaid > {$cursor} order by wd_oaid asc limit 1000");
    $params = [];
    while ($obj = $result->fetch_array()) {
        $pages = json_decode($obj['wd_answer_json']);
        $answer = [
            'answer_id' => $obj['wd_oaid'],
            'user_id' => $obj['wd_uin'],
            'questions' => [],
            'ip' => $obj['wd_ip'],
            'start_at' => date('Y-m-d h:i:s', $obj['wd_starttime']),
            'ended_at' => date('Y-m-d h:i:s', $obj['wd_endtime'])
        ];
        foreach ($pages as $page) {
            foreach ($page->questions as $question) {
                foreach ($question->options as $option) {
                    if (isset($option->checked) && $option->checked == 1) {
                        $answer['questions'][] = [
                            'page_id' => $page->id,
                            'question_id' => $question->id,
                            'question' => trim(strip_tags(htmlspecialchars_decode($question->title))),
                            'option' => trim(strip_tags(htmlspecialchars_decode($option->text))),
                        ];
                    }
                }
            }
        }
        $cursor = $obj['wd_oaid'];
        $params['body'][] = [
            'index' => ['_index' => 'your_awesome_data', '_type' => 'your_awesome_data']
        ];
        $params['body'][] = $answer;
    }
    // 這里是重點
    $response = $client->bulk($params);
    $params = [];
}

經過上面膠水語言的拼裝,單個 Document 在入庫時是長這樣的:

{
    "answer_id": "192013",
    "user_id": "2971957289",
    "questions": [  #這里是一個數組,數量都不一樣;(在 ElasticSearch 中就是 Nested Document)
        {
            "page_id": "p-12-Y1cU",
            "question_id": "q-35-gJ9a",
            "question": "八月飄香香滿園(打一地名)",
            "option": "桂林"
        },
        {
            "page_id": "p-1-e8fe",
            "question_id": "q-4-irlF",
            "question": "遙知不是雪,為有暗香來(打一《紅樓夢》人名)",
            "option": "王作梅"
        },
        {
            "page_id": "p-2-8jI8",
            "question_id": "q-48-WG7d",
            "question": "單刀赴會 (打一《水滸傳》人名)",
            "option": "林沖"
        }
    ],
    "ip": "223.88.92.21",
    "start_at": "2016-02-21 12:02:01",
    "ended_at": "2016-02-21 13:18:15"
}

以下是返回結果, took 屬性是查詢耗時,這里的空白查詢花了 42ms,hits.total 表示有多少個 Document,這里有 82萬,表明我們剛才的批量插入成功了

{
    "took": 42,
    "timed_out": false,
    "_shards": { "total": 5, "successful": 5, "failed": 0 },
    "hits": { "total": 822880, "max_score": 1.0, "hits": [ #這里是搜索結果,省略了 ] }
}

查詢

好了,以上都只是准備工作,需求來了:

  • 沒有任何條件過濾,統計所有問題的各選項比例

這是查詢語句

{
    "aggs": {
        "answers": {
            "nested": {
                "path": "questions"
            },
            "aggs": {
                "questions": {
                    "terms": {
                        "field": "questions.question",
                        "size": 100,
                        "order": {
                            "_count": "desc"
                        }
                    },
                    "aggs": {
                        "options": {
                            "terms": {
                                "field": "questions.option",
                                "size": 100,
                                "order": {
                                    "_count": "desc"
                                }
                            }
                        }
                    }
                }
            }
        },
        "dates": {
            "date_histogram": {
                "field": "ended_at",
                "interval": "day",
                "min_doc_count": 0
            },
            "aggs": {
                "user_count": {
                    "cardinality": {
                        "field": "answer_id"
                    }
                }
            }
        }
    }
}

這是返回結果,只耗時 155ms,並且在一個請求內返回了兩個統計結果( dates 和 answers ))

下一段再介紹這個查詢用到的聚合

{
    "took": 155,
    "timed_out": false,
    "_shards": { "total": 5, "successful": 5, "failed": 0},
    "hits": {"total": 822880, "max_score": 0, "hits": []},
    "aggregations": {
        "dates": {
            "buckets": [
                {"key_as_string": "2016-02-22 00:00:00", "key": 1456099200000, "doc_count": 573855, "user_count": {"value": 613589}},
                {"key_as_string": "2016-02-23 00:00:00", "key": 1456185600000, "doc_count": 35533,  "user_count": {"value": 32221}}
                # 省略類似以上兩條的內容
            ]
        },
        "answers": {
            "doc_count": 2738528,
            "questions": {
                "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0,
                "buckets": [
                    {   "key": "千條線,萬條線, 掉到水里看不見(打一自然現象)",
                        "doc_count": 166145,
                        "options": {
                            "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0,
                            "buckets": [
                                {"key": "雨", "doc_count": 147481},
                                {"key": "雪", "doc_count": 11717},
                                {"key": "霧", "doc_count": 6947}
                            ]
                        }
                    },
                    {   "key": "細白嫩肉裹紫衣,霜兒一打不成器(打一蔬菜)",
                        "doc_count": 164585,
                        "options": {
                            "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0,
                            "buckets": [
                                {"key": "茄子", "doc_count": 136404},
                                {"key": "紫薯", "doc_count": 19811},
                                {"key": "蘿卜", "doc_count": 8370}
                            ]
                        }
                    },
                    {   "key": "八月飄香香滿園(打一地名)",
                        "doc_count": 164571,
                        "options": {
                            "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0,
                            "buckets": [
                                {"key": "桂林", "doc_count": 148744},
                                {"key": "廈門", "doc_count": 8963},
                                {"key": "青島", "doc_count": 6864}
                            ]
                        }
                    }
                    # 省略類似內容
                ]
            }
        }
    }
}

直接可視化就是下圖的樣子

image

改一下需求: 問題1選擇 A 選項的用戶是怎么選擇其他選項的?

這里只現實 query 部分,省略 aggs,以下是查詢

{
    "query": {
        "filtered": {
            "query": {
                "nested": {
                    "path": "questions",
                    "query": {
                        "bool": {
                            "must": [
                                {
                                    "term": {
                                        "questions.question": {
                                            "value": "千條線,萬條線, 掉到水里看不見(打一自然現象)"
                                        }
                                    }
                                },
                                {
                                    "term": {
                                        "questions.option": {
                                            "value": "雨"
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            },
            "filter": {
                "and": [
                    {
                        "range": {
                            "ended_at": {
                                "from": "2016-02-14 00:00:00",
                                "to": "2016-03-15 23:59:59"
                            }
                        }
                    }
                ]
            }
        }
    },
    "aggs": {
        #
        .
        .
        .
    }
}

返回結果,耗時差不多,還是很快的

{
    "took": 63,
    "timed_out": false,
    "_shards": { "total": 5, "successful": 5, "failed": 0 },
    "hits": { "total": 147481, "max_score": 0, "hits": [ #... ] }
}

聚合

在 ElasticSearch 中,聚合分為兩種: Metrics 和 Bucket,上面的查詢里包含了這兩種聚合,分別展開說明

Metrics 直接計算出結果,類似 SQL 中的 sum(), min(), max(), avg(), count() 函數

Bucket 不像 Metrics 直接出指標,而且創建一堆桶(可以看到每個桶有多少數量的文檔),然后還可以再用 Sub-Aggregations 再聚合

Nested Aggregation

aggs.answers 用到了,這個聚合不出結果,只是告訴 ElasticSearch 某個字段是 Nested 的,然后再繼續進行聚合

Date Histogram Aggregation

例子中的 aggs.dates 就使用了 Date Histogram,這是最常用的聚合,只要數據中包含時間字段就可以使用這個聚合。有哪些使用場景?

  • 每月/周/日/時/分,不同周期內的數量,而且這個周期不一定是單周、單日,還可以是每2天,每3個小時 etc.
  • 某個時間點如果沒有數據, ElasticSearch 也能自動補充上這個時間點(count 為 0)

Terms Aggregation

aggs.answers.aggs.questions 中使用了兩次,相當於 SQL 的 group by,屬於 Bucket Aggregations

Cardinality Aggregation

相當於 SQL 的 count(distinct(FIELD)),屬於 Metrics Aggregations

*還有一個很重要的概念:聚合后再聚合 Sub-Aggregations *

像例子中的 aggs.answers.aggs.questions,就是先用題目進行聚合,然后再將答案聚合一次(見 aggs.answers.aggs.questions.options),如果不使用 Sub-Aggregations 就沒法講答案放在問題下了

2. 日常使用

在導入完數據后,常規維護有哪些呢?

  • 插入新的 Document,相當於 SQL 的 insert
  • 更新原有的 Document,相當於 SQL 的 update
  • 刪除 Document,也就是 SQL 的 delete

插入單個 Document (例如有用戶剛填完一份問卷)

以下都是從官方拷貝的例子

curl -XPUT 'localhost:9200/customer/external/1' -d '
{
  "name": "John Doe"
}'

更新原有的 Document

curl -XPOST 'localhost:9200/customer/external/1/_update' -d '
{
  "doc": { "name": "Jane Doe" }
}'

刪除 Document,沒有意外,如你所見,用的還是 DELETE 方法,很 RESTful

curl -XDELETE 'localhost:9200/customer/external/2'

常規的使用如果不更新字段,就跟使用 MySQL 差不多,沒有太大區別

總結

查詢時間

好了,這里是重點,實時計算真的很重要(否則要驗證一個想法的成本都很高),在 ElasticSearch 中,對幾百萬行進行搜索都能在幾十至幾百 ms 內完成

初次導入數據耗時

從 MySQL 讀取到全部塞進 ElasticSearch 花了 420秒(7分鍾),文檔結構簡單時能更加快(每秒幾萬)

空間占用

本例子中 Documents 有 360萬(子文檔也算一個),空間占用只有 434.4MB

其他

ElasticSearch 真的很快,尤其是在數據分析領域,請不要被它的名字上的 search 給騙了

在對幾百萬、幾千萬的數據能實時搜索和聚合,同時占用空間也不大,很輕松就能造一個窮人版的 Google Analytics

ElasticSearch 為啥這么快?IEG 前同事 @wentao 寫了一系列文章分享,強烈建議閱讀一下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM