flink sql 知其所以然(五)| 自定義 protobuf format


圖片

感謝您的關注  +  點贊 + 再看,對博主的肯定,會督促博主持續的輸出更多的優質實戰內容!!!

1.序篇-本文結構

大數據羊說

大數據羊說

用數據提升美好事物發生的概率~

30篇原創內容

公眾號

protobuf 作為目前各大公司中最廣泛使用的高效的協議數據交換格式工具庫,會大量作為流式數據傳輸的序列化方式,所以在 flink sql 中如果能實現 protobuf 的 format 會非常有用(目前社區已經有對應的實現,不過目前還沒有 merge,預計在 1.14 系列版本中能 release)。

issue 見:https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project %3D FLINK AND issuetype %3D "New Feature" AND text ~ protobuf order by created DESC

pr 見:https://github.com/apache/flink/pull/14376

這一節主要介紹 flink sql 中怎么自定義實現 format,其中以最常使用的 protobuf 作為案例來介紹。

  1. 背景篇-為啥需要 protobuf format

  2. 目標篇-protobuf format 預期效果

  3. 難點剖析篇-此框架建設的難點、目前有哪些實現

  4. 維表實現篇-實現的過程

  5. 總結與展望篇

如果想在本地直接測試下:

  1. 在公眾號后台回復
  • flink sql 知其所以然(五)| 自定義 protobuf format獲取源碼(源碼基於 1.13.1 實現)

  • flink sql 知其所以然(五)| 自定義 protobuf format獲取源碼(源碼基於 1.13.1 實現)

  • flink sql 知其所以然(五)| 自定義 protobuf format獲取源碼(源碼基於 1.13.1 實現)

  1. 執行源碼包中的 flink.examples.sql._05.format.formats.SocketWriteTest 測試類來制造 protobuf 數據

  2. 然后執行源碼包中的 flink.examples.sql._05.format.formats.ProtobufFormatTest 測試類來消費 protobuf 數據,並且打印在 console 中,然后就可以在 console 中看到結果。

2.背景篇-為啥需要 protobuf format

關於為什么選擇 protobuf 可以看這篇文章,寫的很詳細:

http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral

在實時計算的領域中,為了可讀性會選擇 json,為了效率以及一些已經依賴了 grpc 的公司會選擇 protobuf 來做數據序列化,那么自然而然,日志的序列化方式也會選擇 protobuf

而官方目前已經 release 的版本中是沒有提供 flink sql api 的 protobuf format 的。如下圖,基於 1.13 版本。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

圖片

1

因此本文在介紹怎樣自定義一個 format 的同時,實現一個 protobuf format 來給大家使用。

3.目標篇-protobuf format 預期效果

預期效果是先實現幾種最基本的數據類型,包括 protobuf 中的 message(自定義 model)、map(映射)、repeated(列表)、其他基本數據類型等,這些都是我們最常使用的類型。

預期 protobuf message 定義如下:

圖片

2

測試數據源數據如下,博主把 protobuf 的數據轉換為 json,以方便展示,如下圖:

圖片

3

預期 flink sql:

數據源表 DDL:

CREATE TABLE protobuf_source (
    name STRING
  , names ARRAY<STRING>
  , si_map MAP<STRING, INT>
)
WITH (
  'connector' = 'socket',
  'hostname' = 'localhost',
  'port' = '9999',
  'format' = 'protobuf',
  'protobuf.class-name' = 'flink.examples.sql._04.format.formats.protobuf.Test'
)

數據匯表 DDL:

CREATE TABLE print_sink (
  name STRING
  , names ARRAY<STRING>
  , si_map MAP<STRING, INT>
) WITH (
  'connector' = 'print'
)

Transform 執行邏輯:

INSERT INTO print_sink
SELECT *
FROM protobuf_source

下面是我在本地跑的結果:

圖片

圖片

可以看到打印的結果,數據是正確的被反序列化讀入,並且最終輸出到 console。

4.難點剖析篇-目前有哪些實現

目前業界可以參考的實現如下:https://github.com/maosuhan/flink-pb, 也就是這位哥們負責目前 flink protobuf 的 format。

這種實現的具體使用方式如下:

圖片

7

其實現有幾個特點:

  1. 復雜性:用戶需要在 flink sql 程序運行時,將對應的 protobuf java 文件引入 classpath,這個特點是復合 flink 這樣的通用框架的特點的。但是如果需要在各個公司場景要做一個流式處理平台的場景下,各個 protobuf sdk 可能都位於不同的 jar 包中,那么其 jar 包管理可能是一個比較大的問題。

  2. 高效 serde:一般很多場景下為了通用化 serde protobuf message,可能會選擇 DynamicMessage 來處理 protobuf message,但是其 serde 性能相比原生 java code 的性能比較差。因為特點 1 引入了 protobuf 的 java class,所以其 serde function 可以基於 codegen 實現,而這將極大提高 serde 效率,效率提高就代表着省錢啊,可以吹逼的。

