感謝您的關注 + 點贊 + 再看,對博主的肯定,會督促博主持續的輸出更多的優質實戰內容!!!
1.序篇-本文結構
大數據羊說
用數據提升美好事物發生的概率~
30篇原創內容
公眾號
protobuf
作為目前各大公司中最廣泛使用的高效的協議數據交換格式工具庫,會大量作為流式數據傳輸的序列化方式,所以在 flink sql 中如果能實現 protobuf
的 format
會非常有用(目前社區已經有對應的實現,不過目前還沒有 merge,預計在 1.14 系列版本中能 release)。
pr
見:https://github.com/apache/flink/pull/14376
這一節主要介紹 flink sql 中怎么自定義實現 format
,其中以最常使用的 protobuf
作為案例來介紹。
-
背景篇-為啥需要 protobuf format
-
目標篇-protobuf format 預期效果
-
難點剖析篇-此框架建設的難點、目前有哪些實現
-
維表實現篇-實現的過程
-
總結與展望篇
如果想在本地直接測試下:
- 在公眾號后台回復
-
flink sql 知其所以然(五)| 自定義 protobuf format獲取源碼(源碼基於 1.13.1 實現)
-
flink sql 知其所以然(五)| 自定義 protobuf format獲取源碼(源碼基於 1.13.1 實現)
-
flink sql 知其所以然(五)| 自定義 protobuf format獲取源碼(源碼基於 1.13.1 實現)
-
執行源碼包中的
flink.examples.sql._05.format.formats.SocketWriteTest
測試類來制造 protobuf 數據 -
然后執行源碼包中的
flink.examples.sql._05.format.formats.ProtobufFormatTest
測試類來消費 protobuf 數據,並且打印在 console 中,然后就可以在 console 中看到結果。
2.背景篇-為啥需要 protobuf format
關於為什么選擇 protobuf
可以看這篇文章,寫的很詳細:
http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral
在實時計算的領域中,為了可讀性會選擇 json
,為了效率以及一些已經依賴了 grpc
的公司會選擇 protobuf
來做數據序列化,那么自然而然,日志的序列化方式也會選擇 protobuf
。
而官方目前已經 release 的版本中是沒有提供 flink sql api 的 protobuf format
的。如下圖,基於 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
1
因此本文在介紹怎樣自定義一個 format 的同時,實現一個 protobuf format 來給大家使用。
3.目標篇-protobuf format 預期效果
預期效果是先實現幾種最基本的數據類型,包括 protobuf 中的 message
(自定義 model)、map
(映射)、repeated
(列表)、其他基本數據類型等,這些都是我們最常使用的類型。
預期 protobuf message 定義如下:
2
測試數據源數據如下,博主把 protobuf 的數據轉換為 json,以方便展示,如下圖:
3
預期 flink sql:
數據源表 DDL:
CREATE TABLE protobuf_source (
name STRING
, names ARRAY<STRING>
, si_map MAP<STRING, INT>
)
WITH (
'connector' = 'socket',
'hostname' = 'localhost',
'port' = '9999',
'format' = 'protobuf',
'protobuf.class-name' = 'flink.examples.sql._04.format.formats.protobuf.Test'
)
數據匯表 DDL:
CREATE TABLE print_sink (
name STRING
, names ARRAY<STRING>
, si_map MAP<STRING, INT>
) WITH (
'connector' = 'print'
)
Transform 執行邏輯:
INSERT INTO print_sink
SELECT *
FROM protobuf_source
下面是我在本地跑的結果:
可以看到打印的結果,數據是正確的被反序列化讀入,並且最終輸出到 console。
4.難點剖析篇-目前有哪些實現
目前業界可以參考的實現如下:https://github.com/maosuhan/flink-pb, 也就是這位哥們負責目前 flink protobuf 的 format。
這種實現的具體使用方式如下:
7
其實現有幾個特點:
-
復雜性:用戶需要在 flink sql 程序運行時,將對應的 protobuf java 文件引入 classpath,這個特點是復合 flink 這樣的通用框架的特點的。但是如果需要在各個公司場景要做一個流式處理平台的場景下,各個 protobuf sdk 可能都位於不同的 jar 包中,那么其 jar 包管理可能是一個比較大的問題。
-
高效 serde:一般很多場景下為了通用化 serde protobuf message,可能會選擇 DynamicMessage 來處理 protobuf message,但是其 serde 性能相比原生 java code 的性能比較差。因為特點 1 引入了 protobuf 的 java class,所以其 serde function 可以基於 codegen 實現,而這將極大提高 serde 效率,效率提高就代表着省錢啊,可以吹逼的。
8
Notes:
當然博主針對第一點也有一些想法,比如怎樣做到不依賴 protobuf java 文件,只依賴 protobuf 的 message 定義即可或者只依賴其 descriptor。目前博主的想法如下:
flink 程序在客戶端獲取到對應的 protobuf message 定義
然后根據這個定義恢復出 proto 文件
客戶端本地執行 protoc 將此文件編譯為 java 文件
客戶端本地動態將此 java 文件編譯並 load 到 jvm 中
使用 codegen 然后動態生成執行代碼
一氣呵成!!!
具體實現其實可以參考:https://stackoverflow.com/questions/28381659/how-to-compile-protocol-buffers-schema-at-runtime
5.實現篇-實現的過程
5.1.flink format 工作原理
其實上節已經詳細描述了 flink sql 對於 source\sink\format 的加載機制。
-
通過 SPI 機制加載所有的 source\sink\format 工廠
Factory
-
過濾出 DeserializationFormatFactory\SerializationFormatFactory + format 標識的 format 工廠類
-
通過 format 工廠類創建出對應的 format
12
[
flink sql 知其所以然(一)| source\sink 原理
11
如圖 serde format 是通過 TableFactoryHelper.discoverDecodingFormat
和 TableFactoryHelper.discoverEncodingFormat
創建的
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// discover a suitable decoding format
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
16
所有通過 SPI 的 source\sink\formt 插件都繼承自 Factory
。
整體創建 format 方法的調用鏈如下圖。
13
5.2.flink protobuf format 實現
最終實現如下,涉及到了幾個實現類:
-
ProtobufFormatFactory
-
ProtobufOptions
-
ProtobufRowDataDeserializationSchema
-
ProtobufToRowDataConverters
14
具體流程:
-
定義 SPI 的工廠類
ProtobufFormatFactory implements DeserializationFormatFactory
,並且在 resource\META-INF 下創建 SPI 的插件文件 -
實現
ProtobufFormatFactory#factoryIdentifier
標識protobuf
-
實現
ProtobufFormatFactory#createDecodingFormat
來創建對應的DecodingFormat<DeserializationSchema<RowData>>
,DecodingFormat
是用來封裝具體的反序列化器的,實現DecodingFormat<DeserializationSchema<RowData>>#createRuntimeDecoder
,返回ProtobufRowDataDeserializationSchema
-
定義
ProtobufRowDataDeserializationSchema implements DeserializationSchema<RowData>
,這個就是具體的反序列化器,其實與 datastream api 相同 -
實現
ProtobufRowDataDeserializationSchema#deserialize
方法,與 datastream 相同,這個方法就是將byte[]
序列化為RowData
的具體邏輯 -
注意這里還實現了一個類
ProtobufToRowDataConverters
,其作用就是在客戶端創建出具體的將byte[]
序列化為RowData
的具體工具類,其會根據用戶定義的表字段類型動態生成數據轉換的 converter 類(策略模式:https://www.runoob.com/design-pattern/strategy-pattern.html),相當於表的 schema 確定之后,其 converter 也會確定
上述實現類的具體關系如下:
19
介紹完流程,進入具體實現方案細節:
ProtobufFormatFactory
主要創建 format 的邏輯:
public class ProtobufFormatFactory implements DeserializationFormatFactory {
public static final String IDENTIFIER = "protobuf";
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Context context,
ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
// 1.獲取到 protobuf 的 class 全路徑
final String className = formatOptions.get(PROTOBUF_CLASS_NAME);
try {
// 2.load class
Class<GeneratedMessageV3> protobufV3 =
(Class<GeneratedMessageV3>) this.getClass().getClassLoader().loadClass(className);
// 3.創建 DecodingFormat
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context,
DataType physicalDataType) {
// 4.獲取到 table schema rowtype
final RowType rowType = (RowType) physicalDataType.getLogicalType();
// 5.創建對應的 DeserializationSchema 作為反序列化器
return new ProtobufRowDataDeserializationSchema(
protobufV3
, true
, rowType);
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
};
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
...
}
resources\META-INF 文件:
17
ProtobufRowDataDeserializationSchema
主要實現反序列化的邏輯:
public class ProtobufRowDataDeserializationSchema extends AbstractDeserializationSchema<RowData> {
...
private ProtobufToRowDataConverters.ProtobufToRowDataConverter runtimeConverter;
public ProtobufRowDataDeserializationSchema(
Class<? extends GeneratedMessageV3> messageClazz
, boolean ignoreParseErrors
, RowType expectedResultType) {
this.ignoreParseErrors = ignoreParseErrors;
Preconditions.checkNotNull(messageClazz, "Protobuf message class must not be null.");
this.messageClazz = messageClazz;
this.descriptorBytes = null;
this.descriptor = ProtobufUtils.getDescriptor(messageClazz);
this.defaultInstance = ProtobufUtils.getDefaultInstance(messageClazz);
// protobuf 本身的 schema
this.protobufOriginalRowType = (RowType) ProtobufSchemaConverter.convertToRowDataTypeInfo(messageClazz);
this.expectedResultType = expectedResultType;
// 1.根據 table schema 動態創建出對應的反序列化器
this.runtimeConverter = new ProtobufToRowDataConverters(false)
.createRowDataConverterByLogicalType(this.descriptor, this.expectedResultType);
}
@Override
public RowData deserialize(byte[] bytes) throws IOException {
if (bytes == null) {
return null;
}
try {
// 2.將 bytes 反序列化為 protobuf message
Message message = this.defaultInstance
.newBuilderForType()
.mergeFrom(bytes)
.build();
// 3.反序列化邏輯,從 protobuf message 中獲取字段轉換為 RowData
return (RowData) runtimeConverter.convert(message);
} catch (Throwable t) {
if (ignoreParseErrors) {
return null;
}
throw new IOException(
format("Failed to deserialize Protobuf '%s'.", new String(bytes)), t);
}
}
...
可以注意到上述反序列化的主要邏輯就集中在 runtimeConverter
上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter
。
ProtobufToRowDataConverters.ProtobufToRowDataConverter
就是在 ProtobufToRowDataConverters
中定義的。
ProtobufToRowDataConverters.ProtobufToRowDataConverter
其實就是一個 convertor 接口:
@FunctionalInterface
public interface ProtobufToRowDataConverter extends Serializable {
Object convert(Object object);
}
其作用就是將 protobuf message 中的每一個字段轉換成為 RowData
中的每一個字段。
ProtobufToRowDataConverters
中就定義了具體轉換邏輯,如截圖所示,每一個 LogicalType 都定義了 protobuf message 字段轉換為 flink 數據類型的邏輯:
18
源碼公眾號后台回復flink sql 知其所以然(五)| 自定義 protobuf format獲取。
6.總結與展望篇
6.1.總結
本文主要是針對 flink sql protobuf format 進行了原理解釋以及對應的實現。如果你正好需要這么一個 format,直接公眾號后台回復flink sql 知其所以然(五)| 自定義 protobuf format獲取源碼吧。
大數據羊說
用數據提升美好事物發生的概率~
30篇原創內容
公眾號
6.2.展望
當然上述只是 protobuf format 一個基礎的實現,用於生產環境還有很多方面可以去擴展的。
-
性能優化、通用化:protobuf java class 本地 codegen 來提高任務性能
-
數據質量:異常 AOP,alert 等
往期推薦
[
flink sql 知其所以然(四)| sql api 類型系統
[
flink sql 知其所以然(三)| 自定義 redis 數據匯表(附源碼)
[
flink sql 知其所以然(二)| 自定義 redis 數據維表(附源碼)
[
flink sql 知其所以然(一)| source\sink 原理
[
揭秘字節跳動埋點數據實時動態處理引擎(附源碼)
更多 Flink 實時大數據分析相關技術博文,視頻。后台回復 “flink” 獲取。
點個贊+在看,感謝您的肯定 👇