Flink+Hologres億級用戶實時UV精確去重最佳實踐


簡介: Flink+Hologres億級用戶實時UV精確去重最佳實踐

UV、PV計算,因為業務需求不同,通常會分為兩種場景:

  • 離線計算場景:以T+1為主,計算歷史數據
  • 實時計算場景:實時計算日常新增的數據,對用戶標簽去重

針對離線計算場景,Hologres基於RoaringBitmap,提供超高基數的UV計算,只需進行一次最細粒度的預聚合計算,也只生成一份最細粒度的預聚合結果表,就能達到亞秒級查詢。具體詳情可以參見往期文章>>Hologres如何支持超高基數UV計算(基於RoaringBitmap實現)

對於實時計算場景,可以使用Flink+Hologres方式,並基於RoaringBitmap,實時對用戶標簽去重。這樣的方式,可以較細粒度的實時得到用戶UV、PV數據,同時便於根據需求調整最小統計窗口(如最近5分鍾的UV),實現類似實時監控的效果,更好的在大屏等BI展示。相較於以天、周、月等為單位的去重,更適合在活動日期進行更細粒度的統計,並且通過簡單的聚合,也可以得到較大時間單位的統計結果。

主體思想

  1. Flink將流式數據轉化為表與維表進行JOIN操作,再轉化為流式數據。此舉可以利用Hologres維表的insertIfNotExists特性結合自增字段實現高效的uid映射。
  2. Flink把關聯的結果數據按照時間窗口進行處理,根據查詢維度使用RoaringBitmap進行聚合,並將查詢維度以及聚合的uid存放在聚合結果表,其中聚合出的uid結果放入Hologres的RoaringBitmap類型的字段中。
  3. 查詢時,與離線方式相似,直接按照查詢條件查詢聚合結果表,並對其中關鍵的RoaringBitmap字段做or運算后並統計基數,即可得出對應用戶數。
  4. 處理流程如下圖所示

0.jpeg

 

方案最佳實踐

1.創建相關基礎表

1)創建表uid_mapping為uid映射表,用於映射uid到32位int類型。

  • RoaringBitmap類型要求用戶ID必須是32位int類型且越稠密越好(即用戶ID最好連續)。常見的業務系統或者埋點中的用戶ID很多是字符串類型或Long類型,因此需要使用uid_mapping類型構建一張映射表。映射表利用Hologres的SERIAL類型(自增的32位int)來實現用戶映射的自動管理和穩定映射。
  • 由於是實時數據, 設置該表為行存表,以提高Flink維表實時JOIN的QPS。
BEGIN; CREATE TABLE public.uid_mapping ( uid text NOT NULL, uid_int32 serial, PRIMARY KEY (uid) ); --將uid設為clustering_key和distribution_key便於快速查找其對應的int32值 CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid'); CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid'); CALL set_table_property('public.uid_mapping', 'orientation', 'row'); COMMIT;

 

2)創建表dws_app為基礎聚合表,用於存放在基礎維度上聚合后的結果。

  • 使用RoaringBitmap前需要創建RoaringBitmap extention,同時也需要Hologres實例為0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
  • 為了更好性能,建議根據基礎聚合表數據量合理的設置Shard數,但建議基礎聚合表的Shard數設置不超過計算資源的Core數。推薦使用以下方式通過Table Group來設置Shard數
--新建shard數為16的Table Group, --因為測試數據量百萬級,其中后端計算資源為100core,設置shard數為16 BEGIN; CREATE TABLE tg16 (a int); --Table Group哨兵表 call set_table_property('tg16', 'shard_count', '16'); COMMIT;
  • 相比離線結果表,此結果表增加了時間戳字段,用於實現以Flink窗口周期為單位的統計。結果表DDL如下:
BEGIN; create table dws_app(  country text,  prov text,  city text,  ymd text NOT NULL, --日期字段  timetz TIMESTAMPTZ, --統計時間戳,可以實現以Flink窗口周期為單位的統計  uid32_bitmap roaringbitmap, -- 使用roaringbitmap記錄uv  primary key(country, prov, city, ymd, timetz)--查詢維度和時間作為主鍵,防止重復插入數據 ); CALL set_table_property('public.dws_app', 'orientation', 'column'); --日期字段設為clustering_key和event_time_column,便於過濾 CALL set_table_property('public.dws_app', 'clustering_key', 'ymd'); CALL set_table_property('public.dws_app', 'event_time_column', 'ymd'); --等價於將表放在shard數為16的table group call set_table_property('public.dws_app', 'colocate_with', 'tg16'); --group by字段設為distribution_key CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city'); COMMIT;

2.Flink實時讀取數據並更新dws_app基礎聚合表

完整示例源碼請見alibabacloud-hologres-connectors examples

1)Flink 流式讀取數據源(DataStream),並轉化為源表(Table)

//此處使用csv文件作為數據源,也可以是kafka等 DataStreamSource odsStream = env.createInput(csvInput, typeInfo); // 與維表join需要添加proctime字段,詳見https://help.aliyun.com/document_detail/62506.html Table odsTable =  tableEnv.fromDataStream(  odsStream,  $("uid"),  $("country"),  $("prov"),  $("city"),  $("ymd"),  $("proctime").proctime()); // 注冊到catalog環境 tableEnv.createTemporaryView("odsTable", odsTable); 

2)將源表與Hologres維表(uid_mapping)進行關聯

