Flink MySQL CDC: Analyzing and Fixing an OOM Caused by Large Transactions
1. Background
The system had just gone live and some processes were not fully connected yet, so manual data fixes were needed to keep the business running. Both the order domain and the finance domain were doing large amounts of manual data fixing; everyone was hard at work, and late at night some basic rules tend to be forgotten. In production we found hard deletes plus large transactions updating several hundred thousand rows at a time.
This caused widespread OOMs in the Flink jobs and seriously hurt the accuracy and timeliness of the data synchronized to Kudu, the Shenzhen e-commerce MySQL, and ES.
2. Relevant System Configuration
Flink: Flink 1.13 + MySQL CDC 2.1, JobManager 2 GB, TaskManager 6 GB
MySQL version: 10.4.18-MariaDB
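For context, the sketch below shows how a MySQL CDC 2.1 source of this kind is typically wired into a Flink 1.13 job. It is a minimal example, not our production code; the host, database, table and credentials are placeholders.

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcJobSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details -- not the real production configuration.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("10.18.xx.xx")
                .port(3306)
                .databaseList("order_db")          // hypothetical database
                .tableList("order_db.order_info")  // hypothetical table
                .username("cdc_user")
                .password("******")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60s
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc-source")
           .print(); // downstream sinks (Kudu / MySQL / ES) omitted
        env.execute("mysql-cdc-demo");
    }
}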
3. Locating the Problem
As the TaskManager log screenshot above shows, Flink hit an OutOfMemoryError while deserializing a binlog event; several garbage collections were triggered without reclaiming much, and the JVM finally threw GC overhead limit exceeded.
4. Problem Analysis
Let's follow the error log into the code and see what we can learn.
Error log
INFO: Trying to restore lost connection to 10.18.xx.xx:3306
Exception in thread "blc-10.18.57.93:3306" java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:103)
at io.debezium.connector.mysql.RowDeserializers.deserializeVarString(RowDeserializers.java:264)
at io.debezium.connector.mysql.RowDeserializers$UpdateRowsDeserializer.deserializeVarString(RowDeserializers.java:130)
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataDeserializer.java:189)
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:143)
at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserializeRows(UpdateRowsEventDataDeserializer.java:72)
at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:58)
at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:33)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:233)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
at java.lang.Thread.run(Thread.java:748)
Dec 15, 2021 10:03:23 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to 10.18.xx.xx:3306 at mysql-bin.007528/291710792 (sid:1570186369, cid:4814696)
BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
// Entry point for listening to binlog event packets
private void listenForEventPackets() throws IOException {
    // ... omitted
    try {
        event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
                new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
                inputStream);
        if (event == null) {
            throw new EOFException();
        }
    } catch (Exception e) {
        // ... omitted
    }
    // ... omitted
}
BinaryLogClient.readPacketSplitInChunks(BinaryLogClient.java:988)
/**
 * The MySQL protocol caps a single packet at 16 MB. When an event is larger than 16 MB it is
 * split across multiple packets; readPacketSplitInChunks receives those packets and stitches
 * them back into one complete event for the downstream deserializer.
 */
// https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int packetLength) throws IOException {
    byte[] result = inputStream.read(packetLength);
    int chunkLength;
    do {
        chunkLength = inputStream.readInteger(3);
        inputStream.skip(1); // 1 byte for sequence
        result = Arrays.copyOf(result, result.length + chunkLength);
        inputStream.fill(result, result.length - chunkLength, chunkLength);
    } while (chunkLength == Packet.MAX_LENGTH);
    return result;
}
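To make the memory cost concrete, here is a small toy calculation (not library code) of what happens while those ~16 MB chunks of one oversized event are stitched into a single byte[]: the whole event has to sit in memory at once, and each Arrays.copyOf allocates a new, larger array while the old one is still reachable, so the transient peak is roughly double the event size. An event of several hundred MB therefore eats a large slice of a 6 GB TaskManager before deserialization even starts.

public class ChunkAssemblyCost {
    // Toy calculation: memory growth while chunks of one oversized event are merged into one array.
    public static void main(String[] args) {
        final long chunk = 16L * 1024 * 1024;  // one MySQL packet payload, ~16 MB
        final int chunks = 20;                 // ~320 MB event, the order of magnitude we observed
        long assembled = chunk;                // first read(packetLength)
        long peak = assembled;
        for (int i = 1; i < chunks; i++) {
            long grown = assembled + chunk;
            // During Arrays.copyOf the old and the new array coexist on the heap.
            peak = Math.max(peak, assembled + grown);
            assembled = grown;
        }
        System.out.printf("assembled event: %d MB, transient peak during the last copy: ~%d MB%n",
                assembled / (1 << 20), peak / (1 << 20));
        // Deserializing the event into Java row objects afterwards costs additional heap on top of this.
    }
}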
MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:233)
/**
 * Deserialize the event according to its event header;
 * for the binlog protocol details see: https://blog.51cto.com/u_3522866/2717255
 */
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
    if (inputStream.peek() == -1) {
        return null;
    }
    EventHeader eventHeader = eventHeaderDeserializer.deserialize(inputStream);
    EventData eventData;
    switch (eventHeader.getEventType()) {
        case FORMAT_DESCRIPTION:
            eventData = deserializeFormatDescriptionEventData(inputStream, eventHeader);
            break;
        case TABLE_MAP:
            eventData = deserializeTableMapEventData(inputStream, eventHeader);
            break;
        default:
            // UPDATE_ROWS and the other row events take this branch and are handed to
            // their specific EventDataDeserializer (see the stack trace above).
            EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType());
            eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer);
    }
    return new Event(eventHeader, eventData);
}
The remaining methods go on to deserialize this oversized update event in full, which is what ultimately drives the Flink job into OOM.
Conclusions:
Having read through this code, we can draw the following conclusions:
1. MySQL master-slave replication (the binlog) is transported over TCP, and on the wire the server splits the stream into packets of at most 16 MB.
2. When Debezium receives the packets it merges them back together, assembling one complete event that is handed to the downstream methods for deserialization.
3. We usually think of the binlog as a row-by-row stream of changes, but that is not quite accurate: at this level it is processed event by event, not row by row. Nothing here preserves transaction boundaries either; a single transaction can be split into multiple events (in ROW format it typically shows up as a BEGIN query event, a TABLE_MAP event per table, one or more rows events, and a closing XID event).
Somewhat sadly, the story could have ended here: at the mysql-cdc source-code level there is nothing we can change to fix this large-transaction OOM.
5. MySQL Parameter Tuning
Since we cannot fix the problem in the connector, perhaps we can attack it from the other side and remove the cause at its source.
As shown above, the lower layers do not process a transaction as a whole; they process the events the transaction has been split into. So can we solve this by controlling the size of those events? (https://www.cnblogs.com/stevenczp/p/6739042.html)
We tracked down one of the binlogs that triggered the OOM: a single event was several hundred MB, and the SQL in it showed one transaction updating 150,000 rows. With the full row image, a rows event for an UPDATE carries both the before image and the after image of every affected row, so the size adds up quickly.
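A rough back-of-the-envelope estimate of that event size, just to sanity-check the observation; the average row size below is a hypothetical figure, not something we measured.

public class RowsEventSizeEstimate {
    public static void main(String[] args) {
        long rows = 150_000;      // rows touched by the single UPDATE found in the binlog
        long avgRowBytes = 1024;  // hypothetical average row size for a wide table
        // With the full row image, an UPDATE rows event stores the before image
        // and the after image of every affected row.
        long estimatedBytes = rows * avgRowBytes * 2;
        System.out.printf("estimated rows payload: ~%d MB%n", estimatedBytes / (1 << 20));
        // ~292 MB under these assumptions -- the same order of magnitude as the event we saw.
    }
}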
Adjusting binlog-row-event-max-size
In the UAT environment we set the MariaDB option binlog-row-event-max-size=67108864 (64 MB) and ran another large-transaction update, this time touching 4 million rows in a table with 40 columns.
Looking at the resulting binlog, the events were indeed split into smaller ones, exactly as we hoped; the light at the end of the tunnel was in sight.
Adjusting the timeout parameters
Happiness rarely comes easily, and the road to a fix has its bumps: once the events were split into smaller pieces, Flink ran into a different error while processing the large transaction: connection reset.
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.DebeziumException: Connection reset
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
... 5 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:59)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:257)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:241)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.fill(ByteArrayInputStream.java:111)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:104)
at com.github.shyiko.mysql.binlog.BinaryLogClient.readPacketSplitInChunks(BinaryLogClient.java:988)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:946)
... 3 more
Faced with this error, the natural suspicion is that since Debezium impersonates a MySQL replica, one of the two sides actively closed the connection. After going through Debezium's documentation and issues we found no relevant parameter, so once again the fix had to come from database parameter tuning.
Let's first review what MySQL's timeout parameters mean; see this blog for details: https://blog.51cto.com/lookingdream/2513405
In the end we adjusted the parameters below. The most important ones are net_read_timeout and net_write_timeout, which give Flink more time to process the events of a large transaction (the comments show the values after tuning; a small verification sketch follows the list).
show variables like '%slave_net_timeout%';        -- 120
show variables like '%thread_pool_idle_timeout%'; -- 120
show variables like '%net_read_timeout%';         -- 120
show variables like '%net_write_timeout%';        -- 120
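A minimal JDBC sketch of how these values can be applied and read back; the host and credentials are placeholders, the MySQL/MariaDB JDBC driver is assumed to be on the classpath, and the account needs a privilege such as SUPER to run SET GLOBAL. These variables are dynamic, so the change affects new connections immediately, but it should also be persisted in my.cnf to survive a restart.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TuneReplicationTimeouts {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://10.18.xx.xx:3306/?useSSL=false"; // placeholder host
        try (Connection conn = DriverManager.getConnection(url, "admin_user", "******");
             Statement stmt = conn.createStatement()) {
            // Apply the values used above (seconds).
            stmt.execute("SET GLOBAL net_read_timeout = 120");
            stmt.execute("SET GLOBAL net_write_timeout = 120");
            stmt.execute("SET GLOBAL slave_net_timeout = 120");
            // Read them back to confirm the change took effect.
            try (ResultSet rs = stmt.executeQuery(
                    "SHOW GLOBAL VARIABLES WHERE Variable_name IN "
                            + "('net_read_timeout','net_write_timeout','slave_net_timeout','thread_pool_idle_timeout')")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " = " + rs.getString(2));
                }
            }
        }
    }
}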
6. Results
As the chart below shows, a large transaction updating millions of rows did create backpressure in the Flink job, but the job stayed stable and processed the incremental changes at about 12,000 records/s.
The overall JVM metrics also looked healthy.