2019年1月28日,阿里雲宣布開源“計算王牌”實時計算平台Blink回饋給ApacheFlink社區。官方稱,計算延遲已經降到毫秒級,也就是你在瀏覽網頁的時候,眨了一下眼睛,淘寶、天貓處理的信息已經刷新了17億次。
作為一家對技術有追求、有渴望的公司,怎么少得了為Flink社區做些貢獻呢?
夫子說
首先,本文所述均基於flink 1.5.4。
我們為什么擴展Flink-SQL?
由於Flink 本身SQL語法並不提供在對接輸入源和輸出目的的SQL語法。數據開發在使用的過程中需要根據其提供的Api接口編寫Source和 Sink, 異常繁瑣,不僅需要了解FLink 各類Operator的API,還需要對各個組件的相關調用方式有了解(比如kafka,redis,mongo,hbase等),並且在需要關聯到外部數據源的時候沒有提供SQL相關的實現方式,因此數據開發直接使用Flink編寫SQL作為實時的數據分析時需要較大的額外工作量。
我們的目的是在使用Flink-SQL的時候只需要關心做什么,而不需要關心怎么做。不需要過多的關心程序的實現,專注於業務邏輯。
接下來,我們一起來看下Flink-SQL的擴展實現吧!
01擴展了哪些flink相關sql
(1)創建源表語句
(2)創建輸出表語句
(3)創建自定義函數
(4)維表關聯
02各個模塊是如何翻譯到flink的實現
( 1 ) 如何將創建源表的sql語句轉換為flink的operator;
Flink中表的都會映射到Table這個類。然后調用注冊方法將Table注冊到environment。
StreamTableEnvironment.registerTable(tableName, table);
當前我們只支持kafka數據源。Flink本身有讀取kafka 的實現類, FlinkKafkaConsumer09,所以只需要根據指定參數實例化出該對象。並調用注冊方法注冊即可。
另外需要注意在flink sql經常會需要用到rowtime, proctime, 所以我們在注冊表結構的時候額外添加rowtime,proctime。
當需要用到rowtime的使用需要額外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定義watermark主要做兩個事情:1:如何從Row中獲取時間字段。 2:設定最大延遲時間。
( 2 ) 如何將創建的輸出表sql語句轉換為flink的operator;
Flink輸出Operator的基類是OutputFormat, 我們這里繼承的是RichOutputFormat, 該抽象類繼承OutputFormat,額外實現了獲取運行環境的方法getRuntimeContext(), 方便於我們之后自定義metric等操作。
我們以輸出到mysql插件mysql-sink為例,分兩部分:
-
將create table 解析出表名稱,字段信息,mysql連接信息。
該部分使用正則表達式的方式將create table 語句轉換為內部的一個實現類。該類存儲了表名稱,字段信息,插件類型,插件連接信息。
-
繼承RichOutputFormat將數據寫到對應的外部數據源。
主要是實現writeRecord方法,在mysql插件中其實就是調用jdbc 實現插入或者更新方法。
( 3) 如何將自定義函數語句轉換為flink的operator;
Flink對udf提供兩種類型的實現方式:
(1)繼承ScalarFunction
(2)繼承TableFunction
需要做的將用戶提供的jar添加到URLClassLoader, 並加載指定的class (實現上述接口的類路徑),然后調用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的注冊。之后即可使用改定義的udf;
( 4 ) 維表功能是如何實現的?
流計算中一個常見的需求就是為數據流補齊字段。因為數據采集端采集到的數據往往比較有限,在做數據分析之前,就要先將所需的維度信息補全,但是當前flink並未提供join外部數據源的SQL功能。
實現該功能需要注意的幾個問題:
(1)維表的數據是不斷變化的
在實現的時候需要支持定時更新內存中的緩存的外部數據源,比如使用LRU等策略。
(2)IO吞吐問題
如果每接收到一條數據就串行到外部數據源去獲取對應的關聯記錄的話,網絡延遲將會是系統最大的瓶頸。這里我們選擇阿里貢獻給flink社區的算子RichAsyncFunction。該算子使用異步的方式從外部數據源獲取數據,大大減少了花費在網絡請求上的時間。
(3)如何將sql 中包含的維表解析到flink operator
為了從sql中解析出指定的維表和過濾條件, 使用正則明顯不是一個合適的辦法。需要匹配各種可能性。將是一個無窮無盡的過程。查看flink本身對sql的解析。它使用了calcite做為sql解析的工作。將sql解析出一個語法樹,通過迭代的方式,搜索到對應的維表;然后將維表和非維表結構分開。
通過上述步驟可以通過SQL完成常用的從kafka源表,join外部數據源,寫入到指定的外部目的結構中。