輕量級OLAP(二):Hive + Elasticsearch


1. 引言

在做OLAP數據分析時,常常會遇到過濾分析需求,比如:除去只有性別、常駐地標簽的用戶,計算廣告媒體上的覆蓋UV。OLAP解決方案Kylin不支持復雜數據類型(array、struct、map),要求數據輸入Schema必須是平鋪的,但是平鋪后丟失了用戶的聚合標簽信息,而沒有辦法判斷某一個用戶是否只有性別、常駐地標簽。顯然,我們需要一種支持復雜數據類型的OLAP數據庫;底層為Lucene的Elasticsearch正在向OLAP融合,騰訊內部已經用基於Lucene的分析數據庫Hermes來做多維數據分析。

Elasticsearch(ES)在設計之初是用來做全文檢索的搜索引擎,但隨着倒排索引所表現出來優秀的查詢性能,有越來越多人拿它做分析數據庫使。可將ES視作文檔型NoSQL數據庫,一般情況下將具有相同schema的文檔(document)歸屬於一個type,所有的文檔存儲於某一個index;ES與RDBMS的概念對比如下:

Relational DB ⇒ Databases ⇒ Tables ⇒ Rows ⇒ Columns
Elasticsearch ⇒ Indices ⇒ Types ⇒ Documents ⇒ Fields

2. 寫數據

廣告日志與標簽數據均落在Hive表,並且ES官方提供與Hive的集成。因此,我們首選用Hive向ES寫數據。首先,采用ES做OLAP分析引擎,創建表如下:

add jar /path/elasticsearch-hadoop-2.3.1.jar;

create external table ad_tag (
  dvc string, 
  medias array < string >, 
  c1_arr array < string >, 
  week_time string
) stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler' tblproperties(
  'es.nodes' = '<ip1>:9200,<ip2>:9200', 
  'es.resource' = 'ad-{week_time}/tag', 
  'es.mapping.exclude' = 'week_time'
);

在設計Hive表結構時,ES的計算UV的distinct count(cardinality)存在着計算誤差;因此,我們按dvc對其他字段做了聚合,UV的計算轉換成了ES doc命中數。其中,es.nodes表示ES的節點,只需配置一個節點即可;es.resource對應於ES的Index/Type;es.mapping.exclude在寫ES時不會被索引的字段。因我們只有寫操作而沒有通過Hive查詢ES數據,因此並沒有設置es.query。Hive向ES寫數據如下:

set hive.map.aggr = false;

insert overwrite table ad_tag 
select 
  media, 
  a.dvc as dvc, 
  case when c1_arr is null then array('empty') else c1_arr end as c1_arr, 
  '2016-10-08' as week_time 
from 
  (
    select 
      dvc, 
      app_name as media 
    from 
      ad_log 
    where 
      is_exposure = '1' 
      and day_time between date_sub('2016-10-08', 6) 
      and '2016-10-08' 
    group by 
      dvc, 
      app_name
  ) a 
  left outer join (
    select 
      dvc, 
      collect_set(c1) as c1_arr 
    from 
      tag lateral view inline(tag) in_tb 
    where 
      day_time = '2016-10-08' 
    group by 
      dvc
  ) b on a.dvc = b.dvc;

在寫ES時,在構建索引時不需要分詞,通過PUT index template方式實現之:

{
  "template": "ad*",
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "string_template": {
            "mapping": {
              "include_in_all": false,
              "index": "not_analyzed",
              "type": "string",
              "index_options": "docs"
            },
            "match": "*"
          }
        }
      ]
    }
  }
}

3. 多維分析

ES官方的查詢語言是DSL,主要分為兩類:

  • Query,相當於SQL中的where部分,可套用filter、match等;
  • Aggregation,相當於SQL中的group by部分,在aggs內部也可以套用filter。

DSL可以嵌套,表達異常復雜的查詢操作;但是,若以字符串拼接的方式實現DSL,則顯得可維護性太差。因此,官方提供了elasticsearch-dsl-py,可以將DSL等同於一段Python代碼。我們的多維分析器便是基於此實現的(Python 3.5 + elasticsearch_dsl 2.1.0)

整體上曝光UV、有標簽的UV、除去常用標簽UV,以及每一個媒體上曝光UV、有標簽的UV、除去常用標簽UV的分析(相當於group by media with cube):

client = Elasticsearch(['<host1>'], port=20009, timeout=50)


def per_media(index_name):
    """count(distinct dvc) group by media with cube"""
    ms = MultiSearch(using=client, index=index_name)
    all_doc = Search()
    all_doc.aggs.bucket('per_media', 'terms', field='medias', size=1000)
    tagged = Search().query('filtered', filter=~Q('term', c1_arr='empty'))
    tagged.aggs.bucket('per_media', 'terms', field='medias', size=1000)
    useful = Search().query('filtered', filter=~Q('term', c1_arr='empty') & Q('script',
                                                                              script="""['常駐地', '性別'].intersect(doc['c1_arr'].values).size() < doc['c1_arr'].values.size()"""))
    useful.aggs.bucket('per_media', 'terms', field='medias', size=1000)
    ms = ms.add(all_doc)
    ms = ms.add(tagged)
    ms = ms.add(useful)
    responses = ms.execute()
    result_list = []
    result_dict = defaultdict(lambda: [])
    for resp in responses:  # get per media uv(all, tagged, useful_tagged)
        print("Query %d: %r." % (responses.index(resp), resp.search.to_dict()))
        result_list.append(resp.hits.total)
        for buck in resp.aggregations['per_media']['buckets']:
            result_dict[buck['key']].append(buck['doc_count'])
    for k, v in result_dict.items():  # fill up default value 0
        if len(v) < 3:
            result_dict[k] = v + [0] * (3 - len(v))
    return result_list, result_dict

媒體與標簽組合維度下的UV統計:

def per_media_c1(index_name):
    """return {(media, c1) -> tagged_uv}"""
    s = Search(using=client, index=index_name)
    tagged = s.query('filtered', filter=~Q('term', c1_arr='empty'))
    tagged.aggs.bucket('per_media', 'terms', field='medias', size=1000) \
        .bucket('per_c1', 'terms', field='c1_arr', size=100)
    result = {}
    response = tagged.execute()
    for buck in response.aggregations['per_media']['buckets']:
        key = buck['key']
        for b in buck['per_c1']['buckets']:
            result[(key, b['key'])] = b['doc_count']
    return result


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM