Flink MySQL CDC: Analyzing and Fixing OOM on Large Transactions
1. Background
The system had only just gone live and some processes were not yet fully wired up, so the business was kept running through manual data fixes. Both the order domain and the finance domain saw large volumes of such fixes. With everyone patching data flat out, some basic ground rules got forgotten late at night, and in production we found hard deletes plus single transactions updating hundreds of thousands of rows.
This caused widespread OOM in the Flink jobs and seriously hurt the accuracy and timeliness of the data synced to Kudu, the Shenzhen e-commerce MySQL, and ES.

2. System Configuration
Flink: Flink 1.13 + MySQL CDC 2.1, JobManager 2 GB, TaskManager 6 GB
MySQL version: 10.4.18-MariaDB

3. Locating the Problem

By inspecting the TaskManager logs (screenshot above), we found that Flink ran out of memory while deserializing a binlog event; several rounds of garbage collection recovered almost nothing, and the job died with GC overhead limit exceeded.

4. Problem Analysis
Let's follow the error log into the code and see what we can learn.
Error log:

INFO: Trying to restore lost connection to 10.18.xx.xx:3306
Exception in thread "blc-10.18.57.93:3306" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:103)
        at io.debezium.connector.mysql.RowDeserializers.deserializeVarString(RowDeserializers.java:264)
        at io.debezium.connector.mysql.RowDeserializers$UpdateRowsDeserializer.deserializeVarString(RowDeserializers.java:130)
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataDeserializer.java:189)
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:143)
        at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserializeRows(UpdateRowsEventDataDeserializer.java:72)
        at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:58)
        at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:33)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:233)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
        at java.lang.Thread.run(Thread.java:748)
Dec 15, 2021 10:03:23 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to 10.18.xx.xx:3306 at mysql-bin.007528/291710792 (sid:1570186369, cid:4814696)

BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)

// Entry point that listens for binlog event packets
private void listenForEventPackets() throws IOException {
    ............. // omitted
    try {
        event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
            new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
            inputStream);
        if (event == null) {
            throw new EOFException();
        }
    } catch (Exception e) {
        ........... // omitted
    }
    ................ // omitted
}

BinaryLogClient.readPacketSplitInChunks(BinaryLogClient.java:988)

/**
 * The MySQL protocol caps a single packet at 16 MB. When an event is larger than 16 MB it is
 * split across multiple packets; readPacketSplitInChunks receives those packets and stitches
 * them back into one complete event for the downstream deserializer.
 * https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
 */
private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int packetLength) throws IOException {
    byte[] result = inputStream.read(packetLength);
    int chunkLength;
    do {
        chunkLength = inputStream.readInteger(3);
        inputStream.skip(1); // 1 byte for sequence
        // the whole event is accumulated into one byte[]; Arrays.copyOf keeps growing it chunk by chunk
        result = Arrays.copyOf(result, result.length + chunkLength);
        inputStream.fill(result, result.length - chunkLength, chunkLength);
    } while (chunkLength == Packet.MAX_LENGTH);
    return result;
}

MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:233)

/**
 * Deserializes the event according to its header.
 * For details of the binlog protocol see: https://blog.51cto.com/u_3522866/2717255
 */
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
    if (inputStream.peek() == -1) {
        return null;
    }
    EventHeader eventHeader = eventHeaderDeserializer.deserialize(inputStream);
    EventData eventData;
    switch (eventHeader.getEventType()) {
        case FORMAT_DESCRIPTION:
            eventData = deserializeFormatDescriptionEventData(inputStream, eventHeader);
            break;
        case TABLE_MAP:
            eventData = deserializeTableMapEventData(inputStream, eventHeader);
            break;
        default:
            EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType());
            eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer);
    }
    return new Event(eventHeader, eventData);
}

The remaining methods are all about deserializing this huge UPDATE event. By that point the complete event has already been materialized as a single byte[] by readPacketSplitInChunks, and deserialization then builds row objects on top of it, so an event of several hundred MB plus the objects created from it easily exhausts the TaskManager heap and the Flink job ends in OOM.

Conclusions:
Having read this code, we can draw the following conclusions:
1. MySQL replication (the binlog) is transferred over TCP, and during transmission the server splits the stream into packets of at most 16 MB.
2. When Debezium receives the packets it merges them back into one complete event, which is then handed to the downstream code for deserialization.
3. We usually think of the binlog as row-by-row changes, but that is not quite accurate: under the hood it is processed event by event, not row by row. Transaction boundaries are not preserved at this level either; one transaction can be split into multiple events.

It is a little sad, but the story could end right here: at the mysql-cdc source-code level there is nothing we can tweak to solve this large-transaction OOM.

5. MySQL Parameter Tuning
Since we cannot fix the problem on the consuming side, can we change the angle and eliminate it at the source instead?

As discussed above, what the client actually processes is not a transaction but the events the transaction is broken into. So can we solve the problem by adjusting the size of those events? (https://www.cnblogs.com/stevenczp/p/6739042.html)

We found one of the binlogs where an OOM had occurred: a single event was several hundred MB, and from the SQL we could see one transaction updating 150,000 rows.
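As a side note (our own illustration, not a step from the original investigation): one way to confirm how large individual events are is to list them on the MariaDB side; the size of an event is roughly End_log_pos minus Pos. The file name and offset below are taken from the error log above and are only an example; the FROM offset has to be a valid event start position.

-- a huge Update_rows event stands out as a very large gap between Pos and End_log_pos
show binlog events in 'mysql-bin.007528' from 291710792 limit 20;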

Tuning binlog-row-event-max-size
In the UAT environment we set the MariaDB parameter binlog-row-event-max-size=67108864 (64 MB) and ran another large-transaction update: 4 million rows at once on a table with 40 columns.
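For reference, a minimal sketch of how this change is usually applied (our own illustration, not a copy of the production config): in MariaDB 10.4 binlog_row_event_max_size is a read-only global variable, so it cannot be changed with SET GLOBAL; it has to go into the server configuration and takes effect after a restart.

-- check the current value (read-only at runtime)
show global variables like 'binlog_row_event_max_size';

-- set it in the server config (e.g. my.cnf) and restart:
--   [mysqld]
--   binlog-row-event-max-size = 67108864    # 64M: row changes are packed into events of at most roughly this size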

Looking at the binlog after the change, the big events were indeed split into smaller ones, exactly as expected. The light of victory was in sight.

Tuning the timeout parameters
Happiness rarely comes easy, and the road to a fix has its bumps: once the events were split into smaller pieces, Flink hit another error while processing the large transaction: connection reset.

com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
        at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.DebeziumException: Connection reset
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
        ... 5 more
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:59)
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:257)
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:241)
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.fill(ByteArrayInputStream.java:111)
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:104)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.readPacketSplitInChunks(BinaryLogClient.java:988)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:946)
        ... 3 more

Faced with this error, the natural suspicion is that, since Debezium impersonates a MySQL replica, one of the two sides actively closed the connection. We went through Debezium's official issues and found no relevant parameter, so once again the fix had to come from database parameter tuning.

Let's first go over what the relevant MySQL timeout parameters mean; see this blog for details: https://blog.51cto.com/lookingdream/2513405.
In the end we adjusted the following parameters. The key ones are net_read_timeout and net_write_timeout, which give Flink more time to process the events of a large transaction:

show variables like '%slave_net_timeout%';         -- 120
show variables like '%thread_pool_idle_timeout%';  -- 120
show variables like '%net_read_timeout%';          -- 120
show variables like '%net_write_timeout%';         -- 120
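For completeness, a sketch of how these can be raised at runtime (the values simply match the ones shown above; whether 120 seconds suits your workload is a judgment call, and the settings should also be persisted in my.cnf so they survive a restart). New connections pick up the new global defaults, so the Flink CDC job has to reconnect before the change matters for it.

set global slave_net_timeout        = 120;
set global thread_pool_idle_timeout = 120;   -- MariaDB thread-pool specific
set global net_read_timeout         = 120;
set global net_write_timeout        = 120;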

6. Results
As the monitoring chart below shows, during a large transaction updating millions of rows the Flink job shows backpressure but stays stable, processing the incremental changes at about 12,000 records/s.

The overall JVM monitoring also looks healthy.

