【源碼分析】Canal之Binlog的尋找過程


binlog的尋找過程可能的場景如下:

  • instance第一次啟動
  • 發生數據庫主備切換
  • canal server HA情況下的切換

所以這個過程是能夠保證binlog不丟失的關鍵點。

本文從源碼的角度來分析下啟動過程中的binlog尋找過程。

一、流程圖

下圖是根據源碼畫出的流程圖,需要結合源碼分析來一起看。
流程圖

二、源碼分析

入口在AbstractEventParser的start()方法中,這個start方法其實是instance的整個啟動過程。具體啟動過程中都做了哪些事情,請見另一篇文章的分析。這塊不再贅述。我們主要看的地方是

// 4. 獲取最后的位置信息
EntryPosition position = findStartPosition(erosaConnection);

這一行就是獲取binlog的解析位置,也是本文着重要分析的地方。因為我們目前所配置的都是MysqlEventParser,所以我們分析的也是這個類中的相關代碼。

protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
    if (isGTIDMode()) {
        // GTID模式下,CanalLogPositionManager里取最后的gtid,沒有則取instanc配置中的
        LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
        if (logPosition != null) {
            return logPosition.getPostion();
        }

        if (StringUtils.isNotEmpty(masterPosition.getGtid())) {
            return masterPosition;
        }
    }

    EntryPosition startPosition = findStartPositionInternal(connection);
    if (needTransactionPosition.get()) {
        logger.warn("prepare to find last position : {}", startPosition.toString());
        Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
        if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
            logger.warn("find new start Transaction Position , old : {} , new : {}",
                startPosition.getPosition(),
                preTransactionStartPosition);
            startPosition.setPosition(preTransactionStartPosition);
        }
        needTransactionPosition.compareAndSet(true, false);
    }
    return startPosition;
}

2.1 GTID模式

我們目前的數據庫架構一般都是M-S,所以binlog的位點很可能不一致,這就需要開啟數據庫GTID模式(通過在instance.properties中配置canal.instance.gtidon=true即可開啟),這是一個全局的事務ID,能夠防止主從位點不一致的情況下,找不到位點的問題。目前這塊是從CanalLogPositionManager中取最后的GTID。default-instance.xml中,使用的CanalLogPositionManager是FailbackLogPositionManager,一個兩級的位點管理器,XML配置如下:

<!-- 解析位點記錄 -->
<property name="logPositionManager">
	<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
		<constructor-arg>
			<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
		</constructor-arg>
		<constructor-arg>
			<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
				<constructor-arg ref="metaManager"/>
			</bean>
		</constructor-arg>
	</bean>
</property>

一級是放到本地緩存中,第二級直接打了個info日志,有點弱,其實考慮的情況是性能,另一個考慮可能是因為DB的主從切換,並不會導致instance掛掉,內存中還是存儲了之前DB的一些解析位點信息。其實都沒有放到zk中,不利於做HA,所以這塊目前還不是很完善,真正要使用GTID的話,需要對CanalLogPositionManager進行修改。目前已經提供了其他的一些實現,包括定時刷新到zk中等等。

如果CanalLogPositionManager中沒有存儲的話,也可以在instance.properties里面指定位點和GTID信息,也能從binlog中獲取。

2.2 非GTID模式

如果canal沒有開啟GTID模式,那么我們就需要走一個binlog的尋找過程。

EntryPosition startPosition = findStartPositionInternal(connection);

這個方法是個冗長的方法,里面的判斷邏輯就是上面的流程圖,我們來梳理一下。

首先還是從CanalLogPositionManager中獲取,也就是基本上從內存中獲取LogPosition。

LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);

2.2.1 內存中不存在LogPosition

2.2.1.1

首先判斷配置文件中的主庫信息是否與當前的數據庫連接connection的地址一致,如果一致,如果一致,那么直接取properties文件中的master的位點信息。

2.2.1.2

如果主庫不一致,那么判斷從庫standby的connection地址,如果是從庫,那么直接取從庫的位點信息。

我們可以在xml配置中看到properties的一些信息。

<!-- 解析起始位點 -->
<property name="masterPosition">
	<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
	    <property name="journalName" value="${canal.instance.master.journal.name}" />
		<property name="position" value="${canal.instance.master.position}" />
		<property name="timestamp" value="${canal.instance.master.timestamp}" />
		<property name="gtid" value="${canal.instance.master.gtid}" />
	</bean>
</property>
<property name="standbyPosition">
	<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
		<property name="journalName" value="${canal.instance.standby.journal.name}" />
		<property name="position" value="${canal.instance.standby.position}" />
		<property name="timestamp" value="${canal.instance.standby.timestamp}" />
		<property name="gtid" value="${canal.instance.standby.gtid}" />
	</bean>
</property>

2.2.1.3

如果內存中沒有,配置文件中也沒有,那么系統默認從當前時間開始消費。

entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默認從當前最后一個位置進行消費

protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) {
    MysqlConnection mysqlConnection = (MysqlConnection) connection;
    final EntryPosition endPosition = findEndPosition(mysqlConnection);//獲取當前最新的位點信息
    if (tableMetaTSDB != null) {
        long startTimestamp = System.currentTimeMillis();
        return findAsPerTimestampInSpecificLogFile(mysqlConnection,
            startTimestamp,
            endPosition,
            endPosition.getJournalName(),
            true);
    } else {
        return endPosition;
    }
}

