1. Background
For a dimension table that is updated in real time, which component should back the FlinkSQL dimension-table source: HBase, Kafka, or MySQL? Which option yields correct results?
A further design question is whether the fact table should join against historical versions of the dimension table, or only against its latest version.
The tests below target joining only the latest version of the dimension table.
2. Hands-on tests
2.1 Using upsert-kafka as the dimension table
(1) Kafka producer code
// Build the upsert messages. `producer` is a KafkaProducer<String, String>
// (string serializers) created beforehand; the record key carries the
// primary-key JSON, the record value carries the full row.
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn");
for (int i = 2; i < 8; i++) {
    JSONObject json1 = new JSONObject();
    // the key JSON field must match the PRIMARY KEY column (id) of the upsert-kafka DDL below
    json1.put("id", i + "");
    //json.put("update_time", dtf.format(LocalDateTime.now()));
    JSONObject json = new JSONObject();
    json.put("id", i + "");
    json.put("name", "name444" + i);
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
            "flinksqldim",
            json1.toJSONString(),
            json.toJSONString()
    );
    // send the record (the original snippet built records but never sent them)
    producer.send(record);
}
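With upsert-kafka, a record whose value is null is interpreted as a DELETE for that key. A minimal sketch for removing a dimension row, assuming the same producer and topic:

// a null value is an upsert-kafka tombstone: the row with this key is deleted
JSONObject delKey = new JSONObject();
delKey.put("id", "5");
producer.send(new ProducerRecord<String, String>("flinksqldim", delKey.toJSONString(), null));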
(2) FlinkSQL main code
// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Map the Kafka topic to an input temporary table
tableEnv.executeSql(
"CREATE TABLE sensor_source(" +
" id STRING, " +
" name STRING, " +
" o_time TIMESTAMP(3), " +
" WATERMARK FOR o_time AS o_time " +
" ) WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldemo'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlCount'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the Kafka data to the input dimension table - a dimension table that changes in real time
tableEnv.executeSql(
"CREATE TABLE dim_source (" +
" id STRING," +
" name STRING," +
" update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
" WATERMARK FOR update_time AS update_time, " +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'upsert-kafka'," +
" 'topic' = 'flinksqldim'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlDim'," +
" 'key.format' = 'json'," +
" 'value.format' = 'json')"
);
// Map the MySQL table to an output temporary table
String mysql_sql = "CREATE TABLE mysql_sink (" +
" name STRING," +
" cnt BIGINT," +
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
" 'table-name' = 'count_info'," +
" 'username' = 'xxx'," +
" 'password' = 'xxx'" +
")";
tableEnv.executeSql(mysql_sql);
// Insert data
TableResult tableResult = tableEnv.executeSql(
"INSERT INTO mysql_sink " +
"SELECT b.name, count(*) as cnt " +
"FROM sensor_source as a " +
"INNER JOIN dim_source as b " +
"on a.id = b.id " +
"where a.id > 3 " +
"group by b.name "
// "order by name "
);
// getJobStatus() returns a future; join() resolves it so the actual status is printed
System.out.println(tableResult.getJobClient().get().getJobStatus().join());
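The INNER JOIN above is a regular join, but because upsert-kafka produces a changelog (an update to a key retracts the previously emitted row), the downstream aggregate is corrected whenever a dimension row changes, so the count reflects only the latest dimension version. If the goal were instead to join the dimension version valid at each fact row's event time, dim_source already qualifies as a versioned table (primary key plus watermark), and an event-time temporal join would look like the following sketch (not run in this test):

TableResult temporalResult = tableEnv.executeSql(
        "INSERT INTO mysql_sink " +
        "SELECT b.name, count(*) as cnt " +
        "FROM sensor_source as a " +
        // FOR SYSTEM_TIME AS OF a.o_time picks the dim version valid at the fact row's event time
        "JOIN dim_source FOR SYSTEM_TIME AS OF a.o_time as b " +
        "on a.id = b.id " +
        "where a.id > 3 " +
        "group by b.name "
);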
2.2 Using MySQL as the dimension table
Updates to the MySQL dimension table (inserts or modifications) only take effect for newly arriving stream records; fact rows that were already processed keep the dimension values they joined with.
Main code
// Create the execution environment with the web UI enabled for local debugging
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Map the Kafka topic to an input temporary table; the lookup join below requires a proctime attribute
tableEnv.executeSql(
"CREATE TABLE sensor_source( " +
" id STRING, " +
" name STRING," +
" proctime AS PROCTIME() " +
" ) WITH ( " +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldemo'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlCount'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the MySQL table to the input dimension table
tableEnv.executeSql(
"CREATE TABLE mysql_source (" +
" id STRING," +
" name STRING," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
" 'table-name' = 'test_info'," +
" 'username' = 'xxx'," +
" 'password' = 'xxx'," +
" 'lookup.max-retries' = '3'," +
" 'lookup.cache.max-rows' = '1000'," +
" 'lookup.cache.ttl' = '60s' " +
")"
);
// Map the MySQL table to an output temporary table
String mysql_sql = "CREATE TABLE mysql_sink (" +
" name STRING," +
" cnt BIGINT," +
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
" 'table-name' = 'count_info'," +
" 'username' = 'xxx'," +
" 'password' = 'xxx'" +
")";
tableEnv.executeSql(mysql_sql);
// Insert data, using a processing-time lookup join: FOR SYSTEM_TIME AS OF a.proctime
TableResult tableResult = tableEnv.executeSql(
"INSERT INTO mysql_sink " +
"SELECT b.name, count(*) as cnt " +
"FROM sensor_source as a " +
"INNER JOIN mysql_source FOR SYSTEM_TIME AS OF a.proctime as b " +
"on a.id = b.id " +
"where a.id > 3 " +
"group by b.name "
);
// getJobStatus() returns a future; join() resolves it so the actual status is printed
System.out.println(tableResult.getJobClient().get().getJobStatus().join());
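How quickly dimension updates become visible is governed by the two cache options above: with 'lookup.cache.max-rows' = '1000' and 'lookup.cache.ttl' = '60s', a cached row can be served up to 60 seconds stale. If both options are left out, the JDBC lookup cache is disabled and every incoming fact record queries MySQL directly, making updates visible immediately at the cost of database load. A sketch of that uncached variant (mysql_source_nocache is a hypothetical name):

tableEnv.executeSql(
        "CREATE TABLE mysql_source_nocache (" +
        " id STRING," +
        " name STRING," +
        " PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
        " 'table-name' = 'test_info'," +
        " 'username' = 'xxx'," +
        " 'password' = 'xxx'" +
        // no lookup.cache.* options: the cache is disabled and MySQL is queried per lookup
        ")"
);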
3. Trial and error
3.1 Using a regular join (Regular Joins)
Kafka producer code
for (int i = 1; i < 10; i++) {
    //json.put("update_time", dtf.format(LocalDateTime.now()));
    JSONObject json = new JSONObject();
    json.put("id", i + "");
    json.put("name", "name555" + i);
    ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
            "flinksqldim2",
            i,
            json.toJSONString()
    );
    // Send the message
    Future<RecordMetadata> future = producer.send(record);
}
FlinkSQL processing code
// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Map the Kafka topic to an input temporary table
tableEnv.executeSql(
"CREATE TABLE sensor_source(" +
"id STRING, " +
"name STRING, " +
"o_time TIMESTAMP(3), " +
" WATERMARK FOR o_time AS o_time " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldemo'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlCount'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the Kafka data to the input dimension table - changes in real time; a plain (non-compacted) topic
tableEnv.executeSql(
"CREATE TABLE dim_source ( " +
" id STRING, " +
" name STRING, " +
" update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
" WATERMARK FOR update_time AS update_time " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldim2'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlDim'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the MySQL table to an output temporary table
String mysql_sql = "CREATE TABLE mysql_sink (" +
" name STRING," +
" cnt BIGINT," +
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
" 'table-name' = 'count_info'," +
" 'username' = 'xxx'," +
" 'password' = 'xxx'" +
")";
tableEnv.executeSql(mysql_sql);
// Insert data
TableResult tableResult = tableEnv.executeSql(
"INSERT INTO mysql_sink " +
"SELECT b.name, count(*) as cnt " +
"FROM sensor_source a " +
"JOIN dim_source b " +
"on a.id = b.id " +
"where a.id > 3 " +
"group by b.name "
);
// getJobStatus() returns a future; join() resolves it so the actual status is printed
System.out.println(tableResult.getJobClient().get().getJobStatus().join());
After the dimension stream had been updated a few times, the data in the result table count_info was wrong. This is expected: the plain kafka connector yields an append-only stream, so every historical version of a dimension row stays in the join state and keeps matching fact rows; nothing retracts the old versions, and the counts are inflated.
3.2 MySQL table as the dimension table
tableEnv.executeSql(
"CREATE TEMPORARY TABLE mysql_source (" +
" id STRING," +
" name STRING," +
" update_time TIMESTAMP(3)," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
" 'table-name' = 'user_info'," +
" 'username' = 'xxx'," +
" 'password' = 'xxx'" +
")"
);
This fails with:
Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found.
So MySQL cannot serve directly as a versioned table for an event-time temporal join: the DDL above declares update_time but no watermark, so the table has no row time attribute. Would putting CDC in front of MySQL fix this? To be tested; update pending.
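For reference, a changelog-backed dimension table via the flink-cdc 'mysql-cdc' connector might be declared as below. This is a sketch, not verified in this test: the connector comes from the separate flink-cdc-connectors project, and the update_time column plus the connection options are assumptions.

tableEnv.executeSql(
        "CREATE TABLE mysql_cdc_source (" +
        " id STRING," +
        " name STRING," +
        " update_time TIMESTAMP(3)," +
        // both requirements from the error message: a row time attribute ...
        " WATERMARK FOR update_time AS update_time," +
        // ... and a primary key, making this a versioned table
        " PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        " 'connector' = 'mysql-cdc'," +
        " 'hostname' = 'ip'," +
        " 'port' = '3306'," +
        " 'username' = 'xxx'," +
        " 'password' = 'xxx'," +
        " 'database-name' = 'kafka'," +
        " 'table-name' = 'user_info'" +
        ")"
);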
3.3 Using an HBase table as the dimension table
FlinkSQL main code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Map the Kafka topic to an input temporary table
tableEnv.executeSql(
"CREATE TABLE sensor_source( " +
" id STRING, " +
" name STRING, " +
" o_time TIMESTAMP(3), " +
" WATERMARK FOR o_time AS o_time " +
" ) WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldemo'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlCount'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the HBase table to the input dimension table
tableEnv.executeSql(
"CREATE TABLE hbase_source (" +
" rowkey STRING," +
" INFO ROW<name STRING>," +
" PRIMARY KEY (rowkey) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'hbase-1.4'," +
" 'zookeeper.quorum' = 'node1:2181'," +
" 'table-name' = 'TEST_INFO'" +
")"
);
// Map the MySQL table to an output temporary table
String mysql_sql = "CREATE TABLE mysql_sink (" +
" name STRING," +
" cnt BIGINT," +
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
" 'table-name' = 'count_info'," +
" 'username' = 'xxx'," +
" 'password' = 'xxx'" +
")";
tableEnv.executeSql(mysql_sql);
// Insert data
TableResult tableResult = tableEnv.executeSql(
        "INSERT INTO mysql_sink " +
        // qualify the nested ROW field with the table alias: c.INFO.name
        "SELECT c.INFO.name, count(*) as cnt " +
        "FROM sensor_source o " +
        "JOIN hbase_source c " +
        "on o.id = c.rowkey " +
        "where o.id > 3 " +
        "group by c.INFO.name "
);
// getJobStatus() returns a future; join() resolves it so the actual status is printed
System.out.println(tableResult.getJobClient().get().getJobStatus().join());
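Note that without FOR SYSTEM_TIME, the HBase table is scanned once as a bounded snapshot, so later changes to the dimension rows are not picked up. The usual pattern is a processing-time lookup join, which requires a proctime attribute on the fact table. A sketch, assuming sensor_source also declared proctime AS PROCTIME() as in section 2.2:

TableResult lookupResult = tableEnv.executeSql(
        "INSERT INTO mysql_sink " +
        "SELECT c.INFO.name, count(*) as cnt " +
        "FROM sensor_source o " +
        // lookup join: each fact row queries HBase at processing time
        "JOIN hbase_source FOR SYSTEM_TIME AS OF o.proctime AS c " +
        "on o.id = c.rowkey " +
        "where o.id > 3 " +
        "group by c.INFO.name "
);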
Problems encountered
# (1) Class not found
ClassNotFoundException: org.apache.hadoop.util.PlatformName
# Fix: add the hadoop-auth dependency
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.5.1</version>
</dependency>
# (2) Table not found
HBase Table expects at least one region in scan, please check the HBase table status in HBase cluster
# The table was created through Phoenix, so its name is upper-case, while the code used lower-case; hence the table was not found
# (3) Classloader problem
# Fix: add the following to ./conf/flink-conf.yaml
classloader.check-leaked-classloader: false