一、序列圖
1.1 啟動
1.2 停止
二、源碼分析
2.1 啟動
這部分代碼其實在ServerRunningMonitor的start()方法中。針對不同的destination,啟動不同的CanalInstance。主要的方法在於initRunning()。
private void initRunning() {
if (!isStart()) {
return;
}
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
// 序列化
byte[] bytes = JsonUtils.marshalToByte(serverData);
try {
mutex.set(false);
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
activeData = serverData;
processActiveEnter();// 觸發一下事件
mutex.set(true);
} catch (ZkNodeExistsException e) {
bytes = zkClient.readData(path, true);
if (bytes == null) {// 如果不存在節點,立即嘗試一次
initRunning();
} else {
activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 嘗試創建父節點
initRunning();
}
}
首先在zk中新增一個臨時節點,表示的是正在運行destination的ip和端口,然后觸發一下processActiveEnter()。我們主要看下這個方法,在controller啟動時定義的。
public void processActiveEnter() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
embededCanalServer.start(destination);
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void start(final String destination) {
final CanalInstance canalInstance = canalInstances.get(destination);
if (!canalInstance.isStart()) {
try {
MDC.put("destination", destination);
canalInstance.start();
logger.info("start CanalInstances[{}] successfully", destination);
} finally {
MDC.remove("destination");
}
}
}
主要在embededCanalServer.start中,我們看下這個canalInstance.start(),跟蹤到AbstractCanalInstance。
2.1.1 啟動metaManager
在默認的instance配置文件中,我們選擇的metaManager是PeriodMixedMetaManager,定時(默認1s)刷新數據到zk中,所以我們主要關注這個類的start方法。這個類繼承了MemoryMetaManager,首先啟動一個MemoryMetaManager,然后再啟動一個ZooKeeperMetaManager。
2.1.1.1 獲取所有destination和client
destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {
public List<ClientIdentity> apply(String destination) {
return zooKeeperMetaManager.listAllSubscribeInfo(destination);
}
});
從/otter/canal/destinations/{destination}獲取所有的client信息,返回的內容是List
2.1.1.2 獲取client指針cursor
根據ClientIdentity去zk獲取指針,從zk的/otter/canal/destinations/{destination}/{clientId}/cursor下面去獲取,返回的內容是個LogPosition。
cursors = MigrateMap.makeComputingMap(new Function<ClientIdentity, Position>() {
public Position apply(ClientIdentity clientIdentity) {
Position position = zooKeeperMetaManager.getCursor(clientIdentity);
if (position == null) {
return nullCursor; // 返回一個空對象標識,避免出現異常
} else {
return position;
}
}
});
有可能返回一個空。
2.1.1.3 獲取批次batch
創建一個基於內存的MemoryClientIdentityBatch,包含位點的start、end、ack信息。然后從zk節點/otter/canal/destinations/{destination}/{clientId}/mark獲取,取出來的數據進行排序,然后從/otter/canal/destinations/{destination}/{clientId}/mark/{batchId}中取出PositionRange這個類,描述的是一個position的范圍。
batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {
public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
// 讀取一下zookeeper信息,初始化一次
MemoryClientIdentityBatch batches = MemoryClientIdentityBatch.create(clientIdentity);
Map<Long, PositionRange> positionRanges = zooKeeperMetaManager.listAllBatchs(clientIdentity);
for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet()) {
batches.addPositionRange(entry.getValue(), entry.getKey()); // 添加記錄到指定batchId
}
return batches;
}
});
2.1.1.4 啟動定時刷zk任務
// 啟動定時工作任務
executor.scheduleAtFixedRate(new Runnable() {
public void run() {
List<ClientIdentity> tasks = new ArrayList<ClientIdentity>(updateCursorTasks);
for (ClientIdentity clientIdentity : tasks) {
try {
// 定時將內存中的最新值刷到zookeeper中,多次變更只刷一次
zooKeeperMetaManager.updateCursor(clientIdentity, getCursor(clientIdentity));
updateCursorTasks.remove(clientIdentity);
} catch (Throwable e) {
// ignore
logger.error("period update" + clientIdentity.toString() + " curosr failed!", e);
}
}
}
}, period, period, TimeUnit.MILLISECONDS);
定時刷新position到zk后,從任務中刪除。刷新的頻率為1s。
2.1.2 啟動alarmHandler
這塊比較簡單。
if (!alarmHandler.isStart()) {
alarmHandler.start();
}
其實默認是LogAlarmHandler,用於發送告警信息的。
2.1.3 啟動eventStore
啟動EventStore,默認是MemoryEventStoreWithBuffer。start方法也比較簡單。
public void start() throws CanalStoreException {
super.start();
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
indexMask = bufferSize - 1;
entries = new Event[bufferSize];
}
2.1.4 啟動eventSink
這塊默認是EntryEventSink。這塊也不復雜。
public void start() {
super.start();
Assert.notNull(eventStore);
for (CanalEventDownStreamHandler handler : getHandlers()) {
if (!handler.isStart()) {
handler.start();
}
}
}
正常的啟動,將running狀態置為true。
2.1.5 啟動eventParser
if (!eventParser.isStart()) {
beforeStartEventParser(eventParser);
eventParser.start();
afterStartEventParser(eventParser);
}
我們分別看下。
2.1.5.1 beforeStartEventParser
protected void beforeStartEventParser(CanalEventParser eventParser) {
boolean isGroup = (eventParser instanceof GroupEventParser);
if (isGroup) {
// 處理group的模式
List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
for (CanalEventParser singleEventParser : eventParsers) {// 需要遍歷啟動
startEventParserInternal(singleEventParser, true);
}
} else {
startEventParserInternal(eventParser, false);
}
}
判斷是不是集群的parser(用於分庫),如果是GroupParser,需要一個個啟動CanalEventParser。我們主要看下startEventParserInternal方法。我們只關注MysqlEventParser,因為他支持HA。
if (eventParser instanceof MysqlEventParser) {
MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
CanalHAController haController = mysqlEventParser.getHaController();
if (haController instanceof HeartBeatHAController) {
((HeartBeatHAController) haController).setCanalHASwitchable(mysqlEventParser);
}
if (!haController.isStart()) {
haController.start();
}
}
啟動一個HeartBeatHAController。主要作用是用於當parser失敗次數超過閾值時,執行mysql的主備切換。
2.1.5.2 eventParser.start()
這里也區分是GroupParser還是單個的MysqlParser,其實最終都是啟動Parser,不過前者是啟動多個而已。我們看下單個的start方法。具體實現在AbstractMysqlEventParser中
public void start() throws CanalParseException {
if (enableTsdb) {
if (tableMetaTSDB == null) {
// 初始化
tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
}
}
super.start();
}
首先如果啟用了Tsdb功能(也就是DDL后表結構的回溯),那么需要從xml中初始化表結構源數據,然后調用AbstractEventParser的start方法。
- 首先初始化緩沖隊列transactionBuffer,默認隊列長度為1024
- 初始化BinlogParser,將其running狀態置為true
- 啟動工作線程parseThread,開始訂閱binlog,這個線程中做的事在下一篇文章中有。
2.1.5.3 afterStartEventParser
protected void afterStartEventParser(CanalEventParser eventParser) {
// 讀取一下歷史訂閱的filter信息
List<ClientIdentity> clientIdentitys = metaManager.listAllSubscribeInfo(destination);
for (ClientIdentity clientIdentity : clientIdentitys) {
subscribeChange(clientIdentity);
}
}
這塊訂閱的主要是filter的變化。
public boolean subscribeChange(ClientIdentity identity) {
if (StringUtils.isNotEmpty(identity.getFilter())) {
logger.info("subscribe filter change to " + identity.getFilter());
AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());
boolean isGroup = (eventParser instanceof GroupEventParser);
if (isGroup) {
// 處理group的模式
List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
for (CanalEventParser singleEventParser : eventParsers) {// 需要遍歷啟動
((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
}
} else {
((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
}
}
// filter的處理規則
// a. parser處理數據過濾處理
// b. sink處理數據的路由&分發,一份parse數據經過sink后可以分發為多份,每份的數據可以根據自己的過濾規則不同而有不同的數據
// 后續內存版的一對多分發,可以考慮
return true;
}
至此,CanalInstance啟動成功。
2.2 停止
同樣的,停止的觸發也是在ServerRunningMonitor中,停止的代碼如下:
public void stop() {
super.stop();
logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });
if (eventParser.isStart()) {
beforeStopEventParser(eventParser);
eventParser.stop();
afterStopEventParser(eventParser);
}
if (eventSink.isStart()) {
eventSink.stop();
}
if (eventStore.isStart()) {
eventStore.stop();
}
if (metaManager.isStart()) {
metaManager.stop();
}
if (alarmHandler.isStart()) {
alarmHandler.stop();
}
logger.info("stop successful....");
}
2.2.1 停止EventParser
和啟動一樣,在前后也可以做一些事情。
- 停止前,目前默認什么都不做;
- 停止時,我們主要看MysqlEventParser
- 首先斷開mysql的連接
- 清理緩存中表結構源數據tableMetaCache.clearTableMeta()
- 調用AbstractMysqlEventParser的stop方法,首先從spring上下文中,刪除tableMetaTSDB。然后調用AbstractEventParser中的stop方法。
public void stop() {
super.stop();
stopHeartBeat(); // 先停止心跳
parseThread.interrupt(); // 嘗試中斷
eventSink.interrupt();
try {
parseThread.join();// 等待其結束
} catch (InterruptedException e) {
// ignore
}
if (binlogParser.isStart()) {
binlogParser.stop();
}
if (transactionBuffer.isStart()) {
transactionBuffer.stop();
}
}
首先關閉心跳的定時器,然后中斷解析線程,等待當前運行的任務結束后,停止binlogParser,清空transactionBuffer。這里看下怎么清空transactionBuffer的。
public void stop() throws CanalStoreException {
putSequence.set(INIT_SQEUENCE);
flushSequence.set(INIT_SQEUENCE);
entries = null;
super.stop();
}
將put和flush的序列置為初始序列,也就是不再允許向隊列中put數據。
停止parser后,停止位點管理和HAController。其實只是將running置為false。
2.2.2 停止EventSink
類似於啟動,停止也不復雜。
public void stop() {
super.stop();
for (CanalEventDownStreamHandler handler : getHandlers()) {
if (handler.isStart()) {
handler.stop();
}
}
}
2.2.3 停止EventStore
主要部分在這邊
public void cleanAll() throws CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
putSequence.set(INIT_SQEUENCE);
getSequence.set(INIT_SQEUENCE);
ackSequence.set(INIT_SQEUENCE);
putMemSize.set(0);
getMemSize.set(0);
ackMemSize.set(0);
entries = null;
// for (int i = 0; i < entries.length; i++) {
// entries[i] = null;
// }
} finally {
lock.unlock();
}
}
其實也是將RingBuffer的指針置為初始值。
2.2.4 停止metaManager
我們看下PeriodMixedMetaManager。主要調用了兩塊的stop,一個是MemoryMetaManager,另一個是ZooKeeperMetaManager。清理內存中的數據,然后讓zk的管理器running狀態改為false。
2.2.5 停止alarmHandler
將running狀態置為false。