binlog的尋找過程可能的場景如下:
- instance第一次啟動
- 發生數據庫主備切換
- canal server HA情況下的切換
所以這個過程是能夠保證binlog不丟失的關鍵點。
本文從源碼的角度來分析下啟動過程中的binlog尋找過程。
一、流程圖
下圖是根據源碼畫出的流程圖,需要結合源碼分析來一起看。
二、源碼分析
入口在AbstractEventParser的start()方法中,這個start方法其實是instance的整個啟動過程。具體啟動過程中都做了哪些事情,請見另一篇文章的分析。這塊不再贅述。我們主要看的地方是
// 4. 獲取最后的位置信息
EntryPosition position = findStartPosition(erosaConnection);
這一行就是獲取binlog的解析位置,也是本文着重要分析的地方。因為我們目前所配置的都是MysqlEventParser,所以我們分析的也是這個類中的相關代碼。
protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
if (isGTIDMode()) {
// GTID模式下,CanalLogPositionManager里取最后的gtid,沒有則取instanc配置中的
LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
if (logPosition != null) {
return logPosition.getPostion();
}
if (StringUtils.isNotEmpty(masterPosition.getGtid())) {
return masterPosition;
}
}
EntryPosition startPosition = findStartPositionInternal(connection);
if (needTransactionPosition.get()) {
logger.warn("prepare to find last position : {}", startPosition.toString());
Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
logger.warn("find new start Transaction Position , old : {} , new : {}",
startPosition.getPosition(),
preTransactionStartPosition);
startPosition.setPosition(preTransactionStartPosition);
}
needTransactionPosition.compareAndSet(true, false);
}
return startPosition;
}
2.1 GTID模式
我們目前的數據庫架構一般都是M-S,所以binlog的位點很可能不一致,這就需要開啟數據庫GTID模式(通過在instance.properties中配置canal.instance.gtidon=true即可開啟),這是一個全局的事務ID,能夠防止主從位點不一致的情況下,找不到位點的問題。目前這塊是從CanalLogPositionManager中取最后的GTID。default-instance.xml中,使用的CanalLogPositionManager是FailbackLogPositionManager,一個兩級的位點管理器,XML配置如下:
<!-- 解析位點記錄 -->
<property name="logPositionManager">
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</constructor-arg>
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<constructor-arg ref="metaManager"/>
</bean>
</constructor-arg>
</bean>
</property>
一級是放到本地緩存中,第二級直接打了個info日志,有點弱,其實考慮的情況是性能,另一個考慮可能是因為DB的主從切換,並不會導致instance掛掉,內存中還是存儲了之前DB的一些解析位點信息。其實都沒有放到zk中,不利於做HA,所以這塊目前還不是很完善,真正要使用GTID的話,需要對CanalLogPositionManager進行修改。目前已經提供了其他的一些實現,包括定時刷新到zk中等等。
如果CanalLogPositionManager中沒有存儲的話,也可以在instance.properties里面指定位點和GTID信息,也能從binlog中獲取。
2.2 非GTID模式
如果canal沒有開啟GTID模式,那么我們就需要走一個binlog的尋找過程。
EntryPosition startPosition = findStartPositionInternal(connection);
這個方法是個冗長的方法,里面的判斷邏輯就是上面的流程圖,我們來梳理一下。
首先還是從CanalLogPositionManager中獲取,也就是基本上從內存中獲取LogPosition。
LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
2.2.1 內存中不存在LogPosition
2.2.1.1
首先判斷配置文件中的主庫信息是否與當前的數據庫連接connection的地址一致,如果一致,如果一致,那么直接取properties文件中的master的位點信息。
2.2.1.2
如果主庫不一致,那么判斷從庫standby的connection地址,如果是從庫,那么直接取從庫的位點信息。
我們可以在xml配置中看到properties的一些信息。
<!-- 解析起始位點 -->
<property name="masterPosition">
<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
<property name="journalName" value="${canal.instance.master.journal.name}" />
<property name="position" value="${canal.instance.master.position}" />
<property name="timestamp" value="${canal.instance.master.timestamp}" />
<property name="gtid" value="${canal.instance.master.gtid}" />
</bean>
</property>
<property name="standbyPosition">
<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
<property name="journalName" value="${canal.instance.standby.journal.name}" />
<property name="position" value="${canal.instance.standby.position}" />
<property name="timestamp" value="${canal.instance.standby.timestamp}" />
<property name="gtid" value="${canal.instance.standby.gtid}" />
</bean>
</property>
2.2.1.3
如果內存中沒有,配置文件中也沒有,那么系統默認從當前時間開始消費。
entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默認從當前最后一個位置進行消費
protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) {
MysqlConnection mysqlConnection = (MysqlConnection) connection;
final EntryPosition endPosition = findEndPosition(mysqlConnection);//獲取當前最新的位點信息
if (tableMetaTSDB != null) {
long startTimestamp = System.currentTimeMillis();
return findAsPerTimestampInSpecificLogFile(mysqlConnection,
startTimestamp,
endPosition,
endPosition.getJournalName(),
true);
} else {
return endPosition;
}
}
這里的findEndPosition()方法,其實就是執行了一個Mysql命令:
show master status
返回的內容中,包含binlog文件信息和位點position,甚至包括GTID信息。
找到了最新的binlog位點信息后,根據當前時間戳和binlog的時間戳等信息,去服務器上面尋找binlog。其實邏輯基本上都在findAsPerTimestampInSpecificLogFile()中,這個方法是根據時間戳去尋找,離時間戳最近(小於時間戳)的一個事務起始位置。由於這塊的代碼比較長,所以我們只做分析,不做代碼粘貼,具體的代碼在MysqlEventParser這個類中。整個尋找的過程如下:
先看一下這個seek的過程,見代碼注釋:
/**
* 加速主備切換時的查找速度,做一些特殊優化,比如只解析事務頭或者尾
*/
public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
updateSettings();//在mysql中執行一些dump之前的命令
sendBinlogDump(binlogfilename, binlogPosition);//指定位點和binlog文件,發送dump命令,COM_BINLOG_DUMP
DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
fetcher.start(connector.getChannel());//開始獲取
LogDecoder decoder = new LogDecoder();
decoder.handle(LogEvent.ROTATE_EVENT);
decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
decoder.handle(LogEvent.QUERY_EVENT);
decoder.handle(LogEvent.XID_EVENT);
LogContext context = new LogContext();
while (fetcher.fetch()) {//遍歷獲取
LogEvent event = null;
event = decoder.decode(fetcher, context);//解析為event
if (event == null) {
throw new CanalParseException("parse failed");
}
if (!func.sink(event)) {//調用SinkFunction.sink()過濾
break;
}
}
}
下面我們看下數據過濾這塊:
- 起始位置為4,也就是跳過一個魔法值,具體可以看binlog的結構說明
- 之后就是一個過濾的過程
- 首先把事件event解析一個entry,這個entry使用的是消息模型EntryProtocol.proto
- 首先判斷當前事件是否為事務開始或者結束的位置,如果是,判斷事件的時間,如果在我們需要的時間之后,直接過濾這條entry
- 如果當前entry的binlog文件名和最新的binlog文件名相同,並且最新的位點小於entry的位點,那么直接過濾
- 如果當前entry的類型表示的是事務開始或者事務結束,那么直接取當前entry的位點信息,利用當前entry構建位點信息,也就是找到了我們需要的事務起點。
2.2.1.4
如果binlog文件名為空,首先判斷時間戳是否存在,如果存在,那么直接按照時間戳去取,否則默認從當前最后一個位置進行消費。
// 如果沒有指定binlogName,嘗試按照timestamp進行查找
if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
logger.warn("prepare to find start position {}:{}:{}",
new Object[] { "", "", entryPosition.getTimestamp() });
return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
} else {
logger.warn("prepare to find start position just show master status");
return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默認從當前最后一個位置進行消費
}
這塊我們看下findByStartTimestamp()這個方法,也就是只根據時間來查找binlog。這塊的邏輯是這樣的:
- 首先獲取最新和最老的binlog文件
- 從最新的binlog中,根據時間去找,調用的方法也是findAsPerTimestampInSpecificLogFile()
- 如果已經從最新的到最老的binlog文件中找遍了,沒找到,說明根本沒有對應時間的binlog
- 否則不斷的遍歷binlog文件,因為binlog文件名的后綴都是連續的,所以可以很快的尋找
2.2.1.5
binlog文件名不為空,首先判斷是否有位點信息,如果有的話,直接根據當前內存中存儲的位點和文件信息去Mysql獲取。
否則,根據當前內存中管理的時間戳去獲取,根據時間戳和binlog文件名去獲取位點。當然,如果時間戳也不存在,直接從binlog文件名的文件開頭去獲取binlog。
2.2.2 內存中存在歷史成功記錄
2.2.2.1 內存中的位點信息對應的數據庫ip和當前連接的ip一致
如果dump錯誤的次數超過了一定的閾值,默認是2次,也就是連續幾次定位失敗,有幾種情況:
- binlog位點被刪除
- vip模式的mysql,發生了主備切換
這種需要進行一次判斷,判斷內容:
boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
&& logPosition.getPostion().getServerId() != null
&& !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
判斷幾個,第一個配置文件中的standby為空,第二個內存中的logPosition存在數據庫ip,第三個內存中的logPosition的數據庫ip和當前數據庫連接connection的數據庫ip不一致。
滿足這三個條件,說明發生了vip的主備切換,此時需要把logPosition中的時間戳向前推一個回退時間,默認60s,然后根據新的時間戳去找binlog文件和位點信息。
if (case2) {
long timestamp = logPosition.getPostion().getTimestamp();
long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",
logPosition.getPostion().getTimestamp() });
EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
// 重新置為一下
dumpErrorCount = 0;
return findPosition;
}
2.2.2.2 不一致的情況
說明發生了主從切換,這種情況下,直接把logPosition中的時間回退60s,然后根據回退后的時間去binlog中尋找,然后返回。