圖片

8

Notes:

當然博主針對第一點也有一些想法,比如怎樣做到不依賴 protobuf java 文件,只依賴 protobuf 的 message 定義即可或者只依賴其 descriptor。目前博主的想法如下:

  1. flink 程序在客戶端獲取到對應的 protobuf message 定義

  2. 然后根據這個定義恢復出 proto 文件

  3. 客戶端本地執行 protoc 將此文件編譯為 java 文件

  4. 客戶端本地動態將此 java 文件編譯並 load 到 jvm 中

  5. 使用 codegen 然后動態生成執行代碼

一氣呵成!!!

具體實現其實可以參考:https://stackoverflow.com/questions/28381659/how-to-compile-protocol-buffers-schema-at-runtime

5.實現篇-實現的過程

其實上節已經詳細描述了 flink sql 對於 source\sink\format 的加載機制。

  1. 通過 SPI 機制加載所有的 source\sink\format 工廠 Factory

  2. 過濾出 DeserializationFormatFactory\SerializationFormatFactory + format 標識的 format 工廠類

  3. 通過 format 工廠類創建出對應的 format

圖片

12

[

圖片

flink sql 知其所以然(一)| source\sink 原理

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488486&idx=1&sn=b9bdb56e44631145c8cc6354a093e7c0&chksm=c1549f1ef623160834e3c5661c155ec421699fc18c57f2c63ba14d33bab1d37c5930fdce016b&scene=21#wechat_redirect)

圖片

11

如圖 serde format 是通過 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 創建的

// either implement your custom validation logic here ...
        // or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

// discover a suitable decoding format
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
        DeserializationFormatFactory.class,
        FactoryUtil.FORMAT);

圖片

16

所有通過 SPI 的 source\sink\formt 插件都繼承自 Factory

整體創建 format 方法的調用鏈如下圖。

圖片

13

最終實現如下,涉及到了幾個實現類:

  1. ProtobufFormatFactory

  2. ProtobufOptions

  3. ProtobufRowDataDeserializationSchema

  4. ProtobufToRowDataConverters

圖片

14