其中維表使用insertIfNotExists參數,即查詢不到數據時自行插入,uid_int32字段便可以利用Hologres的serial類型自增創建。

// 創建Hologres維表,其中nsertIfNotExists表示查詢不到則自行插入 String createUidMappingTable =  String.format(  "create table uid_mapping_dim("  + " uid string,"  + " uid_int32 INT"  + ") with ("  + " 'connector'='hologres',"  + " 'dbname' = '%s'," //Hologres DB名  + " 'tablename' = '%s',"//Hologres 表名  + " 'username' = '%s'," //當前賬號access id  + " 'password' = '%s'," //當前賬號access key  + " 'endpoint' = '%s'," //Hologres endpoint  + " 'insertifnotexists'='true'"  + ")",  database, dimTableName, username, password, endpoint); tableEnv.executeSql(createUidMappingTable); // 源表與維表join String odsJoinDim =  "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"  + " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"  + " ON ods.uid = dim.uid"; Table joinRes = tableEnv.sqlQuery(odsJoinDim);

 

3)將關聯結果轉化為DataStream,通過Flink時間窗口處理,結合RoaringBitmap進行聚合

DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =  source  // 篩選需要統計的維度(country, prov, city, ymd)  .keyBy(0, 1, 2, 3)  // 滾動時間窗口;此處由於使用讀取csv模擬輸入流,采用ProcessingTime,實際使用中可使用EventTime  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))  // 觸發器,可以在窗口未結束時獲取聚合結果  .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))  .aggregate(  // 聚合函數,根據key By篩選的維度,進行聚合  new AggregateFunction<  Tuple5<String, String, String, String, Integer>,  RoaringBitmap,  RoaringBitmap>() {  @Override  public RoaringBitmap createAccumulator() {  return new RoaringBitmap();  }  @Override  public RoaringBitmap add(  Tuple5<String, String, String, String, Integer> in,  RoaringBitmap acc) {  // 將32位的uid添加到RoaringBitmap進行去重  acc.add(in.f4);  return acc;  }  @Override  public RoaringBitmap getResult(RoaringBitmap acc) {  return acc;  }  @Override  public RoaringBitmap merge(  RoaringBitmap acc1, RoaringBitmap acc2) {  return RoaringBitmap.or(acc1, acc2);  }  },  //窗口函數,輸出聚合結果  new WindowFunction<  RoaringBitmap,  Tuple6<String, String, String, String, Timestamp, byte[]>,  Tuple,  TimeWindow>() {  @Override  public void apply(  Tuple keys,  TimeWindow timeWindow,  Iterable<RoaringBitmap> iterable,  Collector<  Tuple6<String, String, String, String, Timestamp, byte[]>> out)  throws Exception {  RoaringBitmap result = iterable.iterator().next();  // 優化RoaringBitmap  result.runOptimize();  // 將RoaringBitmap轉化為字節數組以存入Holo中  byte[] byteArray = new byte[result.serializedSizeInBytes()];  result.serialize(ByteBuffer.wrap(byteArray));  // 其中 Tuple6.f4(Timestamp) 字段表示以窗口長度為周期進行統計,以秒為單位  out.collect(  new Tuple6<>(  keys.getField(0),  keys.getField(1),  keys.getField(2),  keys.getField(3),  new Timestamp(  timeWindow.getEnd() / 1000 * 1000),  byteArray));  }  });

 

4)寫入結果表

需要注意的是,Hologres中RoaringBitmap類型在Flink中對應Byte數組類型

// 計算結果轉換為表 Table resTable =  tableEnv.fromDataStream(  processedSource,  $("country"),  $("prov"),  $("city"),  $("ymd"),  $("timest"),  $("uid32_bitmap")); // 創建Hologres結果表, 其中Hologres的RoaringBitmap類型通過Byte數組存入 String createHologresTable =  String.format(  "create table sink("  + " country string,"  + " prov string,"  + " city string,"  + " ymd string,"  + " timetz timestamp,"  + " uid32_bitmap BYTES"  + ") with ("  + " 'connector'='hologres',"  + " 'dbname' = '%s',"  + " 'tablename' = '%s',"  + " 'username' = '%s',"  + " 'password' = '%s',"  + " 'endpoint' = '%s',"  + " 'connectionSize' = '%s',"  + " 'mutatetype' = 'insertOrReplace'"  + ")",  database, dwsTableName, username, password, endpoint, connectionSize); tableEnv.executeSql(createHologresTable); // 寫入計算結果到dws表 tableEnv.executeSql("insert into sink select * from " + resTable);

3.數據查詢

查詢時,從基礎聚合表(dws_app)中按照查詢維度做聚合計算,查詢bitmap基數,得出group by條件下的用戶數

  • 查詢某天內各個城市的uv
--運行下面RB_AGG運算查詢,可執行參數先關閉三階段聚合開關(默認關閉),性能更好 set hg_experimental_enable_force_three_stage_agg=off  SELECT country  ,prov  ,city  ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv FROM dws_app WHERE ymd = '20210329' GROUP BY country  ,prov  ,city ;

 

  • 查詢某段時間內各個省份的uv
--運行下面RB_AGG運算查詢,可執行參數先關閉三階段聚合開關(默認關閉),性能更好 set hg_experimental_enable_force_three_stage_agg=off  SELECT country  ,prov  ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv FROM dws_app WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08' GROUP BY country  ,prov ;


原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM