- Flink lookup join on HBase non-rowkey columns
Joining an HBase table on non-rowkey columns is something we have always needed to do; only the implementation has changed over time.
Back in Flink 1.10, SQL joins against HBase worked by generating UDFs from a configuration file and registering them as temporary functions when the SqlSubmit program started, until the official Flink HBase connector gained lookup join support and lookup join replaced the UDFs for the rowkey-based part.
A UDF still falls well short of a connector: its input and output are driven by a configuration file, the generated UDF can only query fixed columns of a fixed table, and the conditions are fixed up front. It works, but it is far less flexible than a connector.
Recently a project came along with many scenarios that join HBase tables. With the UDF approach we would have to generate a pile of UDFs at startup; it does not hurt the job, but it looks clumsy, which is why I implemented my own HBase lookup join source.
I spent some time experimenting with a custom SQL source, a MySQL table source and a source that supports lookup join, and ended up with a table source that supports lookup joins on HBase non-rowkey columns.
Flink 1.10 hbase udf
Flink 1.10 ships an example of an HBase UDF in its source code; we used this approach to join streams with HBase tables.
HBaseConnectorITCase.java
// connection properties plus HBase table, column family and column information
private static Map<String, String> hbaseTableProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
    properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
    properties.put(CONNECTOR_PROPERTY_VERSION, "1");
    properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
    // get zk quorum from "hbase-site.xml" in classpath
    String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
    properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
    // schema
    String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
    TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
    TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
    TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
    TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};
    DescriptorProperties descriptorProperties = new DescriptorProperties(true);
    TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
    descriptorProperties.putTableSchema(SCHEMA, tableSchema);
    descriptorProperties.putProperties(properties);
    return descriptorProperties.asMap();
}
// join the HBase table via LATERAL TABLE
@Test
public void testHBaseLookupFunction() throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
    StreamITCase.clear();
    // prepare a source table
    DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2);
    Table in = streamTableEnv.fromDataStream(ds, "a, b, c");
    streamTableEnv.registerTable("src", in);
    Map<String, String> tableProperties = hbaseTableProperties();
    TableSource source = TableFactoryService
            .find(HBaseTableFactory.class, tableProperties)
            .createTableSource(tableProperties);
    streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY}));
    // perform a temporal table join query
    String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM src, LATERAL TABLE(hbaseLookup(a))";
    Table result = streamTableEnv.sqlQuery(sqlQuery);
    DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
    resultSet.addSink(new StreamITCase.StringSink<>());
    streamEnv.execute();
    List<String> expected = new ArrayList<>();
    expected.add("1,10,Welt-1");
    expected.add("2,20,Welt-2");
    expected.add("3,30,Welt-3");
    expected.add("3,30,Welt-3");
    StreamITCase.compareWithList(expected);
}
Flink lookup join rowkey
An example of joining the HBase rowkey was covered in an earlier post, so I will not repeat it here: flink sql join hbase demo
Here is a brief look at how HBaseRowDataLookupFunction is implemented.
It is the usual three-piece set: the factory HBase2DynamicTableFactory is registered via the Java SPI, the factory creates the HBaseDynamicTableSource, and the table source creates the HBaseRowDataLookupFunction (synchronous) or HBaseRowDataAsyncLookupFunction (asynchronous).
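For reference, SPI-based factory discovery relies on a service file on the classpath; for Flink's dynamic table factories the entry looks roughly like the following (file path is standard, the package name is the one I recall from the flink-connector-hbase-2.2 module):
# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.connector.hbase2.HBase2DynamicTableFactory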

There is nothing special about the factory or the table source, so let's look straight at HBaseRowDataLookupFunction.
The constructor receives the required parameters; the open method initializes the cache object and the HBase connection.
@Override
public void open(FunctionContext context) {
    LOG.info("start open ...");
    Configuration config = prepareRuntimeConfiguration();
    try {
        hConnection = ConnectionFactory.createConnection(config);
        table = (HTable) hConnection.getTable(TableName.valueOf(hTableName));
        this.cache =
                cacheMaxSize <= 0 || cacheExpireMs <= 0
                        ? null
                        : CacheBuilder.newBuilder()
                                .recordStats()
                                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                                .maximumSize(cacheMaxSize)
                                .build();
        if (cache != null) {
            context.getMetricGroup()
                    .gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
        }
    } catch (TableNotFoundException tnfe) {
        LOG.error("Table '{}' not found ", hTableName, tnfe);
        throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
    } catch (IOException ioe) {
        LOG.error("Exception while creating connection to HBase.", ioe);
        throw new RuntimeException("Cannot create connection to HBase.", ioe);
    }
    this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
    LOG.info("end open.");
}
At lookup time the eval method is invoked to run the query. The official HBase lookup source only supports lookups by rowkey, so the argument passed to eval is the rowkey of the HBase table and can be used for the HBase query as-is.
The method first probes the cache with the rowkey and emits the cached row if one exists; otherwise it queries HBase, emits the result, and puts it into the cache.
/**
 * The invoke entry point of lookup function.
 *
 * @param rowKey the lookup key. Currently only support single rowkey.
 */
public void eval(Object rowKey) throws IOException {
    if (cache != null) {
        RowData cacheRowData = cache.getIfPresent(rowKey);
        if (cacheRowData != null) {
            collect(cacheRowData);
            return;
        }
    }
    for (int retry = 0; retry <= maxRetryTimes; retry++) {
        try {
            // fetch result
            Get get = serde.createGet(rowKey);
            if (get != null) {
                Result result = table.get(get);
                if (!result.isEmpty()) {
                    if (cache != null) {
                        // parse and collect
                        RowData rowData = serde.convertToNewRow(result);
                        collect(rowData);
                        cache.put(rowKey, rowData);
                    } else {
                        collect(serde.convertToReusedRow(result));
                    }
                }
            }
            break;
        } catch (IOException e) {
            LOG.error(String.format("HBase lookup error, retry times = %d", retry), e);
            if (retry >= maxRetryTimes) {
                throw new RuntimeException("Execution of HBase lookup failed.", e);
            }
            try {
                Thread.sleep(1000 * retry);
            } catch (InterruptedException e1) {
                throw new RuntimeException(e1);
            }
        }
    }
}
The implementation of HBaseRowDataLookupFunction is not complicated: it simply opens an HBase connection and performs a rowkey lookup with the Get API, specifying the columns to fetch.
flink lookup join hbase non rowkey
A lookup join on HBase non-rowkey columns works largely the same as the rowkey version; only the query condition differs. Because of HBase's design the primary key is always the rowkey, so the Get API gives a high-performance point lookup. For non-rowkey lookups we replace the single rowkey with one or more columns, use the Scan API together with a Filter, and again specify the columns to return.
Compared with a rowkey lookup there are two extra complications (a minimal sketch of the two access patterns follows this list):
- a rowkey lookup hits the row directly with the Get API; a non-rowkey lookup needs a full table scan with the Scan API plus a Filter
- a rowkey lookup returns at most one row; a non-rowkey lookup can return one or many rows
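Here is a minimal sketch, not taken from the post, that contrasts the two access patterns with the plain HBase 2.x client API. The class and method names are mine, the table/column names reuse the example table defined later, and an already-created Connection is assumed.
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class LookupPatterns {
    // rowkey lookup: direct point read, at most one row
    static Result byRowkey(Connection conn, String rowkey) throws Exception {
        try (Table table = conn.getTable(TableName.valueOf("hbase_table_config"))) {
            return table.get(new Get(Bytes.toBytes(rowkey)));
        }
    }

    // non-rowkey lookup: full scan filtered on a column value, zero to many rows
    static void byColumn(Connection conn, String code) throws Exception {
        try (Table table = conn.getTable(TableName.valueOf("hbase_table_config"))) {
            Scan scan = new Scan();
            SingleColumnValueFilter filter = new SingleColumnValueFilter(
                    Bytes.toBytes("cf"), Bytes.toBytes("code"),
                    CompareOperator.EQUAL, Bytes.toBytes(code));
            filter.setFilterIfMissing(true); // skip rows that do not have cf:code at all
            scan.setFilter(filter);
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    // every matching row arrives here
                }
            }
        }
    }
}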
Column selection
A rowkey lookup does not need the query columns to be specified, since there is only one key; a non-rowkey lookup has to choose the columns the filter applies to. In Flink SQL the table schema is declared in the DDL, so the HBase table schema is defined in SQL just like the official connector; the difference lies in the SELECT statement.
A rowkey-lookup SQL uses a temporal table join to trigger HBase's lookup path, equating a field of the stream table with rowkey:
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.user_id = rowkey
where a.behavior is not null;
A non-rowkey lookup would, ideally, look like this:
create temporary table hbase_table_config(
rowkey string
,cf ROW(code string, `value` string, update_time string)
,cf2 ROW(code string, `value` string, update_time string)
)WITH(
'connector' = 'cust-hbase'
,'hbase.zookeeper.quorum' = 'thinkpad:12181'
,'zookeeper.znode.parent' = '/hbase'
,'hbase.tablename' = 'hbase_table_config'
,'hbase.null-string-literal' = 'null'
,'hbase.lookup.key' = 'cf:code,cf2:code'
,'hbase.lookup.cache.max.size' = '100'
,'hbase.lookup.cache.expire.ms' = '6'
,'hbase.lookup.max.retry.times' = '3'
,'hbase.timeout' = '10'
)
;
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
on a.user_id = c.cf.code and a.item_id = c.cf2.code
where a.behavior is not null;
However, Flink's SQL planner cannot recognise a nested field reference such as "c.cf.code" as a join key. With a condition written like this, resolving the join condition yields nothing, the planner decides there is no equality condition, and it fails with:
Temporal table join requires an equality condition on fields of table [default_catalog.default_database.hbase_table_config].
The lookup keys are resolved in CommonPhysicalLookupJoin.allLookupKeys:
val allLookupKeys: Map[Int, LookupKey] = {
  // join key pairs from left input field index to temporal table field index
  val joinKeyPairs: Array[IntPair] = getTemporalTableJoinKeyPairs(joinInfo, calcOnTemporalTable)
  // all potential index keys, mapping from field index in table source to LookupKey
  analyzeLookupKeys(
    cluster.getRexBuilder,
    joinKeyPairs,
    calcOnTemporalTable)
}

Stepping in with the debugger lands in CommonPhysicalLookupJoin.getIdenticalSourceField:
private def getIdenticalSourceField(rexProgram: RexProgram, outputOrdinal: Int): Int = {
  assert((outputOrdinal >= 0) && (outputOrdinal < rexProgram.getProjectList.size()))
  val project = rexProgram.getProjectList.get(outputOrdinal)
  var index = project.getIndex
  while (true) {
    var expr = rexProgram.getExprList.get(index)
    expr match {
      case call: RexCall if call.getOperator == SqlStdOperatorTable.IN_FENNEL =>
        // drill through identity function
        expr = call.getOperands.get(0)
      case call: RexCall if call.getOperator == SqlStdOperatorTable.CAST =>
        // drill through identity function
        val outputType = call.getType
        val inputType = call.getOperands.get(0).getType
        val isCompatible = PlannerTypeUtils.isInteroperable(
          FlinkTypeFactory.toLogicalType(outputType), FlinkTypeFactory.toLogicalType(inputType))
        expr = if (isCompatible) call.getOperands.get(0) else expr
      case _ =>
    }
    expr match {
      case ref: RexLocalRef => index = ref.getIndex
      case ref: RexInputRef => return ref.getIndex
      case _ => return -1
    }
  }
  -1
}
When the join condition is resolved, a composite field access (RexFieldAccess) is not recognised; only expressions that can be traced back to a field addressed directly by its index are accepted.
HBase non-rowkey column: not recognised

HBase rowkey: no resolution needed

MySQL: the index of the RexInputRef is returned directly

Since join conditions on composite fields (c.cf.code) cannot be resolved, I worked around it: the columns that act as lookup conditions are declared as a connector option on the HBase table, and the SELECT statement passes the corresponding values by "concatenating" them into a single key.
The HBase table is defined as follows:
create temporary table hbase_table_config(
rowkey string
,cf ROW(code string, `value` string, update_time string)
,cf2 ROW(code string, `value` string, update_time string)
)WITH(
'connector' = 'cust-hbase'
,'hbase.zookeeper.quorum' = 'thinkpad:12181'
,'zookeeper.znode.parent' = '/hbase'
,'hbase.tablename' = 'hbase_table_config'
,'hbase.null-string-literal' = 'null'
,'hbase.lookup.key' = 'cf:code,cf2:code'
,'hbase.lookup.cache.max.size' = '100'
,'hbase.lookup.cache.expire.ms' = '6'
,'hbase.lookup.max.retry.times' = '3'
,'hbase.timeout' = '10'
)
In the query, the only field that may appear on the HBase side of the join is rowkey (the join must be an equality join, and rowkey is the only simple top-level field of the HBase table):
SELECT a.user_id, a.item_id, a.category_id, a.behavior,c.rowkey, c.cf.`value`, c.cf2.`value`,a.ts
FROM user_log a
left join hbase_table_config FOR SYSTEM_TIME AS OF a.process_time AS c
-- a key is still required as the join condition, but it is not actually used as one
-- a lookup join may have multiple join conditions
-- the stream-side values are passed in as a single string, concatenated with ','
-- the HBase-side filter columns come from the option 'hbase.lookup.key' = 'cf:code,cf2:code'; the field counts on both sides must match
ON concat(a.user_id,',',a.item_id) = c.rowkey --and a.item_id = c.cf.`code`
-- on a.user_id = c.cf.code and a.item_id = c.cf2.code
where a.behavior is not null;
This way both the filter columns and their values reach Flink through SQL. Now let's look at the concrete implementation of the custom HBase lookup source (non-rowkey).
Custom HBase lookup source (non-rowkey)
Like any other table source, the HBase lookup source consists of a TableFactory, a TableSource and the runtime lookup function, plus an HbaseOption class that defines the configuration options.
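HbaseOption itself is not shown in this post; below is a minimal, hypothetical sketch of how its options might be declared with Flink's ConfigOptions, with keys matching the WITH clause above. The default values are guesses, and the Builder/getters used by the factory are omitted.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class HbaseOption implements java.io.Serializable {
    public static final ConfigOption<String> ZOOKEEPER_QUORUM =
            ConfigOptions.key("hbase.zookeeper.quorum").stringType().noDefaultValue();
    public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT =
            ConfigOptions.key("zookeeper.znode.parent").stringType().defaultValue("/hbase");
    public static final ConfigOption<String> NULL_STRING_LITERAL =
            ConfigOptions.key("hbase.null-string-literal").stringType().defaultValue("null");
    public static final ConfigOption<String> TABLE =
            ConfigOptions.key("hbase.tablename").stringType().noDefaultValue();
    public static final ConfigOption<String> LOOKUP_KEY =
            ConfigOptions.key("hbase.lookup.key").stringType().noDefaultValue();
    public static final ConfigOption<Long> CACHE_MAX_SIZE =
            ConfigOptions.key("hbase.lookup.cache.max.size").longType().defaultValue(1000L);
    public static final ConfigOption<Long> CACHE_EXPIRE_MS =
            ConfigOptions.key("hbase.lookup.cache.expire.ms").longType().defaultValue(60000L);
    public static final ConfigOption<Integer> MAX_RETRY_TIMES =
            ConfigOptions.key("hbase.lookup.max.retry.times").intType().defaultValue(3);
    public static final ConfigOption<Integer> TIME_OUT =
            ConfigOptions.key("hbase.timeout").intType().defaultValue(30);
    // plus a Builder and getters (getLookupKey(), getCacheMaxSize(), ...) used by the factory and the lookup function
}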

HbaseDynamicTableFactory
HbaseDynamicTableFactory declares the connector identifier cust-hbase, declares the required and optional options, reads the options and the TableSchema from the context, and creates the HbaseDynamicTableSource.
public class HbaseDynamicTableFactory implements DynamicTableSourceFactory {

    // connector identifier
    @Override
    public String factoryIdentifier() {
        // used for matching to `connector = '...'`
        return "cust-hbase";
    }

    // required options
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HbaseOption.ZOOKEEPER_QUORUM);
        options.add(HbaseOption.ZOOKEEPER_ZNODE_PARENT);
        options.add(HbaseOption.NULL_STRING_LITERAL);
        options.add(HbaseOption.TABLE);
        options.add(HbaseOption.LOOKUP_KEY);
        // options.add(FactoryUtil.FORMAT); // use pre-defined option for format
        return options;
    }

    // optional options
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HbaseOption.CACHE_MAX_SIZE);
        options.add(HbaseOption.CACHE_EXPIRE_MS);
        options.add(HbaseOption.MAX_RETRY_TIMES);
        options.add(HbaseOption.TIME_OUT);
        return options;
    }

    // read the options from the context and create the HbaseDynamicTableSource
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // either implement your custom validation logic here ...
        // or use the provided helper utility
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // validate all options
        helper.validate();
        // get the validated options
        final ReadableConfig config = helper.getOptions();
        HbaseOption option = new HbaseOption.Builder()
                .setZookeeperQuorum(config.get(HbaseOption.ZOOKEEPER_QUORUM))
                .setZookeeperZnodeParent(config.get(HbaseOption.ZOOKEEPER_ZNODE_PARENT))
                .setNullStringLiteral(config.get(HbaseOption.NULL_STRING_LITERAL))
                .setTable(config.get(HbaseOption.TABLE))
                .setLookupKey(config.get(HbaseOption.LOOKUP_KEY))
                .setCacheMaxSize(config.get(HbaseOption.CACHE_MAX_SIZE))
                .setCacheExpireMs(config.get(HbaseOption.CACHE_EXPIRE_MS))
                .setMaxRetryTimes(config.get(HbaseOption.MAX_RETRY_TIMES))
                .setTimeOut(config.get(HbaseOption.TIME_OUT))
                .build();
        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType producedDataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
        TableSchema physicalSchema =
                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        // create and return dynamic table source
        return new HbaseDynamicTableSource(producedDataType, option, physicalSchema);
    }
}
HbaseDynamicTableSource
Since only HbaseRowDataLookUpFunction is implemented here, there is not much to HbaseDynamicTableSource.
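The class itself is not listed in the post; a minimal sketch, assuming it only implements LookupTableSource and simply wraps the lookup function (the import paths for HBaseTableSchema and the provider are from the Flink HBase connector and may differ by version):
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.types.DataType;

public class HbaseDynamicTableSource implements LookupTableSource {

    private final DataType producedDataType;
    private final HbaseOption option;
    private final TableSchema physicalSchema;

    public HbaseDynamicTableSource(DataType producedDataType, HbaseOption option, TableSchema physicalSchema) {
        this.producedDataType = producedDataType;
        this.option = option;
        this.physicalSchema = physicalSchema;
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        // build the HBase schema from the DDL schema and hand everything to the lookup function
        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(physicalSchema);
        try {
            return TableFunctionProvider.of(new HbaseRowDataLookUpFunction(hbaseSchema, option));
        } catch (Exception e) {
            throw new RuntimeException("Cannot create HBase lookup function.", e);
        }
    }

    @Override
    public DynamicTableSource copy() {
        return new HbaseDynamicTableSource(producedDataType, option, physicalSchema);
    }

    @Override
    public String asSummaryString() {
        return "cust-hbase";
    }
}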
HbaseRowDataLookUpFunction
HbaseRowDataLookUpFunction is the concrete implementation of the lookup function.
Constructor
It first parses the filter columns from the configuration, then derives the column families and columns to return from the HBase table schema declared in the DDL.
public HbaseRowDataLookUpFunction(HBaseTableSchema hbaseSchema, HbaseOption options) throws UnsupportedEncodingException {
    this.hbaseSchema = hbaseSchema;
    this.cacheMaxSize = options.getCacheMaxSize();
    this.cacheExpireMs = options.getCacheExpireMs();
    this.maxRetryTimes = options.getMaxRetryTimes();
    this.options = options;
    // format lookup filter column
    String lookupKeyConfig = options.getLookupKey();
    lookupKey = new LinkedHashMap<>();
    for (String key : lookupKeyConfig.split(",")) {
        String[] tmp = key.split(":");
        byte[] family = tmp[0].getBytes("UTF-8");
        byte[] qualify = tmp[1].getBytes("UTF-8");
        if (lookupKey.containsKey(family)) {
            lookupKey.get(family).add(qualify);
        } else {
            List<byte[]> list = new ArrayList<>();
            list.add(qualify);
            lookupKey.put(family, list);
        }
    }
    // format result qualifier
    resultColumn = new LinkedHashMap<>();
    for (String familyString : hbaseSchema.getFamilyNames()) {
        byte[] family = familyString.getBytes("UTF-8");
        byte[][] qualifies = hbaseSchema.getQualifierKeys(familyString);
        for (byte[] by : qualifies) {
            if (resultColumn.containsKey(family)) {
                resultColumn.get(family).add(by);
            } else {
                List<byte[]> list = new ArrayList<>();
                list.add(by);
                resultColumn.put(family, list);
            }
        }
    }
    LOG.info("open end");
}
The open method
The open method creates the HBase connection and the cache object.
The eval method
The eval method is where the actual lookup happens.
1. Build a RowData key from the combined input value and probe the cache; if rows are cached, emit them and return. Note that the cache here is a Cache<RowData, List<RowData>>, because a non-rowkey lookup can return multiple rows.
RowData keyRow = GenericRowData.of(input);
if (cache != null) {
    List<RowData> cachedRows = cache.getIfPresent(keyRow);
    if (cachedRows != null) {
        for (RowData cachedRow : cachedRows) {
            collect(cachedRow);
        }
        return;
    }
}
2. Query HBase
- First build a column-value filter (SingleColumnValueFilter) for each lookup column and add the filters to the scan. If fewer input values arrive than there are configured lookup columns, only the leading columns are used as filters (see the comment in the code).
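// NOTE (not in the original post): `keys` below is presumably derived by splitting the
// concatenated lookup value passed to eval() back into the individual filter values,
// matching the ',' used by concat(...) on the SQL side, e.g. (hypothetical):
// String[] keys = input.toString().split(",");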
int i = 0;
// build one SingleColumnValueFilter per lookup column and combine them with AND semantics
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
outer:
for (Map.Entry<byte[], List<byte[]>> entry : lookupKey.entrySet()) {
    byte[] family = entry.getKey();
    for (byte[] qualifier : entry.getValue()) {
        filterList.addFilter(
                new SingleColumnValueFilter(family, qualifier, CompareOperator.EQUAL, keys[i].getBytes("UTF-8")));
        // Avoid running past the end of keys:
        // if there are two filter columns but only one key, just use the first column as the filter
        if (i == keys.length - 1) {
            break outer;
        }
        ++i;
    }
}
scan.setFilter(filterList);
- Add the columns to return to the scan
// add result column
for (Map.Entry<byte[], List<byte[]>> entry : resultColumn.entrySet()) {
    byte[] family = entry.getKey();
    for (byte[] qualifier : entry.getValue()) {
        scan.addColumn(family, qualifier);
    }
}
- Run the scan and emit the results
// scan & parse result
try (ResultScanner resultSet = table.getScanner(scan)) {
    Result result;
    if (cache == null) {
        while ((result = resultSet.next()) != null) {
            // parse to RowData
            RowData row = serde.convertToNewRow(result);
            collect(row);
        }
    } else {
        ArrayList<RowData> rows = new ArrayList<>();
        while ((result = resultSet.next()) != null) {
            // parse to RowData
            RowData row = serde.convertToNewRow(result);
            rows.add(row);
            collect(row);
        }
        rows.trimToSize();
        cache.put(keyRow, rows);
    }
}
Testing

Defining the HBase table
To make the test more representative, the table is defined with two column families:
create temporary table hbase_table_config(
rowkey string
,cf ROW(code string, `value` string, update_time string)
,cf2 ROW(code string, `value` string, update_time string)
)WITH(
'connector' = 'cust-hbase'
,'hbase.zookeeper.quorum' = 'thinkpad:12181'
,'zookeeper.znode.parent' = '/hbase'
,'hbase.tablename' = 'hbase_table_config'
,'hbase.null-string-literal' = 'null'
,'hbase.lookup.key' = 'cf:code,cf2:code'
,'hbase.lookup.cache.max.size' = '100'
,'hbase.lookup.cache.expire.ms' = '6'
,'hbase.lookup.max.retry.times' = '3'
,'hbase.timeout' = '10'
)
HBase table data
put 'hbase_table_config','1','cf:code','code_cf'
put 'hbase_table_config','1','cf:value','value_cf_1'
put 'hbase_table_config','1','cf:update_time','1'
put 'hbase_table_config','1','cf2:code','code_cf2_1'
put 'hbase_table_config','1','cf2:value','value_cf2_1'
put 'hbase_table_config','1','cf2:update_time','1'
put 'hbase_table_config','2','cf:code','code_cf'
put 'hbase_table_config','2','cf:value','value_cf_2'
put 'hbase_table_config','2','cf:update_time','2'
put 'hbase_table_config','2','cf2:code','code_cf2_2'
put 'hbase_table_config','2','cf2:value','value_cf2_2'
put 'hbase_table_config','2','cf2:update_time','2'
Scenario 1: join on a single column
SQL:
create temporary table hbase_table_config(
rowkey string
,cf ROW(code string, `value` string, update_time string)
,cf2 ROW(code string, `value` string, update_time string)
)WITH(
'connector' = 'cust-hbase'
,'hbase.zookeeper.quorum' = 'thinkpad:12181'
,'zookeeper.znode.parent' = '/hbase'
,'hbase.tablename' = 'hbase_table_config'
,'hbase.null-string-literal' = 'null'
,'hbase.lookup.key' = 'cf:code'
-- ,'hbase.lookup.key' = 'cf:code,cf2:code'
,'hbase.lookup.cache.max.size' = '100'
,'hbase.lookup.cache.expire.ms' = '6'
,'hbase.lookup.max.retry.times' = '3'
,'hbase.timeout' = '10'
)
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior,rowkey, behavior_map, behavior_map2, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior,c.rowkey, c.cf.`value`, c.cf2.`value`,a.ts
FROM user_log a
left join hbase_table_config FOR SYSTEM_TIME AS OF a.process_time AS c
-- the values match the configured lookup columns in order, i.e. a.behavior = c.cf.code
ON a.behavior = c.rowkey
where a.behavior is not null;
Input one record:
{"category_id":"category_id_1","user_id":"user_id_3","item_id":"abc","behavior":"code_cf","ts":"2021-11-16 15:01:41.327"}
Output:
+I[user_id_3, abc, category_id_1, code_cf, 1, value_cf_1, value_cf2_1, 2021-11-16T15:01:41.327]
+I[user_id_3, abc, category_id_1, code_cf, 2, value_cf_2, value_cf2_2, 2021-11-16T15:01:41.327]
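Both row 1 and row 2 have cf:code = 'code_cf', so this non-rowkey lookup returns two rows for a single input record.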

Scenario 2: join on two columns from different column families
SQL:
create temporary table hbase_table_config(
rowkey string
,cf ROW(code string, `value` string, update_time string)
,cf2 ROW(code string, `value` string, update_time string)
)WITH(
'connector' = 'cust-hbase'
,'hbase.zookeeper.quorum' = 'thinkpad:12181'
,'zookeeper.znode.parent' = '/hbase'
,'hbase.tablename' = 'hbase_table_config'
,'hbase.null-string-literal' = 'null'
,'hbase.lookup.key' = 'cf:code,cf2:code'
,'hbase.lookup.cache.max.size' = '100'
,'hbase.lookup.cache.expire.ms' = '6'
,'hbase.lookup.max.retry.times' = '3'
,'hbase.timeout' = '10'
)
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior,rowkey, behavior_map, behavior_map2, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior,c.rowkey, c.cf.`value`, c.cf2.`value`,a.ts
FROM user_log a
left join hbase_table_config FOR SYSTEM_TIME AS OF a.process_time AS c
-- the values match the configured lookup columns in order, i.e. a.behavior = c.cf.code and a.category_id = c.cf2.code
ON concat(a.behavior,',',a.category_id) = c.rowkey
where a.behavior is not null;
Input one record:
{"category_id":"code_cf2_2","user_id":"user_id_3","item_id":"abc","behavior":"code_cf","ts":"2021-11-16 15:01:41.327"}
Output:
+I[user_id_3, abc, code_cf2_2, code_cf, 2, value_cf_2, value_cf2_2, 2021-11-16T15:01:41.327]
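Only row 2 satisfies both cf:code = 'code_cf' and cf2:code = 'code_cf2_2', so a single row comes back.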

Feel free to follow the Flink菜鳥 WeChat official account for occasional posts about Flink development.

