Implementing a MySQL Table Lookup Source in Flink


Previously, when doing lookup joins against MySQL, only the rowkey could be used as the join key, so I had been wanting to write a caching UDTF that joins on non-primary-key fields. Because the UDF has a cache built in, it behaves much like a lookup join in practice (the UDF itself was covered in an earlier post).

Recently I implemented a few custom TableSources and wanted to build a lookup Table Source as well. Over the past while, with some of my own code plus pieces borrowed from the Flink source, I put together a custom MySQL Table Source and Lookup Source (an HBase Lookup Source may follow, and perhaps a Kudu one too).

I "referenced" Flink's JdbcRowDataLookupFunction (most of it is copied over; rather than reinventing the wheel, I constructed the same parameters as the Flink source and took the eval method directly from it).

DynamicTableSource has two variants: ScanTableSource and LookupTableSource. The custom source implements both ScanTableSource and LookupTableSource, providing the corresponding methods of each.

Note: LookupTableSource is also a Table Source in its own right, not a part of ScanTableSource. The choice between ScanTableSource and LookupTableSource is made when the SQL is optimized.
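
As a rough skeleton (a sketch only: the scan-side provider and copy() bodies are placeholders here, and the real getLookupRuntimeProvider implementation is shown in the Implementation section below), the source class implements both interfaces at once:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;

public class MysqlDynamicTableSource implements ScanTableSource, LookupTableSource {

    @Override
    public ChangelogMode getChangelogMode() {
        // a plain MySQL source only emits inserts
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // chosen by the planner for a full table scan; body omitted in this sketch
        throw new UnsupportedOperationException("scan side omitted in this sketch");
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        // chosen by the planner when the query is rewritten as a lookup join; shown in full below
        throw new UnsupportedOperationException("see the full implementation below");
    }

    @Override
    public DynamicTableSource copy() {
        return this; // the real implementation returns a proper copy
    }

    @Override
    public String asSummaryString() {
        return "cust-mysql";
    }
}

When the join is written as a lookup join (FOR SYSTEM_TIME AS OF), the planner calls getLookupRuntimeProvider; a plain SELECT over the table goes through getScanRuntimeProvider instead.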

Core code:


override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  validateAndOverrideConfiguration()
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  val relNodes = modifyOperations.map(translateToRel)
  // optimize the SQL plan
  val optimizedRelNodes = optimize(relNodes)
  val execGraph = translateToExecNodeGraph(optimizedRelNodes)
  // from here on the translation works the same as with the Stream API:
  // the transformations are used to build the StreamGraph and then the JobGraph
  val transformations = translateToPlan(execGraph)
  cleanupInternalConfigurations()
  transformations
}

Before optimize:

After optimize:

Implementation

MysqlDynamicTableSource implements the LookupTableSource interface and provides the corresponding getLookupRuntimeProvider method:


@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    if (lookupOption == null) {
        lookupOption = new MysqlLookupOption.Builder()
                .setCacheMaxSize(options.get(MysqlOption.CACHE_MAX_SIZE))
                .setCacheExpireMs(options.get(MysqlOption.CACHE_EXPIRE_MS))
                .setMaxRetryTimes(options.get(MysqlOption.MAX_RETRY_TIMES))
                .build();
    }
    // assemble the parameters that MysqlRowDataLookUpFunction needs
    final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);

    String[] fieldNames = rowTypeInfo.getFieldNames();
    TypeInformation[] fieldTypes = rowTypeInfo.getFieldTypes();

    int[] lookupKeysIndex = context.getKeys()[0];
    int keyCount = lookupKeysIndex.length;
    String[] keyNames = new String[keyCount];
    for (int i = 0; i < keyCount; i++) {
        keyNames[i] = fieldNames[lookupKeysIndex[i]];
    }
    final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();

    // new MysqlRowDataLookUpFunction
    MysqlRowDataLookUpFunction lookUpFunction
            = new MysqlRowDataLookUpFunction(url, username, password, table, fieldNames, keyNames, fieldTypes, lookupOption, rowType);

    return TableFunctionProvider.of(lookUpFunction);
}

MysqlRowDataLookUpFunction extends TableFunction; the core code is as follows:


@Override
public void open(FunctionContext context) {
    try {
        establishConnectionAndStatement();
        // build the cache; if "mysql.lookup.cache.max.size" or "mysql.lookup.cache.expire.ms"
        // is not set, no cache is used
        this.cache =
                cacheMaxSize == -1 || cacheExpireMs == -1
                        ? null
                        : CacheBuilder.newBuilder()
                                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                                .maximumSize(cacheMaxSize)
                                .build();
    } catch (SQLException sqe) {
        throw new IllegalArgumentException("open() failed.", sqe);
    }
}


/**
 * eval is called for each lookup key:
 * the cache is checked first;
 * on a cache miss, the external system (MySQL) is queried
 *
 * @param keys query parameters
 */
public void eval(Object... keys) {
    RowData keyRow = GenericRowData.of(keys);
    // get row from cache
    if (cache != null) {
        List<RowData> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (RowData cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
    // query mysql, retrying up to maxRetryTimes times
    for (int retry = 0; retry <= maxRetryTimes; retry++) {
        try {
            statement.clearParameters();
            statement = lookupKeyRowConverter.toExternal(keyRow, statement);
            try (ResultSet resultSet = statement.executeQuery()) {
                if (cache == null) {
                    // no cache configured: just collect the result rows
                    while (resultSet.next()) {
                        collect(jdbcRowConverter.toInternal(resultSet));
                    }
                } else {
                    // cache configured: collect the result rows and also store them in the cache
                    ArrayList<RowData> rows = new ArrayList<>();
                    while (resultSet.next()) {
                        RowData row = jdbcRowConverter.toInternal(resultSet);
                        rows.add(row);
                        collect(row);
                    }
                    rows.trimToSize();
                    cache.put(keyRow, rows);
                }
            }
            // query succeeded, stop retrying
            break;
        } catch (SQLException e) {
            // give up after maxRetryTimes attempts, otherwise back off and retry
            if (retry >= maxRetryTimes) {
                throw new RuntimeException("Execution of MySQL lookup statement failed.", e);
            }
            try {
                Thread.sleep(1000L * retry);
            } catch (InterruptedException e1) {
                throw new RuntimeException(e1);
            }
        }
    }
}


  • The constructor receives the parameters passed in (a rough sketch of it follows this list)
  • open initializes the MySQL connection and creates the cache object
  • eval is where the lookup runs: the cache is checked first, then MySQL is queried
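
For reference, a rough sketch of what the constructor and its fields might look like (the field names, the getters on MysqlLookupOption, and the buildSelectStatement helper are illustrative assumptions; the real class sticks closer to Flink's JdbcRowDataLookupFunction and also builds the row converters used in eval):

// illustrative sketch only; see the GitHub repo referenced below for the real class
private final String url;
private final String username;
private final String password;
private final String tableName;
private final String[] fieldNames;
private final String[] keyNames;
private final TypeInformation[] fieldTypes;
private final long cacheMaxSize;
private final long cacheExpireMs;
private final int maxRetryTimes;
private final String query;

public MysqlRowDataLookUpFunction(
        String url, String username, String password, String tableName,
        String[] fieldNames, String[] keyNames, TypeInformation[] fieldTypes,
        MysqlLookupOption lookupOption, RowType rowType) {
    this.url = url;
    this.username = username;
    this.password = password;
    this.tableName = tableName;
    this.fieldNames = fieldNames;
    this.keyNames = keyNames;
    this.fieldTypes = fieldTypes;
    // cache / retry settings come from the lookup options (getter names assumed)
    this.cacheMaxSize = lookupOption.getCacheMaxSize();
    this.cacheExpireMs = lookupOption.getCacheExpireMs();
    this.maxRetryTimes = lookupOption.getMaxRetryTimes();
    // builds something like: SELECT f1, f2, ... FROM table WHERE k1 = ? AND k2 = ?
    // (helper name assumed); the row converters for result rows and lookup keys
    // are created from rowType here as well, as in Flink's JdbcRowDataLookupFunction
    this.query = buildSelectStatement(tableName, fieldNames, keyNames);
}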

Overall, the custom source needs three classes: MysqlDynamicTableFactory -> MysqlDynamicTableSource -> MysqlRowDataLookUpFunction. Flink discovers the TableFactory via SPI from the META-INF/services/org.apache.flink.table.factories.Factory file.
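
For illustration, here is a simplified sketch of the factory side (the ConfigOption constant and the constructor arguments passed to MysqlDynamicTableSource are assumptions; the real factory declares all of the mysql.* options used in the DDL below):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class MysqlDynamicTableFactory implements DynamicTableSourceFactory {

    // one example option; the real connector defines these in MysqlOption
    private static final ConfigOption<String> URL =
            ConfigOptions.key("mysql.url").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        // matches 'connector' = 'cust-mysql' in the DDL
        return "cust-mysql";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // validate the declared options against the WITH clause
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        // hand the validated options (and, in the real code, the table schema) to the source;
        // the constructor arguments here are assumptions
        return new MysqlDynamicTableSource(options);
    }
}

The SPI registration file META-INF/services/org.apache.flink.table.factories.Factory then contains a single line with the factory's fully qualified class name (the package prefix depends on the project).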

The remaining code is quite similar, so I won't paste all of it here; the complete code is available on GitHub.

Testing

Table DDL:


create temporary table mysql_behavior_conf(
   id int
  ,code STRING
  ,`value` STRING
  ,update_time TIMESTAMP(3)
)WITH(
 'connector' = 'cust-mysql'
 ,'mysql.url' = 'jdbc:mysql://localhost:3306/venn?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true'
 ,'mysql.username' = 'root'
 ,'mysql.password' = '123456'
 ,'mysql.database' = 'venn'
 ,'mysql.table' = 'lookup_join_config'
 ,'mysql.lookup.cache.max.size' = '1'
 ,'mysql.lookup.cache.expire.ms' = '600000'
 ,'mysql.lookup.max.retry.times' = '3'
 ,'mysql.timeout' = '10'
)
;

Insert statement:


INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, c.`value`, a.ts
FROM user_log a
  left join mysql_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
  ON a.behavior = c.code
where a.behavior is not null;

Job execution graph:

MySQL table data:

Output:

+I[user_id_1, abc, category_id_1, 1, 1_value, 2021-10-18T14:59:04.111]
+I[user_id_2, abc, category_id_2, 2, 2_value, 2021-10-18T14:59:04.112]
+I[user_id_3, abc, category_id_3, 3, null, 2021-10-18T14:59:05.113]
+I[user_id_4, abc, category_id_4, 4, null, 2021-10-18T14:59:05.113]
+I[user_id_5, abc, category_id_5, 5, null, 2021-10-18T14:59:06.115]
+I[user_id_6, abc, category_id_6, 6, null, 2021-10-18T14:59:06.116]
+I[user_id_7, abc, category_id_7, 7, null, 2021-10-18T14:59:07.118]

Fetching data from the cache:

Feel free to follow the Flink 菜鳥 WeChat official account, where Flink development articles are posted from time to time.