具體流程:

  1. 定義 SPI 的工廠類 ProtobufFormatFactory implements DeserializationFormatFactory,並且在 resource\META-INF 下創建 SPI 的插件文件

  2. 實現 ProtobufFormatFactory#factoryIdentifier 標識 protobuf

  3. 實現 ProtobufFormatFactory#createDecodingFormat 來創建對應的 DecodingFormat<DeserializationSchema<RowData>>DecodingFormat 是用來封裝具體的反序列化器的,實現 DecodingFormat<DeserializationSchema<RowData>>#createRuntimeDecoder,返回 ProtobufRowDataDeserializationSchema

  4. 定義 ProtobufRowDataDeserializationSchema implements DeserializationSchema<RowData>,這個就是具體的反序列化器,其實與 datastream api 相同

  5. 實現 ProtobufRowDataDeserializationSchema#deserialize 方法,與 datastream 相同,這個方法就是將 byte[] 序列化為 RowData 的具體邏輯

  6. 注意這里還實現了一個類 ProtobufToRowDataConverters,其作用就是在客戶端創建出具體的將  byte[] 序列化為 RowData 的具體工具類,其會根據用戶定義的表字段類型動態生成數據轉換的 converter 類(策略模式:https://www.runoob.com/design-pattern/strategy-pattern.html),相當於表的 schema 確定之后,其 converter 也會確定

上述實現類的具體關系如下:

圖片

19

介紹完流程,進入具體實現方案細節:

ProtobufFormatFactory 主要創建 format 的邏輯:

public class ProtobufFormatFactory implements DeserializationFormatFactory {

    public static final String IDENTIFIER = "protobuf";

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Context context,
            ReadableConfig formatOptions) {

        FactoryUtil.validateFactoryOptions(this, formatOptions);

        // 1.獲取到 protobuf 的 class 全路徑
        final String className = formatOptions.get(PROTOBUF_CLASS_NAME);

        try {
            // 2.load class
            Class<GeneratedMessageV3> protobufV3 =
                    (Class<GeneratedMessageV3>) this.getClass().getClassLoader().loadClass(className);

            // 3.創建 DecodingFormat
            return new DecodingFormat<DeserializationSchema<RowData>>() {
                @Override
                public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context,
                        DataType physicalDataType) {
                    // 4.獲取到 table schema rowtype
                    final RowType rowType = (RowType) physicalDataType.getLogicalType();

                    // 5.創建對應的 DeserializationSchema 作為反序列化器
                    return new ProtobufRowDataDeserializationSchema(
                            protobufV3
                            , true
                            , rowType);
                }

                @Override
                public ChangelogMode getChangelogMode() {
                    return ChangelogMode.insertOnly();
                }
            };
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    ...
}

resources\META-INF 文件:

圖片

17

ProtobufRowDataDeserializationSchema 主要實現反序列化的邏輯:

public class ProtobufRowDataDeserializationSchema extends AbstractDeserializationSchema<RowData> {

    ...

    private ProtobufToRowDataConverters.ProtobufToRowDataConverter runtimeConverter;

    public ProtobufRowDataDeserializationSchema(
            Class<? extends GeneratedMessageV3> messageClazz
            , boolean ignoreParseErrors
            , RowType expectedResultType) {
        this.ignoreParseErrors = ignoreParseErrors;
        Preconditions.checkNotNull(messageClazz, "Protobuf message class must not be null.");
        this.messageClazz = messageClazz;
        this.descriptorBytes = null;
        this.descriptor = ProtobufUtils.getDescriptor(messageClazz);
        this.defaultInstance = ProtobufUtils.getDefaultInstance(messageClazz);

        // protobuf 本身的 schema
        this.protobufOriginalRowType = (RowType) ProtobufSchemaConverter.convertToRowDataTypeInfo(messageClazz);

        this.expectedResultType = expectedResultType;

        // 1.根據 table schema 動態創建出對應的反序列化器
        this.runtimeConverter = new ProtobufToRowDataConverters(false)
                .createRowDataConverterByLogicalType(this.descriptor, this.expectedResultType);
    }

    @Override
    public RowData deserialize(byte[] bytes) throws IOException {
        if (bytes == null) {
            return null;
        }
        try {

            // 2.將 bytes 反序列化為 protobuf message
            Message message = this.defaultInstance
                    .newBuilderForType()
                    .mergeFrom(bytes)
                    .build();

            // 3.反序列化邏輯,從 protobuf message 中獲取字段轉換為 RowData
            return (RowData) runtimeConverter.convert(message);
        } catch (Throwable t) {
            if (ignoreParseErrors) {
                return null;
            }
            throw new IOException(
                    format("Failed to deserialize Protobuf '%s'.", new String(bytes)), t);
        }
    }

    ...

可以注意到上述反序列化的主要邏輯就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter

ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定義的。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 其實就是一個 convertor 接口:

@FunctionalInterface
public interface ProtobufToRowDataConverter extends Serializable {
    Object convert(Object object);
}

其作用就是將 protobuf message 中的每一個字段轉換成為 RowData 中的每一個字段。

ProtobufToRowDataConverters 中就定義了具體轉換邏輯,如截圖所示,每一個 LogicalType 都定義了 protobuf message 字段轉換為 flink 數據類型的邏輯:

圖片

18

源碼公眾號后台回復flink sql 知其所以然(五)| 自定義 protobuf format獲取。

6.總結與展望篇

6.1.總結

本文主要是針對 flink sql protobuf format 進行了原理解釋以及對應的實現。如果你正好需要這么一個 format,直接公眾號后台回復flink sql 知其所以然(五)| 自定義 protobuf format獲取源碼吧。

大數據羊說

大數據羊說

用數據提升美好事物發生的概率~

30篇原創內容

公眾號

6.2.展望

當然上述只是 protobuf format 一個基礎的實現,用於生產環境還有很多方面可以去擴展的。

  1. 性能優化、通用化:protobuf java class 本地 codegen 來提高任務性能

  2. 數據質量:異常 AOP,alert 等

往期推薦

[

flink sql 知其所以然(四)| sql api 類型系統

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488788&idx=1&sn=0127fd4037788762a0401313b43b0ea5&chksm=c15499ecf62310fa747c530f722e631570a1b0469af2a693e9f48d3a660aa2c15e610653fe8c&scene=21#wechat_redirect)

[

flink sql 知其所以然(三)| 自定義 redis 數據匯表(附源碼)

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488720&idx=1&sn=5695e3691b55a7e40814d0e455dbe92a&chksm=c1549828f623113e9959a382f98dc9033997dd4bdcb127f9fb2fbea046545b527233d4c3510e&scene=21#wechat_redirect)

[

flink sql 知其所以然(二)| 自定義 redis 數據維表(附源碼)

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488635&idx=1&sn=41817a078ef456fb036e94072b2383ff&chksm=c1549883f623119559c47047c6d2a9540531e0e6f0b58b155ef9da17e37e32a9c486fe50f8e3&scene=21#wechat_redirect)

[

flink sql 知其所以然(一)| source\sink 原理

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488486&idx=1&sn=b9bdb56e44631145c8cc6354a093e7c0&chksm=c1549f1ef623160834e3c5661c155ec421699fc18c57f2c63ba14d33bab1d37c5930fdce016b&scene=21#wechat_redirect)

[

揭秘字節跳動埋點數據實時動態處理引擎(附源碼)

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488435&idx=1&sn=5d89a0d24603c08af4be342462409230&chksm=c1549f4bf623165d977426d13a0bdbe821ec8738744d2274613a7ad92dec0256d090aea4b815&scene=21#wechat_redirect)

更多 Flink 實時大數據分析相關技術博文,視頻。后台回復 “flink” 獲取。

點個贊+在看,感謝您的肯定 👇


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM