I've been looking into flink-cdc recently. The Table API CDC connector targets a single table, which raises a question: if many tables on the same database need real-time capture (say, 100 tables), will that put pressure on MySQL? And if the MySQL data volume is also large, will it strain the disk and network of the MySQL host?

Anyone familiar with the binlog knows it is not partitioned by database or table: even if you only need the changes of one table, you still have to read and parse the entire binlog. With many tables under CDC, the resource cost multiplies accordingly.

This suggests a different approach: run a single job that parses the binlog of all required tables into JSON and publishes it to Kafka, shifting the load from MySQL to Kafka. A load that MySQL can bear is hardly a load for Kafka at all (though at that point the setup is essentially the same as deploying Canal or Debezium directly).
## Official Example
The flink-cdc DataStream API example from the official docs is as follows:
Docs link: https://github.com/ververica/flink-cdc-connectors/wiki#usage-for-datastream-api
```java
public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("inventory") // monitor all tables under inventory database
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}
```
All we need to do is swap out StringDebeziumDeserializationSchema and write a Kafka sink.
## Parsing
For parsing Debezium-format binlog records, two DeserializationSchema implementations are provided: StringDebeziumDeserializationSchema and RowDataDebeziumDeserializeSchema. StringDebeziumDeserializationSchema simply emits the toString of Debezium's SourceRecord, while RowDataDebeziumDeserializeSchema requires the table schema to be defined up front, so neither fits our requirement.

So I parsed Debezium's SourceRecord myself, converted the result to JSON, and added extra fields such as host, port, database, and table (useful when there are sharded databases and tables):
```java
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; // package path for flink-cdc 1.x; newer versions use com.ververica.cdc.debezium
import com.google.gson.JsonObject;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * deserialize debezium format binlog
 */
public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

    private final String host;
    private final int port;

    public CommonStringDebeziumDeserializationSchema(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("host", host);
        jsonObject.addProperty("port", port);
        jsonObject.addProperty("file", (String) record.sourceOffset().get("file"));
        jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos"));
        jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec"));

        // the value schema name looks like "<serverName>.<db>.<table>.Envelope"
        String[] name = record.valueSchema().name().split("\\.");
        jsonObject.addProperty("db", name[1]);
        jsonObject.addProperty("table", name[2]);

        Struct value = (Struct) record.value();
        String operatorType = value.getString("op"); // c: create, u: update, d: delete, r: read
        jsonObject.addProperty("operator_type", operatorType);

        // insert, update and read carry the row state after the change
        if (!"d".equals(operatorType)) {
            Struct after = value.getStruct("after");
            jsonObject.add("after", parseRecord(after));
        }
        // update and delete carry the row state before the change
        if ("u".equals(operatorType) || "d".equals(operatorType)) {
            Struct before = value.getStruct("before");
            jsonObject.add("before", parseRecord(before));
        }
        jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000);

        out.collect(jsonObject.toString());
    }

    private JsonObject parseRecord(Struct struct) {
        JsonObject jo = new JsonObject();
        for (Field field : struct.schema().fields()) {
            switch (field.schema().type()) {
                case INT8:
                    jo.addProperty(field.name(), struct.getInt8(field.name()));
                    break;
                case INT16:
                    jo.addProperty(field.name(), struct.getInt16(field.name()));
                    break;
                case INT32:
                    jo.addProperty(field.name(), struct.getInt32(field.name()));
                    break;
                case INT64:
                    jo.addProperty(field.name(), struct.getInt64(field.name()));
                    break;
                case FLOAT32:
                    jo.addProperty(field.name(), struct.getFloat32(field.name()));
                    break;
                case FLOAT64:
                    jo.addProperty(field.name(), struct.getFloat64(field.name()));
                    break;
                case BYTES:
                    // json ignores byte columns
                    break;
                case STRING:
                    jo.addProperty(field.name(), struct.getString(field.name()));
                    break;
                default:
            }
        }
        return jo;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
The core methods are deserialize, which parses each change record, and parseRecord, which extracts the table's column values.

The parsed output looks like this:
Insert:

```sql
insert into user_log(user_id, item_id, category_id, behavior, ts) values('venn1', 'item_1', 'category_1', 'read', now());
```

```json
{"host":"localhost","port":3306,"file":"binlog.000002","pos":13781,"ts_sec":null,"db":"venn","table":"user_log","operator_type":"c",
"after":{"id":16,"user_id":"venn1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619358456000},"parse_time":1619360320}
```
Update:

```sql
update user_log set user_id = 'zhangsan1' where id = 10;
```

```json
{"host":"localhost","port":3306,"file":"binlog.000002","pos":14205,"ts_sec":1619360393,"db":"venn","table":"user_log","operator_type":"u",
"after":{"id":10,"user_id":"zhangsan1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},
"before":{"id":10,"user_id":"venn1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},"parse_time":1619360394}
```
Delete:

```sql
delete from user_log where id = 10;
```

```json
{"host":"localhost","port":3306,"file":"binlog.000002","pos":14598,"ts_sec":1619360441,"db":"venn","table":"user_log","operator_type":"d","before":{"id":10,"user_id":"zhangsan1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},"parse_time":1619360441}
```
Note: operator_type values are c (create), u (update), d (delete), r (read).

before holds the original row; after holds the row after an insert or update.
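To make the message contract concrete, here is a small sketch of how a downstream consumer could interpret these JSON messages. The field names follow the output samples above; the snippet itself is mine, not part of the original code:

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class BinlogMessageDemo {
    public static void main(String[] args) {
        // a trimmed-down update message, following the format shown above
        String message = "{\"db\":\"venn\",\"table\":\"user_log\",\"operator_type\":\"u\","
                + "\"after\":{\"id\":10,\"user_id\":\"zhangsan1\"},"
                + "\"before\":{\"id\":10,\"user_id\":\"venn1\"}}";
        JsonObject json = new JsonParser().parse(message).getAsJsonObject();
        switch (json.get("operator_type").getAsString()) {
            case "c": // insert: only "after" is present
            case "r": // snapshot read: also carries only "after"
                System.out.println("upsert: " + json.get("after"));
                break;
            case "u": // update: "before" is the old row, "after" the new one
                System.out.println("update: " + json.get("before") + " -> " + json.get("after"));
                break;
            case "d": // delete: only "before" is present
                System.out.println("delete: " + json.get("before"));
                break;
            default:
        }
    }
}
```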
## Sink
Since many tables may be captured, I wrote a dedicated sink that routes each table's data to its own topic. The core of it:
```java
@Override
public void invoke(String element, Context context) {
    JsonObject jsonObject = parser.parse(element).getAsJsonObject();
    String db = jsonObject.get("db").getAsString();
    String table = jsonObject.get("table").getAsString();
    // the topic is created automatically if it does not exist
    // (relies on the broker setting auto.create.topics.enable)
    String topic = db + "_" + table;

    ProducerRecord<String, String> record = new ProducerRecord<>(topic, element);
    kafkaProducer.send(record);
}
```
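The snippet above only shows invoke. For context, here is a minimal sketch of what the full sink class might look like, with the producer lifecycle handled in open/close; the class name BinlogKafkaSink, the constructor parameter, and the serializer settings are my assumptions, not the original code:

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// illustrative class name; the original post only shows the invoke method
public class BinlogKafkaSink extends RichSinkFunction<String> {

    private final String bootstrapServers;
    private transient KafkaProducer<String, String> kafkaProducer;
    private transient JsonParser parser;

    public BinlogKafkaSink(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducer = new KafkaProducer<>(props);
        parser = new JsonParser();
    }

    @Override
    public void invoke(String element, Context context) {
        JsonObject jsonObject = parser.parse(element).getAsJsonObject();
        String db = jsonObject.get("db").getAsString();
        String table = jsonObject.get("table").getAsString();
        // route each table to its own topic; relies on broker-side topic auto-creation
        String topic = db + "_" + table;
        kafkaProducer.send(new ProducerRecord<>(topic, element));
    }

    @Override
    public void close() {
        if (kafkaProducer != null) {
            kafkaProducer.close();
        }
    }
}
```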
If you don't need to split the data across topics, the FlinkKafkaProducer that ships with Flink works as-is.
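Putting the pieces together, the whole job might be wired up like this. This is a sketch reusing the classes above; the host, credentials, and broker address are placeholders:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogToKafkaJob {
    public static void main(String[] args) throws Exception {
        // MySQLSource comes from the flink-cdc connector, as in the official example above
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("venn")
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new CommonStringDebeziumDeserializationSchema("localhost", 3306))
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction)
                .addSink(new BinlogKafkaSink("localhost:9092")) // illustrative sink from the sketch above
                .setParallelism(1); // parallelism 1 preserves binlog order
        env.execute("mysql binlog to kafka");
    }
}
```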
One problem I hit: against MySQL 8 the connector fails because the public key cannot be retrieved, and there is no way to add the allowPublicKeyRetrieval parameter to the connection URL:
```
Caused by: org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: Public Key Retrieval is not allowed
	at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:342)
	at io.debezium.connector.mysql.MySqlJdbcContext.readMySqlSystemVariables(MySqlJdbcContext.java:321)
	at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:79)
	at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:52)
	at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:350)
	at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:143)
	at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLNonTransientConnectionException: Public Key Retrieval is not allowed
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:836)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:456)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:197)
	at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:230)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:871)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:866)
	at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:412)
	at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:328)
	... 11 more
```
I opened an issue on GitHub and will see what the maintainers say; failing that, I'll patch the source myself to add the parameter: https://github.com/ververica/flink-cdc-connectors/issues/173
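In the meantime, one workaround that may be worth trying is to pass the driver option through Debezium's pass-through properties (Debezium forwards database.-prefixed properties to the MySQL JDBC driver). Whether the builder's debeziumProperties hook delivers it to the internal connection that fails here depends on the connector version, so treat this fragment as an untested sketch:

```java
Properties debeziumProps = new Properties();
// Debezium passes "database."-prefixed properties through to the JDBC driver,
// so this *may* enable public key retrieval (untested against this version)
debeziumProps.setProperty("database.allowPublicKeyRetrieval", "true");

SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("venn")
        .username("flinkuser")
        .password("flinkpw")
        .debeziumProperties(debeziumProps)
        .deserializer(new CommonStringDebeziumDeserializationSchema("localhost", 3306))
        .build();
```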
The full code is on GitHub: https://github.com/springMoon/flink-rookie (class MySqlBinlogSourceExample).
Welcome to follow the Flink菜鳥 WeChat official account, which posts irregular updates on Flink development topics.