Flink Stream API CDC demo


I have been looking into flink-cdc recently. The Table API CDC connectors each target a single table, which raises a question: if many tables on the same database need to be captured in real time (say, 100 tables), does that put pressure on MySQL? And if the MySQL data volume is large, does it also put disk and network pressure on the server hosting MySQL?

Anyone familiar with the binlog knows that it is not separated by database or table: even if you only need the binlog of one table, you still have to read and parse the entire binlog file. With many tables under CDC, the resource consumption obviously multiplies.

Given this problem, here is another idea: use a single job to parse the binlog of all the required tables into JSON and publish it to Kafka, shifting the load from MySQL to Kafka. Whatever load MySQL can bear is hardly a load for Kafka at all (although this ends up being basically the same as deploying Canal or Debezium directly).

## Official Example

The Stream API example from the flink-cdc documentation looks like this:

Official link: https://github.com/ververica/flink-cdc-connectors/wiki#usage-for-datastream-api

```java
public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
      .hostname("localhost")
      .port(3306)
      .databaseList("inventory") // monitor all tables under inventory database
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}
```

All we need to do is swap out StringDebeziumDeserializationSchema for our own deserializer and write a Kafka sink; a sketch of the overall wiring follows.
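Roughly, the job ends up wired like this. This is a sketch only, assuming the flink-cdc-connectors 1.x package names; CommonStringDebeziumDeserializationSchema and Binlog2KafkaSink are the deserializer and sink described in the next two sections, and Binlog2KafkaSink is simply a name I use here for illustration:

```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; // 1.x package name, adjust for your version
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogToKafkaExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
      .hostname("localhost")
      .port(3306)
      .databaseList("inventory")          // capture every table under this database
      .username("flinkuser")
      .password("flinkpw")
      // custom deserializer from the "Parsing" section below
      .deserializer(new CommonStringDebeziumDeserializationSchema("localhost", 3306))
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(sourceFunction)
       // table-routing Kafka sink from the "Sink" section below
       .addSink(new Binlog2KafkaSink("broker1:9092"))
       .setParallelism(1);                // parallelism 1 keeps binlog ordering

    env.execute("mysql binlog to kafka");
  }
}
```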

## Parsing

For parsing the Debezium-formatted binlog, two DeserializationSchema implementations are provided out of the box: StringDebeziumDeserializationSchema and RowDataDebeziumDeserializeSchema. StringDebeziumDeserializationSchema simply emits the toString() of Debezium's SourceRecord, while RowDataDebeziumDeserializeSchema requires the table schema to be defined up front, which does not fit our requirement.

So I parse the Debezium SourceRecord myself, convert the result to JSON, and add extra information such as host, port, database and table (useful when databases or tables are sharded):

```java
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; // package name per flink-cdc-connectors 1.x
import com.google.gson.JsonObject;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Deserializes a Debezium-format binlog record into a JSON string,
 * enriched with host, port, binlog position, database and table.
 */
public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

    private final String host;
    private final int port;

    public CommonStringDebeziumDeserializationSchema(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        JsonObject jsonObject = new JsonObject();

        // connection and binlog position metadata
        jsonObject.addProperty("host", host);
        jsonObject.addProperty("port", port);
        jsonObject.addProperty("file", (String) record.sourceOffset().get("file"));
        jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos"));
        jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec"));

        // the value schema name looks like "<server>.<database>.<table>.Envelope"
        String[] name = record.valueSchema().name().split("\\.");
        jsonObject.addProperty("db", name[1]);
        jsonObject.addProperty("table", name[2]);

        Struct value = (Struct) record.value();
        // c: create, u: update, d: delete, r: read (snapshot)
        String operatorType = value.getString("op");
        jsonObject.addProperty("operator_type", operatorType);

        // insert / update / read carry an "after" row image
        if (!"d".equals(operatorType)) {
            Struct after = value.getStruct("after");
            jsonObject.add("after", parseRecord(after));
        }
        // update / delete carry a "before" row image
        if ("u".equals(operatorType) || "d".equals(operatorType)) {
            Struct before = value.getStruct("before");
            jsonObject.add("before", parseRecord(before));
        }
        jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000);

        out.collect(jsonObject.toString());
    }

    /** Converts a row image (the before/after Struct) into a JsonObject, field by field. */
    private JsonObject parseRecord(Struct row) {
        JsonObject jo = new JsonObject();
        for (Field field : row.schema().fields()) {
            switch (field.schema().type()) {
                case INT8:
                    jo.addProperty(field.name(), row.getInt8(field.name()));
                    break;
                case INT64:
                    jo.addProperty(field.name(), row.getInt64(field.name()));
                    break;
                case FLOAT32:
                    jo.addProperty(field.name(), row.getFloat32(field.name()));
                    break;
                case FLOAT64:
                    jo.addProperty(field.name(), row.getFloat64(field.name()));
                    break;
                case BYTES:
                    // byte columns are not emitted in the JSON output
                    break;
                case STRING:
                    jo.addProperty(field.name(), row.getString(field.name()));
                    break;
                default:
                    // other types are silently skipped
            }
        }

        return jo;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```

The two core methods are deserialize, which parses the change record, and parseRecord, which parses the column values of the row image.
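Note that the switch in parseRecord only covers INT8, INT64, FLOAT32, FLOAT64, BYTES and STRING. Under Debezium's usual MySQL type mapping an INT column typically arrives as INT32 and a SMALLINT as INT16, so such columns would be silently dropped. A minimal sketch of extra cases one might add to the same switch (my own assumption, not part of the original code):

```java
// extra cases for parseRecord's switch, assuming Debezium's standard MySQL type mapping
case INT16:
    jo.addProperty(field.name(), row.getInt16(field.name()));
    break;
case INT32:
    jo.addProperty(field.name(), row.getInt32(field.name()));
    break;
case BOOLEAN:
    jo.addProperty(field.name(), row.getBoolean(field.name()));
    break;
```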

The parsed output looks like this:

Insert:

sql: insert into user_log(user_id, item_id, category_id, behavior, ts) values('venn1', 'item_1', 'category_1', 'read', now());

```json
{"host":"localhost","port":3306,"file":"binlog.000002","pos":13781,"ts_sec":null,"db":"venn","table":"user_log","operator_type":"c",
"after":{"id":16,"user_id":"venn1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619358456000},"parse_time":1619360320}
```

Update:

sql: update user_log set user_id = 'zhangsan1' where id = 10;

```json
{"host":"localhost","port":3306,"file":"binlog.000002","pos":14205,"ts_sec":1619360393,"db":"venn","table":"user_log","operator_type":"u",
"after":{"id":10,"user_id":"zhangsan1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},
"before":{"id":10,"user_id":"venn1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},"parse_time":1619360394}
```

Delete:

sql: delete from user_log where id = 10;

```json
{"host":"localhost","port":3306,"file":"binlog.000002","pos":14598,"ts_sec":1619360441,"db":"venn","table":"user_log","operator_type":"d","before":{"id":10,"user_id":"zhangsan1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},"parse_time":1619360441}
```

Note: operator_type values are c: create, u: update, d: delete, r: read (snapshot).
before is the row image before the change; after is the row image after an insert or update.
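For reference, here is a minimal sketch of how a downstream consumer might apply these JSON events: keep the after image on c/u/r, drop the row on d, keyed by the primary key. This is purely illustrative and not part of the post's code; it assumes the table has an "id" primary key column.

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.HashMap;
import java.util.Map;

public class ChangeEventApplier {

    // latest row image per primary key ("id" column assumed)
    private final Map<Long, JsonObject> state = new HashMap<>();

    public void apply(String json) {
        JsonObject event = new JsonParser().parse(json).getAsJsonObject();
        String op = event.get("operator_type").getAsString();
        if ("d".equals(op)) {
            long id = event.getAsJsonObject("before").get("id").getAsLong();
            state.remove(id);                                    // delete: drop the row
        } else {
            JsonObject after = event.getAsJsonObject("after");
            state.put(after.get("id").getAsLong(), after);       // c/u/r: upsert the after image
        }
    }
}
```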

## Sink

Since there may be many tables to capture, I wrote a separate sink that routes each table's data to its own topic. The core method looks like this:

```java
// invoke() of the custom sink (a RichSinkFunction<String>);
// kafkaProducer and parser are fields initialized in open()
@Override
public void invoke(String element, Context context) {

    JsonObject jsonObject = parser.parse(element).getAsJsonObject();
    String db = jsonObject.get("db").getAsString();
    String table = jsonObject.get("table").getAsString();
    // one topic per table; created automatically if it does not exist
    String topic = db + "_" + table;
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, element);
    kafkaProducer.send(record);
}
```

If you do not need to write the data to different topics, the FlinkKafkaProducer that Flink ships with is enough. A complete sketch of the custom sink follows.
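For completeness, a minimal sketch of what the full sink class might look like. The class name (matching the Binlog2KafkaSink placeholder used in the wiring sketch above), the constructor parameter and the producer settings are my own assumptions, not the post's original code:

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/** Routes each table's binlog JSON to its own Kafka topic ("<db>_<table>"). Sketch only. */
public class Binlog2KafkaSink extends RichSinkFunction<String> {

    private final String bootstrapServers;
    private transient KafkaProducer<String, String> kafkaProducer;
    private transient JsonParser parser;

    public Binlog2KafkaSink(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        kafkaProducer = new KafkaProducer<>(props);
        parser = new JsonParser();
    }

    @Override
    public void invoke(String element, Context context) {
        JsonObject jsonObject = parser.parse(element).getAsJsonObject();
        String db = jsonObject.get("db").getAsString();
        String table = jsonObject.get("table").getAsString();
        // one topic per table; auto-created if the broker allows it
        String topic = db + "_" + table;
        kafkaProducer.send(new ProducerRecord<>(topic, element));
    }

    @Override
    public void close() {
        if (kafkaProducer != null) {
            kafkaProducer.close();
        }
    }
}
```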

One problem I ran into: with MySQL 8 the connector fails because it cannot retrieve the server's public key, and the allowPublicKeyRetrieval parameter cannot be passed in the JDBC URL. (A common workaround on the MySQL side is to switch the CDC user's authentication plugin to mysql_native_password.)

```
Caused by: org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: Public Key Retrieval is not allowed
    at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:342)
    at io.debezium.connector.mysql.MySqlJdbcContext.readMySqlSystemVariables(MySqlJdbcContext.java:321)
    at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:79)
    at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:52)
    at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:350)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:143)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLNonTransientConnectionException: Public Key Retrieval is not allowed
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:836)
    at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:456)
    at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:197)
    at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:230)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:871)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:866)
    at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:412)
    at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:328)
    ... 11 more
```

I opened an issue on GitHub to see what the maintainers say; if nothing comes of it, I will patch the source myself to add this parameter: https://github.com/ververica/flink-cdc-connectors/issues/173

The complete code is on GitHub: https://github.com/springMoon/flink-rookie (MySqlBinlogSourceExample)

Feel free to follow the Flink菜鳥 (Flink Rookie) WeChat official account for occasional posts on Flink development.

 