這里的findEndPosition()方法,其實就是執行了一個Mysql命令:

show master status

返回的內容中,包含binlog文件信息和位點position,甚至包括GTID信息。

找到了最新的binlog位點信息后,根據當前時間戳和binlog的時間戳等信息,去服務器上面尋找binlog。其實邏輯基本上都在findAsPerTimestampInSpecificLogFile()中,這個方法是根據時間戳去尋找,離時間戳最近(小於時間戳)的一個事務起始位置。由於這塊的代碼比較長,所以我們只做分析,不做代碼粘貼,具體的代碼在MysqlEventParser這個類中。整個尋找的過程如下:

先看一下這個seek的過程,見代碼注釋:

/**
 * 加速主備切換時的查找速度,做一些特殊優化,比如只解析事務頭或者尾
 */
public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();//在mysql中執行一些dump之前的命令

    sendBinlogDump(binlogfilename, binlogPosition);//指定位點和binlog文件,發送dump命令,COM_BINLOG_DUMP
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());//開始獲取
    LogDecoder decoder = new LogDecoder();
    decoder.handle(LogEvent.ROTATE_EVENT);
    decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
    decoder.handle(LogEvent.QUERY_EVENT);
    decoder.handle(LogEvent.XID_EVENT);
    LogContext context = new LogContext();
    while (fetcher.fetch()) {//遍歷獲取
        LogEvent event = null;
        event = decoder.decode(fetcher, context);//解析為event

        if (event == null) {
            throw new CanalParseException("parse failed");
        }

        if (!func.sink(event)) {//調用SinkFunction.sink()過濾
            break;
        }
    }
}

下面我們看下數據過濾這塊:

  • 起始位置為4,也就是跳過一個魔法值,具體可以看binlog的結構說明
  • 之后就是一個過濾的過程
    • 首先把事件event解析一個entry,這個entry使用的是消息模型EntryProtocol.proto
    • 首先判斷當前事件是否為事務開始或者結束的位置,如果是,判斷事件的時間,如果在我們需要的時間之后,直接過濾這條entry
    • 如果當前entry的binlog文件名和最新的binlog文件名相同,並且最新的位點小於entry的位點,那么直接過濾
    • 如果當前entry的類型表示的是事務開始或者事務結束,那么直接取當前entry的位點信息,利用當前entry構建位點信息,也就是找到了我們需要的事務起點。

2.2.1.4

如果binlog文件名為空,首先判斷時間戳是否存在,如果存在,那么直接按照時間戳去取,否則默認從當前最后一個位置進行消費。

// 如果沒有指定binlogName,嘗試按照timestamp進行查找
if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
    logger.warn("prepare to find start position {}:{}:{}",
        new Object[] { "", "", entryPosition.getTimestamp() });
    return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
} else {
    logger.warn("prepare to find start position just show master status");
    return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默認從當前最后一個位置進行消費
}

這塊我們看下findByStartTimestamp()這個方法,也就是只根據時間來查找binlog。這塊的邏輯是這樣的:

  • 首先獲取最新和最老的binlog文件
  • 從最新的binlog中,根據時間去找,調用的方法也是findAsPerTimestampInSpecificLogFile()
  • 如果已經從最新的到最老的binlog文件中找遍了,沒找到,說明根本沒有對應時間的binlog
  • 否則不斷的遍歷binlog文件,因為binlog文件名的后綴都是連續的,所以可以很快的尋找

2.2.1.5

binlog文件名不為空,首先判斷是否有位點信息,如果有的話,直接根據當前內存中存儲的位點和文件信息去Mysql獲取。

否則,根據當前內存中管理的時間戳去獲取,根據時間戳和binlog文件名去獲取位點。當然,如果時間戳也不存在,直接從binlog文件名的文件開頭去獲取binlog。

2.2.2 內存中存在歷史成功記錄

2.2.2.1 內存中的位點信息對應的數據庫ip和當前連接的ip一致

如果dump錯誤的次數超過了一定的閾值,默認是2次,也就是連續幾次定位失敗,有幾種情況:

  • binlog位點被刪除
  • vip模式的mysql,發生了主備切換

這種需要進行一次判斷,判斷內容:

 boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
                && logPosition.getPostion().getServerId() != null
                && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));

判斷幾個,第一個配置文件中的standby為空,第二個內存中的logPosition存在數據庫ip,第三個內存中的logPosition的數據庫ip和當前數據庫連接connection的數據庫ip不一致。

滿足這三個條件,說明發生了vip的主備切換,此時需要把logPosition中的時間戳向前推一個回退時間,默認60s,然后根據新的時間戳去找binlog文件和位點信息。

if (case2) {
    long timestamp = logPosition.getPostion().getTimestamp();
    long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
    logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",
            logPosition.getPostion().getTimestamp() });
    EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
    // 重新置為一下
    dumpErrorCount = 0;
    return findPosition;
}

2.2.2.2 不一致的情況

說明發生了主從切換,這種情況下,直接把logPosition中的時間回退60s,然后根據回退后的時間去binlog中尋找,然后返回。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM