數倉場景下,經常有兩個表 Join 后往一個寬表寫數據。比如:埋點數據中只包含 user_id,但是下游計算的需要使用用戶的其他屬性,就需要將埋點數據和用戶數據做關聯。
實時場景,需要做流表 Join,用埋點數據的 user_id 去全量用戶表中取用戶屬性。
如果兩部分數據產生的順序不確定,可能先生成A,也可能先生成B,並且先后的時間范圍也不一定,可能是一起生成,也可能隔三五天。
兩部分數據生成的時間間隔不確定,在 Flink 中無法使用 Interval join,如果用流表 Join 也可能實現 Join 的功能,但是比較麻煩。
在這種場景下,可以基於 Hbase 的主鍵覆蓋策略將兩部分數據分別寫入 Hbase 表中,由於兩部分數據都包含 user_id,直接起兩個任務將兩部分的數據以 user_id 做主鍵,直接寫入 Hbase 表,兩部分數據直接寫自己對應的字段就可以達到 Join 的效果了。
(這是之前我們遇到的問題,雙流Join的場景,A Left Join B 流(Interval Join)需要 Join 的任務中,如果沒有關聯上 B 流的數據,不覆蓋了表中的對應列(一個單獨的任務將B流的數據直接寫入到對應Hbase 的 對應字段))
----- 原諒我表達能力不行,就是想改寫 Hbase sink 源碼,寫入 Hbase 的時候忽略空列 ----
先看下測試環境:
Flink 1.13.2
Hadoop 3.1.1
Hbase 2.2.7
## 測試
看下測試 SQL
-- kafka source CREATE TABLE user_log ( user_id STRING ,item_id STRING ,category_id STRING ,behavior STRING ,ts TIMESTAMP(3) ,process_time as proctime() , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka' ,'topic' = 'user_behavior1' ,'properties.bootstrap.servers' = 'localhost:9092' ,'properties.group.id' = 'user_log' ,'scan.startup.mode' = 'latest-offset' ,'format' = 'json' ); drop table if exists hbase_user_log_sink ; CREATE TABLE hbase_user_log_sink ( user_id STRING ,cf ROW(item_id STRING ,category_id STRING ,behavior STRING ,ts TIMESTAMP(3)) ) WITH ( 'connector' = 'hbase-2.2' ,'zookeeper.quorum' = 'localhost:12181' ,'zookeeper.znode.parent' = '/hbase' ,'table-name' = 'user_log' ,'null-string-literal' = '--' -- ,'lookup.cache.max-rows' = '10000' -- ,'lookup.cache.ttl' = '10 minute' -- ttl time 超過這么長時間無數據才行 -- ,'lookup.async' = 'true' ); insert into hbase_user_log_sink select user_id, row(item_id, category_id, behavior, ts) from user_log;
測試數據:
{"category_id":"category_id_107","user_id":"user_id_108","item_id":"item_id_107","behavior":"107","ts":"2021-09-07 15:02:55.110"}
{"category_id":"category_id_107","user_id":"user_id_108","behavior":"107","ts":"2021-09-07 15:02:55.110"}
任務如下:

直接測試結果如下:
# 第一次把數據寫入 hbase hbase(main):010:0> get 'user_log','user_id_108' COLUMN CELL cf:behavior timestamp=1630997629859, value=107 cf:category_id timestamp=1630997629859, value=category_id_107 cf:item_id timestamp=1630997629859, value=item_id_107_11 cf:ts timestamp=1630997629859, value=\x00\x00\x01{\xC0\xBE<\xA2 1 row(s) Took 0.0123 seconds # 第二次把數據寫入 hbase,輸入 json 中沒有 item_id 字段 hbase(main):011:0> get 'user_log','user_id_108' COLUMN CELL cf:behavior timestamp=1630997647868, value=107 cf:category_id timestamp=1630997647868, value=category_id_107 cf:item_id timestamp=1630997647868, value=null cf:ts timestamp=1630997647868, value=\x00\x00\x01{\xC0\xBE<\xA2 1 row(s) Took 0.0211 seconds
第一次發送包含全部字段的json 到 kafka,hbase 每個列都寫入值
第一次發送不包含 item 字段的json 到 kafka,hbase item_id 列的值被 null 覆蓋
## 修改源碼
直接看源碼,定位到 HbaseSinkFunction.invoke 方法
public void invoke(T value, Context context) throws Exception { checkErrorAndRethrow(); // 輸入輸入轉為 Mutation mutator.mutate(mutationConverter.convertToMutation(value)); // flush when the buffer number of mutations greater than the configured max size. if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { flush(); } }
最終定位到組裝 Put 的代碼 HbaseSerde.createPutMutation 方法
public @Nullable Put createPutMutation(RowData row) { checkArgument(keyEncoder != null, "row key is not set."); // 獲取主鍵 byte[] rowkey = keyEncoder.encode(row, rowkeyIndex); // rowkey 不能為空 if (rowkey.length == 0) { // drop dirty records, rowkey shouldn't be zero length return null; } // upsert Put put = new Put(rowkey); for (int i = 0; i < fieldLength; i++) { if (i != rowkeyIndex) { int f = i > rowkeyIndex ? i - 1 : i; // get family key byte[] familyKey = families[f]; // 獲取 column family 的 row RowData familyRow = row.getRow(i, qualifiers[f].length); // 循環 qualifiers, 將 row 中的值填入到 put 中 for (int q = 0; q < this.qualifiers[f].length; q++) { // get quantifier key byte[] qualifier = qualifiers[f][q]; // serialize value byte[] value = qualifierEncoders[f][q].encode(familyRow, q); put.addColumn(familyKey, qualifier, value); } } } return put; }
修改比較簡單,就是在組裝 put 的時候,判斷一下對應 列的值是否為 null,null 的就不添加到 put 中就可以了
為了保留Hbase sink 有的功能,有不想改太多,直接借用 hbase sink 的 "null-string-literal" 屬性,默認將數據中 null 的列轉為字符串 "null"
官網 "null-string-literal" 屬性介紹
Representation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type.
借用 "null-string-literal" 屬性,當值為 "--" 的時候,就忽略 null 的列,源碼如下:
public class HBaseSerde { private final byte[] nullStringBytes; // add by venn,是否忽略 null 列 private final boolean ignoreNullColumn; public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) { ... this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8); // 屬性 null-string-literal 的值為 -- ignoreNullColumn = true ignoreNullColumn = "--".equals(nullStringLiteral); ... } /** * Returns an instance of Put that writes record to HBase table. * * @return The appropriate instance of Put for this use case. */ public @Nullable Put createPutMutation(RowData row) { checkArgument(keyEncoder != null, "row key is not set."); byte[] rowkey = keyEncoder.encode(row, rowkeyIndex); if (rowkey.length == 0) { // drop dirty records, rowkey shouldn't be zero length return null; } // upsert Put put = new Put(rowkey); for (int i = 0; i < fieldLength; i++) { if (i != rowkeyIndex) { int f = i > rowkeyIndex ? i - 1 : i; // get family key byte[] familyKey = families[f]; RowData familyRow = row.getRow(i, qualifiers[f].length); for (int q = 0; q < this.qualifiers[f].length; q++) { // add by venn, 如果 ignoreNullColumn 為 true,切 對應列為 null,忽略 列 if (ignoreNullColumn && familyRow.isNullAt(q)) { continue; } // get quantifier key byte[] qualifier = qualifiers[f][q]; // serialize value byte[] value = qualifierEncoders[f][q].encode(familyRow, q); put.addColumn(familyKey, qualifier, value); } } } return put; } }
測試結果忽略
完整代碼參考:https://github.com/springMoon/sqlSubmit
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

