Flink sql 寫 Hbase 忽略空列


數倉場景下,經常有兩個表 Join 后往一個寬表寫數據。比如:埋點數據中只包含 user_id,但是下游計算的需要使用用戶的其他屬性,就需要將埋點數據和用戶數據做關聯。

實時場景,需要做流表 Join,用埋點數據的 user_id 去全量用戶表中取用戶屬性。

如果兩部分數據產生的順序不確定,可能先生成A,也可能先生成B,並且先后的時間范圍也不一定,可能是一起生成,也可能隔三五天。

兩部分數據生成的時間間隔不確定,在 Flink 中無法使用 Interval join,如果用流表 Join 也可能實現 Join 的功能,但是比較麻煩。

在這種場景下,可以基於 Hbase 的主鍵覆蓋策略將兩部分數據分別寫入 Hbase 表中,由於兩部分數據都包含 user_id,直接起兩個任務將兩部分的數據以 user_id 做主鍵,直接寫入 Hbase 表,兩部分數據直接寫自己對應的字段就可以達到 Join 的效果了。
(這是之前我們遇到的問題,雙流Join的場景,A Left Join B 流(Interval Join)需要 Join 的任務中,如果沒有關聯上 B 流的數據,不覆蓋了表中的對應列(一個單獨的任務將B流的數據直接寫入到對應Hbase 的 對應字段))

----- 原諒我表達能力不行,就是想改寫 Hbase sink 源碼,寫入 Hbase 的時候忽略空列 ----

先看下測試環境:
Flink 1.13.2
Hadoop 3.1.1
Hbase 2.2.7

## 測試
看下測試 SQL

-- kafka source
CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior1'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'latest-offset'
  ,'format' = 'json'
);

drop table if exists hbase_user_log_sink ;
CREATE TABLE hbase_user_log_sink (
   user_id STRING
  ,cf ROW(item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3))
) WITH (
   'connector' = 'hbase-2.2'
   ,'zookeeper.quorum' = 'localhost:12181'
   ,'zookeeper.znode.parent' = '/hbase'
   ,'table-name' = 'user_log'
   ,'null-string-literal' = '--'
   -- ,'lookup.cache.max-rows' = '10000'
   -- ,'lookup.cache.ttl' = '10 minute' -- ttl time 超過這么長時間無數據才行
   -- ,'lookup.async' = 'true'
);

insert into hbase_user_log_sink
select user_id, row(item_id, category_id, behavior, ts)
from user_log;

測試數據:

{"category_id":"category_id_107","user_id":"user_id_108","item_id":"item_id_107","behavior":"107","ts":"2021-09-07 15:02:55.110"}
{"category_id":"category_id_107","user_id":"user_id_108","behavior":"107","ts":"2021-09-07 15:02:55.110"}

任務如下: 

直接測試結果如下:

# 第一次把數據寫入 hbase
hbase(main):010:0> get 'user_log','user_id_108'
COLUMN                                                CELL                                                                                                                                                         
 cf:behavior                                          timestamp=1630997629859, value=107                                                                                                                           
 cf:category_id                                       timestamp=1630997629859, value=category_id_107                                                                                                               
 cf:item_id                                           timestamp=1630997629859, value=item_id_107_11                                                                                                                
 cf:ts                                                timestamp=1630997629859, value=\x00\x00\x01{\xC0\xBE<\xA2                                                                                                    
1 row(s)
Took 0.0123 seconds   
# 第二次把數據寫入 hbase,輸入 json 中沒有 item_id 字段
hbase(main):011:0> get 'user_log','user_id_108'
COLUMN                                                CELL                                                                                                                                                         
 cf:behavior                                          timestamp=1630997647868, value=107                                                                                                                           
 cf:category_id                                       timestamp=1630997647868, value=category_id_107                                                                                                               
 cf:item_id                                           timestamp=1630997647868, value=null                                                                                                                          
 cf:ts                                                timestamp=1630997647868, value=\x00\x00\x01{\xC0\xBE<\xA2                                                                                                    
1 row(s)
Took 0.0211 seconds

第一次發送包含全部字段的json 到 kafka,hbase 每個列都寫入值
第一次發送不包含 item 字段的json 到 kafka,hbase item_id 列的值被 null 覆蓋

## 修改源碼

直接看源碼,定位到 HbaseSinkFunction.invoke 方法

public void invoke(T value, Context context) throws Exception {
    checkErrorAndRethrow();
    // 輸入輸入轉為 Mutation
    mutator.mutate(mutationConverter.convertToMutation(value));

    // flush when the buffer number of mutations greater than the configured max size.
    if (bufferFlushMaxMutations > 0
            && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
        flush();
    }
}

最終定位到組裝 Put 的代碼 HbaseSerde.createPutMutation 方法

public @Nullable Put createPutMutation(RowData row) {
    checkArgument(keyEncoder != null, "row key is not set.");
    // 獲取主鍵
    byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
    // rowkey 不能為空
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }
    // upsert
    Put put = new Put(rowkey);
    for (int i = 0; i < fieldLength; i++) {
        if (i != rowkeyIndex) {
            int f = i > rowkeyIndex ? i - 1 : i;
            // get family key
            byte[] familyKey = families[f];
            // 獲取 column family 的 row 
            RowData familyRow = row.getRow(i, qualifiers[f].length);
            // 循環 qualifiers, 將 row 中的值填入到 put 中
            for (int q = 0; q < this.qualifiers[f].length; q++) {
                // get quantifier key
                byte[] qualifier = qualifiers[f][q];
                // serialize value
                byte[] value = qualifierEncoders[f][q].encode(familyRow, q);
                put.addColumn(familyKey, qualifier, value);
            }
        }
    }
    return put;
}

修改比較簡單,就是在組裝 put 的時候,判斷一下對應 列的值是否為 null,null 的就不添加到 put 中就可以了

為了保留Hbase sink 有的功能,有不想改太多,直接借用 hbase sink 的 "null-string-literal" 屬性,默認將數據中 null 的列轉為字符串 "null"

官網 "null-string-literal" 屬性介紹

Representation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type.

借用 "null-string-literal" 屬性,當值為 "--" 的時候,就忽略 null 的列,源碼如下:

public class HBaseSerde {


private final byte[] nullStringBytes;
// add by venn,是否忽略 null 列
private final boolean ignoreNullColumn;


public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) {

...
    this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);
    // 屬性 null-string-literal 的值為 -- ignoreNullColumn = true
    ignoreNullColumn = "--".equals(nullStringLiteral);

...
}

/**
 * Returns an instance of Put that writes record to HBase table.
 *
 * @return The appropriate instance of Put for this use case.
 */
public @Nullable
Put createPutMutation(RowData row) {
    checkArgument(keyEncoder != null, "row key is not set.");
    byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }
    // upsert
    Put put = new Put(rowkey);
    for (int i = 0; i < fieldLength; i++) {
        if (i != rowkeyIndex) {
            int f = i > rowkeyIndex ? i - 1 : i;
            // get family key
            byte[] familyKey = families[f];
            RowData familyRow = row.getRow(i, qualifiers[f].length);
            for (int q = 0; q < this.qualifiers[f].length; q++) {
                // add by venn, 如果 ignoreNullColumn 為 true,切 對應列為 null,忽略 列
                if (ignoreNullColumn && familyRow.isNullAt(q)) {
                    continue;
                }
                // get quantifier key
                byte[] qualifier = qualifiers[f][q];
                // serialize value
                byte[] value = qualifierEncoders[f][q].encode(familyRow, q);
                put.addColumn(familyKey, qualifier, value);
            }
        }
    }
    return put;
}

}

測試結果忽略

完整代碼參考:https://github.com/springMoon/sqlSubmit

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM