In August, Flink CDC released version 2.0.0. Compared with 1.0, the full (snapshot) read phase now supports distributed reading and checkpointing, and data consistency during the full + incremental read is guaranteed without locking the table.
The data-reading logic of Flink CDC 2.0 itself is not complicated; what makes it hard to follow is the design of FLIP-27: Refactor Source Interface and unfamiliarity with the Debezium API. This article focuses on Flink CDC's processing logic; the FLIP-27 design and the Debezium API calls are not covered in depth.
We first use a Flink SQL example to show how Flink CDC 2.0 is used, then introduce the core design of the CDC source, including chunk splitting, chunk reading and incremental (binlog) reading, and finally walk through the flink-mysql-cdc interfaces invoked during data processing and their implementations.
Case Study
Read a MySQL table in full + incremental mode, write it to Kafka in changelog-json format, and observe the RowKind types and the number of affected rows.
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
env.setParallelism(3);
// note: incremental sync requires checkpointing to be enabled
env.enableCheckpointing(10000);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);
tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +
" `order_id` INTEGER ,\n" +
" `order_date` DATE ,\n" +
" `order_time` TIMESTAMP(3),\n" +
" `quantity` INT ,\n" +
" `product_id` INT ,\n" +
" `purchaser` STRING,\n" +
" primary key(order_id) NOT ENFORCED" +
" ) WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'username' = 'cdc',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'test',\n" +
" 'table-name' = 'demo_orders'," +
// full + incremental sync
" 'scan.startup.mode' = 'initial' " +
" )");
tableEnvironment.executeSql("CREATE TABLE sink (\n" +
" `order_id` INTEGER ,\n" +
" `order_date` DATE ,\n" +
" `order_time` TIMESTAMP(3),\n" +
" `quantity` INT ,\n" +
" `product_id` INT ,\n" +
" `purchaser` STRING,\n" +
" primary key (order_id) NOT ENFORCED " +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'topic' = 'mqTest02',\n" +
" 'format' = 'changelog-json' "+
")");
tableEnvironment.executeSql("insert into sink select * from demoOrders");}
Output of the snapshot (full) phase:
{"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09.709","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06.637","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03.535","quantity":52,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51.347","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48.783","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01.249","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56.153","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53.727","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
Modify the table data and capture the incremental changes:
## update row 1005
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"}
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}
## delete row 1000
{"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 09:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"-D"}
Core Design
Chunk Splitting
In the full (snapshot) phase data is read in a distributed fashion: the table is first split into multiple chunks by primary key, and subtasks then read the data within each chunk range. Depending on whether the primary-key column is an auto-increment integer, the table is split into either evenly distributed or unevenly distributed chunks.
Evenly Distributed
The primary-key column is auto-increment and of an integer type (int, bigint, decimal). Query the minimum and maximum values of the primary-key column and split the data evenly by chunkSize; because the key is numeric, the end of each chunk can be computed directly from the chunk's start position and chunkSize.
-- compute the value range of the primary-key column
select min(`order_id`), max(`order_id`) from demo_orders;
-- split the data into chunks of size chunkSize
chunk-0:    [min, min + chunkSize)
chunk-1:    [min + chunkSize, min + 2 * chunkSize)
......
chunk-last: [previous chunk end, null)
Unevenly Distributed
The primary-key column is not auto-increment, or is of a non-integer type. Since the key cannot be split arithmetically, each step sorts the not-yet-split data by primary key in ascending order and takes the maximum value of the first chunkSize rows as the end of the current chunk.
-- after sorting the unsplit data, take chunkSize rows and use their maximum value as the end position of the chunk
chunkend = SELECT MAX(`order_id`) FROM (
SELECT `order_id` FROM `demo_orders`
WHERE `order_id` >= [前一個切片的起始位置]
ORDER BY `order_id` ASC
LIMIT [chunkSize]
) AS T
Reading Snapshot Split Data
Flink splits the table into multiple chunks, and subtasks read the chunks in parallel without acquiring any lock. Because there is no lock at all, other transactions may modify the data inside a chunk range while it is being read, so the select alone cannot guarantee consistency. Therefore, in the snapshot phase Flink combines the snapshot read with a binlog-based correction to guarantee data consistency.
Snapshot Read
Query the records within the chunk range via JDBC.
-- SQL used for the snapshot read
SELECT * FROM `test`.`demo_orders`
WHERE order_id >= [chunkStart]
AND NOT (order_id = [chunkEnd])
AND order_id <= [chunkEnd]
Data Correction
SHOW MASTER STATUS is executed immediately before and after the snapshot read to obtain the current binlog offset (low/high watermark). After the snapshot read completes, the binlog data within that interval is read and used to correct the snapshot records.
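For reference, the low and high watermarks are simply the binlog file name and position reported by MySQL around the select; a manual illustration (not connector code):
-- executed once before and once after the snapshot select; File + Position together form the binlog offset
SHOW MASTER STATUS;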
Data layout of the snapshot read + binlog read:
Rules for how BinlogEvents correct SnapshotEvents:
- No binlog data was read, i.e. no other transaction changed anything while the select ran: emit all snapshot records directly.
- Binlog data was read, but the changed records do not belong to the current chunk: emit the snapshot records as-is.
- Binlog data was read and the changes belong to the current chunk: a delete removes the row from the in-memory snapshot, an insert adds the new row, and an update adds the changed record to the in-memory snapshot, so that both the before and after images are eventually emitted downstream (see the sketch after this list).
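The following is a minimal sketch of the merge rule described above, not the actual RecordUtils#normalizedSplitRecords implementation; the ChangeEvent/Op types, the inChunkRange predicate and the method names are illustrative assumptions, and an update is collapsed to its latest row image for simplicity.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class ChunkNormalizer {
    enum Op { INSERT, UPDATE, DELETE }

    /** Illustrative change event: a primary key, an operation and the latest row image. */
    record ChangeEvent(Object key, Op op, Object row) {}

    /**
     * Merge the binlog events captured between the low and high watermark into the
     * snapshot rows of one chunk, keyed by primary key (upsert semantics).
     */
    static Map<Object, Object> normalize(Map<Object, Object> snapshotRows,
                                         List<ChangeEvent> binlogEvents,
                                         Predicate<Object> inChunkRange) {
        Map<Object, Object> merged = new LinkedHashMap<>(snapshotRows);
        for (ChangeEvent event : binlogEvents) {
            if (!inChunkRange.test(event.key())) {
                continue; // change does not belong to this chunk: ignore it
            }
            switch (event.op()) {
                case DELETE -> merged.remove(event.key());                   // drop the deleted row
                case INSERT, UPDATE -> merged.put(event.key(), event.row()); // keep the latest image
            }
        }
        return merged; // the corrected chunk is what gets emitted downstream
    }
}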
Corrected data layout:
Take reading the chunk range [1, 11) as an example; c, d and u denote the insert, delete and update operations captured by Debezium.
Data and layout before correction:
Data and layout after correction:
After a chunk has been processed, the reader sends the finished chunk's range (ChunkStart, ChunkEnd) and the maximum binlog offset it observed (the high watermark) to the SplitEnumerator, which later uses them to determine the starting offset of the incremental read.
Incremental (Binlog) Split Reading
After all snapshot splits have been read, the SplitEnumerator assigns a BinlogSplit for incremental reading. The most important attribute of a BinlogSplit is its starting offset: if it is set too small, duplicate data may be sent downstream; if it is set too large, the changes in between would be skipped. Flink CDC uses the minimum binlog offset among all finished snapshot splits as the starting offset of the incremental read, and only records that satisfy the emit condition are sent downstream.
Emit condition:
- the offset of the captured binlog record > the maximum binlog offset (high watermark) of the snapshot split that the record's key belongs to.
For example, suppose the SplitEnumerator holds the following finished-split information:
During the incremental read, binlog data is read starting from offset 800. When the record <data:123, offset:1500> is captured, the reader first locates the snapshot split that key 123 belongs to and its maximum binlog offset, 800. Since the record's offset is greater than that split's maximum snapshot offset, the record is emitted; otherwise it would be discarded. A minimal sketch of this decision follows.
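A minimal sketch of this filtering rule, using illustrative types only (not the connector's FinishedSnapshotSplitInfo or shouldEmit API); the values in main mirror the hypothetical example above.
import java.util.List;

public class BinlogEmitFilter {
    /** Illustrative finished-split info: key range [start, end) plus its high watermark. */
    record FinishedSplit(long start, long end, long highWatermark) {
        boolean contains(long key) { return key >= start && key < end; }
    }

    /** Emit a binlog record only if its offset is beyond the high watermark of the
     *  snapshot split that contains its key. */
    static boolean shouldEmit(long key, long offset, List<FinishedSplit> finishedSplits) {
        for (FinishedSplit split : finishedSplits) {
            if (split.contains(key)) {
                return offset > split.highWatermark();
            }
        }
        return false; // key not covered by any finished snapshot split: skip it
    }

    public static void main(String[] args) {
        // hypothetical finished split whose high watermark is 800 and which contains key 123
        List<FinishedSplit> splits = List.of(new FinishedSplit(0, 1024, 800));
        System.out.println(shouldEmit(123, 1500, splits)); // true: offset 1500 > HW 800, so emit
    }
}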
Code Walkthrough
The design of FLIP-27: Refactor Source Interface is not covered in detail here; this part focuses on how the flink-mysql-cdc interfaces are invoked and implemented.
MySqlSourceEnumerator Initialization
SourceCoordinator, the OperatorCoordinator implementation for a Source, runs on the master node. On startup it creates the MySqlSourceEnumerator by calling MySqlParallelSource#createEnumerator and then calls its start method to do some initialization work.
1. Create the MySqlSourceEnumerator, using MySqlHybridSplitAssigner to split the full + incremental data and MySqlValidator to validate the MySQL version and configuration.
2. MySqlValidator checks (for a manual SQL check, see the sketch after the code below):
- The MySQL version must be 5.7 or above.
- binlog_format must be set to ROW.
- binlog_row_image must be set to FULL.
3. MySqlSplitAssigner initialization:
- Create the ChunkSplitter used to generate chunks.
- Filter out the names of the tables to be read.
4. Start a periodically scheduled task that asks the SourceReaders to report splits that are finished but whose ACK event has not yet been sent.
private void syncWithReaders(int[] subtaskIds, Throwable t) {
if (t != null) {
throw new FlinkRuntimeException("Failed to list obtain registered readers due to:", t);
}
// when the SourceEnumerator restores or the communication failed between
// SourceEnumerator and SourceReader, it may missed some notification event.
// tell all SourceReader(s) to report there finished but unacked splits.
if (splitAssigner.waitingForFinishedSplits()) {
for (int subtaskId : subtaskIds) {
// note: send FinishedSnapshotSplitsRequestEvent
context.sendEventToSourceReader(
subtaskId, new FinishedSnapshotSplitsRequestEvent());
}
}
}
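For reference, the MySqlValidator requirements from step 2 correspond to server settings that can be inspected manually on the MySQL side (plain SQL, not connector code):
SELECT VERSION();                              -- must be 5.7 or newer
SHOW GLOBAL VARIABLES LIKE 'binlog_format';    -- must be ROW
SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'; -- must be FULL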
MySqlSourceReader Initialization
SourceOperator integrates the SourceReader and interacts with the SourceCoordinator through the OperatorEventGateway.
1. When the SourceOperator is initialized, it creates the MySqlSourceReader via MySqlParallelSource. The MySqlSourceReader creates a fetcher through SingleThreadFetcherManager to pull split data, and the fetched data is written into elementsQueue as MySqlRecords.
MySqlParallelSource#createReader
public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext) throws Exception {
// note: queue that buffers the fetched records
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
new FutureCompletingBlockingQueue<>();
final Configuration readerConfiguration = getReaderConfig(readerContext);
// note: factory for the split readers
Supplier<MySqlSplitReader> splitReaderSupplier =
() -> new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask());
return new MySqlSourceReader<>(
elementsQueue,
splitReaderSupplier,
new MySqlRecordEmitter<>(deserializationSchema),
readerConfiguration,
readerContext);
}
2. The newly created MySqlSourceReader registers itself with the SourceCoordinator by sending an event. On receiving the registration event, the SourceCoordinator stores the reader's address and index.
SourceCoordinator#handleReaderRegistrationEvent
// note: SourceCoordinator handles the reader registration event
private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
enumerator.addReader(event.subtaskId());
}
3. After the MySqlSourceReader starts, it sends a split request event to the MySqlSourceEnumerator to obtain the splits assigned to it.
4. Once the SourceOperator is initialized, it calls emitNext; SourceReaderBase fetches record batches from elementsQueue and hands them to MySqlRecordEmitter. Interface call diagram:
MySqlSourceEnumerator Handles Split Requests
When the MySqlSourceReader starts, it sends a RequestSplitEvent to the MySqlSourceEnumerator and then reads the data of the returned split range. In the snapshot phase, the enumerator's split-request handling eventually returns a MySqlSnapshotSplit.
1. Handle the split request event and assign a split to the requesting reader: the MySqlSplit (a MySqlSnapshotSplit in the snapshot phase, a MySqlBinlogSplit in the incremental phase) is delivered by sending an AddSplitEvent.
MySqlSourceEnumerator#handleSplitRequest
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
}
// note: store the reader's subtaskId in a TreeSet; when handling the binlog split, task-0 is assigned first
readersAwaitingSplit.add(subtaskId);
assignSplits();
}
// note: assign splits
private void assignSplits() {
final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
while (awaitingReader.hasNext()) {
int nextAwaiting = awaitingReader.next();
// if the reader that requested another split has failed in the meantime, remove
// it from the list of waiting readers
if (!context.registeredReaders().containsKey(nextAwaiting)) {
awaitingReader.remove();
continue;
}
//note: the split is chosen by MySqlSplitAssigner
Optional<MySqlSplit> split = splitAssigner.getNext();
if (split.isPresent()) {
final MySqlSplit mySqlSplit = split.get();
// note: send AddSplitEvent to return the split info to the reader
context.assignSplit(mySqlSplit, nextAwaiting);
awaitingReader.remove();
LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
} else {
// there is no available splits by now, skip assigning
break;
}
}
}
2. MySqlHybridSplitAssigner handles the logic for both snapshot splits and the binlog split.
- When the job has just started, remainingTables is not empty, noMoreSplits() returns false, and SnapshotSplits are created.
- After all snapshot splits have been read, noMoreSplits() returns true and the BinlogSplit is created.
MySqlHybridSplitAssigner#getNext
@Override
public Optional<MySqlSplit> getNext() {
if (snapshotSplitAssigner.noMoreSplits()) {
// binlog split assigning
if (isBinlogSplitAssigned) {
// no more splits for the assigner
return Optional.empty();
} else if (snapshotSplitAssigner.isFinished()) {
// we need to wait snapshot-assigner to be finished before
// assigning the binlog split. Otherwise, records emitted from binlog split
// might be out-of-order in terms of same primary key with snapshot splits.
isBinlogSplitAssigned = true;
//note: after all snapshot splits are finished, create the BinlogSplit
return Optional.of(createBinlogSplit());
} else {
// binlog split is not ready by now
return Optional.empty();
}
} else {
// note: SnapshotSplits are created by MySqlSnapshotSplitAssigner
// snapshot assigner still have remaining splits, assign split from it
return snapshotSplitAssigner.getNext();
}
}
3. MySqlSnapshotSplitAssigner handles the snapshot-split logic: splits are generated by the ChunkSplitter and kept in the remainingSplits collection, which is consumed via its iterator.
@Override
public Optional<MySqlSplit> getNext() {
if (!remainingSplits.isEmpty()) {
// return remaining splits firstly
Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
MySqlSnapshotSplit split = iterator.next();
iterator.remove();
//note: assigned splits are stored in the assignedSplits map
assignedSplits.put(split.splitId(), split);
return Optional.of(split);
} else {
// note: remainingTables was filled with the tables to read during initialization
TableId nextTable = remainingTables.pollFirst();
if (nextTable != null) {
// split the given table into chunks (snapshot splits)
// note: the ChunkSplitter created during initialization is called via generateSplits to split the table
Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
// note: keep all split info
remainingSplits.addAll(splits);
// note: tables whose splitting has been completed
alreadyProcessedTables.add(nextTable);
// note: recursive call of this method
return getNext();
} else {
return Optional.empty();
}
}
}
4. ChunkSplitter splits a table into evenly or unevenly distributed chunks. The table being read must have a physical primary key.
public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
Table schema = mySqlSchema.getTableSchema(tableId).getTable();
List<Column> primaryKeys = schema.primaryKeyColumns();
// note: a primary key is required
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
tableId));
}
// use first field in primary key as the split key
Column splitColumn = primaryKeys.get(0);
final List<ChunkRange> chunks;
try {
// note: split the data into multiple chunks by the primary-key column
chunks = splitTableIntoChunks(tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}
//note: convert the key data type and wrap each ChunkRange into a MySqlSnapshotSplit
// convert chunks into splits
List<MySqlSnapshotSplit> splits = new ArrayList<>();
RowType splitType = splitType(splitColumn);
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
MySqlSnapshotSplit split =
createSnapshotSplit(
tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd());
splits.add(split);
}
return splits;
}
5. splitTableIntoChunks splits the table into chunks based on the physical primary key.
private List<ChunkRange> splitTableIntoChunks(TableId tableId, Column splitColumn)
throws SQLException {
final String splitColumnName = splitColumn.name();
// select min, max
final Object[] minMaxOfSplitColumn = queryMinMax(jdbc, tableId, splitColumnName);
final Object min = minMaxOfSplitColumn[0];
final Object max = minMaxOfSplitColumn[1];
if (min == null || max == null || min.equals(max)) {
// empty table, or only one row, return full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
}
final List<ChunkRange> chunks;
if (splitColumnEvenlyDistributed(splitColumn)) {
// use evenly-sized chunks which is much efficient
// note: split evenly by the primary key
chunks = splitEvenlySizedChunks(min, max);
} else {
// note: split unevenly by the primary key
// use unevenly-sized chunks which will request many queries and is not efficient.
chunks = splitUnevenlySizedChunks(tableId, splitColumnName, min, max);
}
return chunks;
}
/** Checks whether split column is evenly distributed across its range. */
private static boolean splitColumnEvenlyDistributed(Column splitColumn) {
// only column is auto-incremental are recognized as evenly distributed.
// TODO: we may use MAX,MIN,COUNT to calculate the distribution in the future.
if (splitColumn.isAutoIncremented()) {
DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
// currently, we only support split column with type BIGINT, INT, DECIMAL
return typeRoot == LogicalTypeRoot.BIGINT
|| typeRoot == LogicalTypeRoot.INTEGER
|| typeRoot == LogicalTypeRoot.DECIMAL;
} else {
return false;
}
}
/**
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in {@link #chunkSize} step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(Object min, Object max) {
if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
}
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, chunkSize);
// chunkEnd <= max
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
TableId tableId, String splitColumnName, Object min, Object max) throws SQLException {
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(min, tableId, splitColumnName, max);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(chunkEnd, tableId, splitColumnName, max);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
private Object nextChunkEnd(
Object previousChunkEnd, TableId tableId, String splitColumnName, Object max)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;
} else {
return chunkEnd;
}
}
MySqlSourceReader Handles Split Assignment
When the MySqlSourceReader receives a split assignment, it first creates a SplitFetcher thread and enqueues an AddSplitsTask into taskQueue to register the new split; it then runs FetchTask, which reads the data through the Debezium API and stores it into elementsQueue. SourceReaderBase takes the data from that queue and hands it to MySqlRecordEmitter.
1. When handling the split assignment event, a SplitFetcher is created and an AddSplitsTask is added to taskQueue.
SingleThreadFetcherManager#addSplits
public void addSplits(List<SplitT> splitsToAdd) {
SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
if (fetcher == null) {
fetcher = createSplitFetcher();
// Add the splits to the fetchers.
fetcher.addSplits(splitsToAdd);
startFetcher(fetcher);
} else {
fetcher.addSplits(splitsToAdd);
}
}
// create a SplitFetcher
protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
if (closed) {
throw new IllegalStateException("The split fetcher manager has closed.");
}
// Create SplitReader.
SplitReader<E, SplitT> splitReader = splitReaderFactory.get();
int fetcherId = fetcherIdGenerator.getAndIncrement();
SplitFetcher<E, SplitT> splitFetcher =
new SplitFetcher<>(
fetcherId,
elementsQueue,
splitReader,
errorHandler,
() -> {
fetchers.remove(fetcherId);
elementsQueue.notifyAvailable();
});
fetchers.put(fetcherId, splitFetcher);
return splitFetcher;
}
public void addSplits(List<SplitT> splitsToAdd) {
enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
wakeUp(true);
}
2. The SplitFetcher thread runs the AddSplitsTask first to register the split; afterwards it runs FetchTask to pull data.
SplitFetcher#runOnce
void runOnce() {
try {
if (shouldRunFetchTask()) {
runningTask = fetchTask;
} else {
runningTask = taskQueue.take();
}
if (!wakeUp.get() && runningTask.run()) {
LOG.debug("Finished running task {}", runningTask);
runningTask = null;
checkAndSetIdle();
}
} catch (Exception e) {
throw new RuntimeException(
String.format(
"SplitFetcher thread %d received unexpected exception while polling the records",
id),
e);
}
maybeEnqueueTask(runningTask);
synchronized (wakeUp) {
// Set the running task to null. It is necessary for the shutdown method to avoid
// unnecessarily interrupt the running task.
runningTask = null;
// Set the wakeUp flag to false.
wakeUp.set(false);
LOG.debug("Cleaned wakeup flag.");
}
}
3. AddSplitsTask calls MySqlSplitReader#handleSplitsChanges to add the assigned splits to the split queue; on the next fetch() call, a split is taken from the queue and its data is read.
AddSplitsTask#run
public boolean run() {
for (SplitT s : splitsToAdd) {
assignedSplits.put(s.splitId(), s);
}
splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
return true;
}
MySqlSplitReader#handleSplitsChanges
public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
if (!(splitsChanges instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format(
"The SplitChange type of %s is not supported.",
splitsChanges.getClass()));
}
//note: add the splits to the queue
splits.addAll(splitsChanges.splits());
}
4. MySqlSplitReader#fetch() lets the DebeziumReader read data into the event queue and returns it as MySqlRecords after the data has been corrected.
MySqlSplitReader#fetch
@Override
public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
// note: create the reader and start reading data
checkSplitOrStartNext();
Iterator<SourceRecord> dataIt = null;
try {
// note: correct (normalize) the records that were read
dataIt = currentReader.pollSplitRecords();
} catch (InterruptedException e) {
LOG.warn("fetch data failed.", e);
throw new IOException(e);
}
// note: the returned data is wrapped as MySqlRecords for transfer
return dataIt == null
? finishedSnapshotSplit()
: MySqlRecords.forRecords(currentSplitId, dataIt);
}
private void checkSplitOrStartNext() throws IOException {
// the binlog reader should keep alive
if (currentReader instanceof BinlogSplitReader) {
return;
}
if (canAssignNextSplit()) {
// note: take the next MySqlSplit from the split queue
final MySqlSplit nextSplit = splits.poll();
if (nextSplit == null) {
throw new IOException("Cannot fetch from another split - no split remaining");
}
currentSplitId = nextSplit.splitId();
// note: distinguish snapshot-split reads from binlog-split reads
if (nextSplit.isSnapshotSplit()) {
if (currentReader == null) {
final MySqlConnection jdbcConnection = getConnection(config);
final BinaryLogClient binaryLogClient = getBinaryClient(config);
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
// note: create a SnapshotSplitReader that uses the Debezium API to read the chunk data and the binlog range around it
currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
}
} else {
// point from snapshot split to binlog split
if (currentReader != null) {
LOG.info("It's turn to read binlog split, close current snapshot reader");
currentReader.close();
}
final MySqlConnection jdbcConnection = getConnection(config);
final BinaryLogClient binaryLogClient = getBinaryClient(config);
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
LOG.info("Create binlog reader");
// note: create a BinlogSplitReader that uses the Debezium API for the incremental read
currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
}
// note: submit the split to the reader to start reading
currentReader.submitSplit(nextSplit);
}
}
DebeziumReader Data Processing
The DebeziumReader covers two phases, snapshot-split reading and binlog-split reading. The data that is read is stored into a ChangeEventQueue and corrected when pollSplitRecords is executed.
1. SnapshotSplitReader, snapshot-split read. In the snapshot phase the chunk data is read by executing a SELECT over the chunk range; SHOW MASTER STATUS is executed before and after writing to the queue, and the current binlog offsets are written in as watermarks.
public void submitSplit(MySqlSplit mySqlSplit) {
......
executor.submit(
() -> {
try {
currentTaskRunning = true;
// note: read the data; the current binlog offsets are inserted before and after it
// 1. execute snapshot read task
final SnapshotSplitChangeEventSourceContextImpl sourceContext =
new SnapshotSplitChangeEventSourceContextImpl();
SnapshotResult snapshotResult =
splitSnapshotReadTask.execute(sourceContext);
// note: prepare for the incremental read; contains the starting offset
final MySqlBinlogSplit appendBinlogSplit = createBinlogSplit(sourceContext);
final MySqlOffsetContext mySqlOffsetContext =
statefulTaskContext.getOffsetContext();
mySqlOffsetContext.setBinlogStartPoint(
appendBinlogSplit.getStartingOffset().getFilename(),
appendBinlogSplit.getStartingOffset().getPosition());
// note: start reading from the starting offset
// 2. execute binlog read task
if (snapshotResult.isCompletedOrSkipped()) {
// we should only capture events for the current table,
Configuration dezConf =
statefulTaskContext
.getDezConf()
.edit()
.with(
"table.whitelist",
currentSnapshotSplit.getTableId())
.build();
// task to read binlog for current split
MySqlBinlogSplitReadTask splitBinlogReadTask =
new MySqlBinlogSplitReadTask(
new MySqlConnectorConfig(dezConf),
mySqlOffsetContext,
statefulTaskContext.getConnection(),
statefulTaskContext.getDispatcher(),
statefulTaskContext.getErrorHandler(),
StatefulTaskContext.getClock(),
statefulTaskContext.getTaskContext(),
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext
.getStreamingChangeEventSourceMetrics(),
statefulTaskContext
.getTopicSelector()
.getPrimaryTopic(),
appendBinlogSplit);
splitBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContextImpl());
} else {
readException =
new IllegalStateException(
String.format(
"Read snapshot for mysql split %s fail",
currentSnapshotSplit));
}
} catch (Exception e) {
currentTaskRunning = false;
LOG.error(
String.format(
"Execute snapshot read task for mysql split %s fail",
currentSnapshotSplit),
e);
readException = e;
}
});
}
2. SnapshotSplitReader, bounded binlog read for a snapshot split. The key point here is deciding when the MySqlBinlogSplitReadTask should stop: it terminates once it has read up to the ending offset (high watermark) recorded for the split.
MySqlBinlogSplitReadTask#handleEvent
protected void handleEvent(Event event) {
// note: dispatch the event to the queue
super.handleEvent(event);
// note: in the snapshot phase the binlog read must be terminated
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset =
new BinlogOffset(
offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(),
Long.parseLong(
offsetContext
.getOffset()
.get(BINLOG_POSITION_OFFSET_KEY)
.toString()));
// note: stop reading once currentBinlogOffset > HW
// reach the high watermark, the binlog reader should finished
if (currentBinlogOffset.isAtOrBefore(binlogSplit.getEndingOffset())) {
// send binlog end event
try {
signalEventDispatcher.dispatchWatermarkEvent(
binlogSplit,
currentBinlogOffset,
SignalEventDispatcher.WatermarkKind.BINLOG_END);
} catch (InterruptedException e) {
logger.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// stop the binlog read
// tell reader the binlog task finished
((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
}
}
}
3. When SnapshotSplitReader#pollSplitRecords is executed, the raw records in the queue are normalized; see RecordUtils#normalizedSplitRecords for the detailed logic.
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
if (hasNextElement.get()) {
// data input: [low watermark event][snapshot events][high watermark event][binlogevents][binlog-end event]
// data output: [low watermark event][normalized events][high watermark event]
boolean reachBinlogEnd = false;
final List<SourceRecord> sourceRecords = new ArrayList<>();
while (!reachBinlogEnd) {
// note: process the DataChangeEvent records written into the queue
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
sourceRecords.add(event.getRecord());
if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
reachBinlogEnd = true;
break;
}
}
}
// snapshot split return its data once
hasNextElement.set(false);
// ************ normalize the data ***********
return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
.iterator();
}
// the data has been polled, no more data
reachEnd.compareAndSet(false, true);
return null;
}
4. BinlogSplitReader data reading. The reading logic itself is simple; the key point is the starting offset, which is determined from the high watermarks of all finished snapshot splits (the minimum one).
5. When BinlogSplitReader#pollSplitRecords is executed, the raw records in the queue are filtered to guarantee consistency. The binlog read in the incremental phase is unbounded, so every record is pushed into the event queue, and BinlogSplitReader decides via shouldEmit() whether a record should be sent downstream.
BinlogSplitReader#pollSplitRecords
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
checkReadException();
final List<SourceRecord> sourceRecords = new ArrayList<>();
if (currentTaskRunning) {
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
if (shouldEmit(event.getRecord())) {
sourceRecords.add(event.getRecord());
}
}
}
return sourceRecords.iterator();
}
Emit conditions:
1. The position of the newly received event is greater than maxwm (the maximum high watermark of the table), or
2. the record's key belongs to some finished snapshot split and its position is greater than that split's HWM; in that case the record is emitted.
/**
*
* Returns the record should emit or not.
*
* <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
* belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
* since the offset is after its high watermark.
*
* <pre> E.g: the data input is :
* snapshot-split-0 info : [0, 1024) highWatermark0
* snapshot-split-1 info : [1024, 2048) highWatermark1
* the data output is:
* only the binlog event belong to [0, 1024) and offset is after highWatermark0 should send,
* only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
* </pre>
*/
private boolean shouldEmit(SourceRecord sourceRecord) {
if (isDataChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord);
BinlogOffset position = getBinlogPosition(sourceRecord);
// aligned, all snapshot splits of the table has reached max highWatermark
// note: if the new event's position is greater than maxwm, emit it directly
if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
return true;
}
Object[] key =
getSplitKey(
currentBinlogSplit.getSplitKeyType(),
sourceRecord,
statefulTaskContext.getSchemaNameAdjuster());
for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
/**
* note: the record's key belongs to some snapshot split and its position is greater than that split's HWM, so emit it
*/
if (RecordUtils.splitKeyRangeContains(
key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
&& position.isAtOrBefore(splitInfo.getHighWatermark())) {
return true;
}
}
// not in the monitored splits scope, do not emit
return false;
}
// always send the schema change event and signal event
// we need record them to state of Flink
return true;
}
MySqlRecordEmitter Data Emission
SourceReaderBase takes the batches of DataChangeEvent records read for a split from the queue and converts them from Debezium's DataChangeEvent type into Flink's RowData type.
1. SourceReaderBase's split-data processing flow.
org.apache.flink.connector.base.source.reader.SourceReaderBase#pollNext
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
// make sure we have a fetch we are working on, or move to the next
RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
if (recordsWithSplitId == null) {
recordsWithSplitId = getNextFetch(output);
if (recordsWithSplitId == null) {
return trace(finishedOrAvailableLater());
}
}
// we need to loop here, because we may have to go across splits
while (true) {
// Process one record.
// note: read a single record from the MySqlRecords iterator
final E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
// emit the record.
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
LOG.trace("Emitted record: {}", record);
// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
// more is available. If nothing more is available, the next invocation will find
// this out and return the correct status.
// That means we emit the occasional 'false positive' for availability, but this
// saves us doing checks for every record. Ultimately, this is cheaper.
return trace(InputStatus.MORE_AVAILABLE);
} else if (!moveToNextSplit(recordsWithSplitId, output)) {
// The fetch is done and we just discovered that and have not emitted anything, yet.
// We need to move to the next fetch. As a shortcut, we call pollNext() here again,
// rather than emitting nothing and waiting for the caller to call us again.
return pollNext(output);
}
// else fall through the loop
}
}
private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
splitFetcherManager.checkErrors();
LOG.trace("Getting next source data batch from queue");
// note: take data from elementsQueue
final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
return null;
}
currentFetch = recordsWithSplitId;
return recordsWithSplitId;
}
2. MySqlRecords returns a single record from the current batch.
com.ververica.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit
public SourceRecord nextRecordFromSplit() {
final Iterator<SourceRecord> recordsForSplit = this.recordsForCurrentSplit;
if (recordsForSplit != null) {
if (recordsForSplit.hasNext()) {
return recordsForSplit.next();
} else {
return null;
}
} else {
throw new IllegalStateException();
}
}
3. MySqlRecordEmitter converts the record into RowData via RowDataDebeziumDeserializeSchema.
com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter#emitRecord
public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
throws Exception {
if (isWatermarkEvent(element)) {
BinlogOffset watermark = getWatermark(element);
if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
splitState.asSnapshotSplitState().setHighWatermark(watermark);
}
} else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
HistoryRecord historyRecord = getHistoryRecord(element);
Array tableChanges =
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
for (TableChanges.TableChange tableChange : changes) {
splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
}
} else if (isDataChangeRecord(element)) {
// note: handling of data-change records
if (splitState.isBinlogSplitState()) {
BinlogOffset position = getBinlogPosition(element);
splitState.asBinlogSplitState().setStartingOffset(position);
}
debeziumDeserializationSchema.deserialize(
element,
new Collector<T>() {
@Override
public void collect(final T t) {
output.collect(t);
}
@Override
public void close() {
// do nothing
}
});
} else {
// unknown element
LOG.info("Meet unknown element {}, just skip.", element);
}
}
The deserialization process of RowDataDebeziumDeserializeSchema:
com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema#deserialize
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
out.collect(insert);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
out.collect(delete);
} else {
GenericRowData before = extractBeforeRow(value, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(before);
GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
out.collect(after);
}
}
MySqlSourceReader Reports Finished Splits
After the MySqlSourceReader finishes processing a snapshot split, it sends the finished-split information, including the split ID and the high watermark, to the MySqlSourceEnumerator, and then continues to request splits.
com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#onSplitFinished
protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
}
/**
* note: report the split-finished event
*/
reportFinishedSnapshotSplitsIfNeed();
// request the next split once the previous one has been processed
context.sendSplitRequest();
}
private void reportFinishedSnapshotSplitsIfNeed() {
if (!finishedUnackedSplits.isEmpty()) {
final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
// note: report the split ID and its maximum (high watermark) offset
finishedOffsets.put(split.splitId(), split.getHighWatermark());
}
FinishedSnapshotSplitsReportEvent reportEvent =
new FinishedSnapshotSplitsReportEvent(finishedOffsets);
context.sendSourceEventToCoordinator(reportEvent);
LOG.debug(
"The subtask {} reports offsets of finished snapshot splits {}.",
subtaskId,
finishedOffsets);
}
}
MySqlSourceEnumerator Assigns the Binlog Split
After all snapshot splits have been read, MySqlHybridSplitAssigner creates a BinlogSplit for the subsequent incremental read; when creating it, the minimum BinlogOffset is selected from all finished snapshot splits. Note: on the 2.0.0 branch, createBinlogSplit always starts from the minimum offset 0; this bug has been fixed on the latest master branch.
private MySqlBinlogSplit createBinlogSplit() {
final List<MySqlSnapshotSplit> assignedSnapshotSplit =
snapshotSplitAssigner.getAssignedSplits().values().stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());
Map<String, BinlogOffset> splitFinishedOffsets =
snapshotSplitAssigner.getSplitFinishedOffsets();
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
BinlogOffset minBinlogOffset = null;
// note: pick the minimum offset among all assignedSnapshotSplit
for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
// find the min binlog offset
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
if (minBinlogOffset == null || binlogOffset.compareTo(minBinlogOffset) < 0) {
minBinlogOffset = binlogOffset;
}
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
split.splitId(),
split.getSplitStart(),
split.getSplitEnd(),
binlogOffset));
tableSchemas.putAll(split.getTableSchemas());
}
final MySqlSnapshotSplit lastSnapshotSplit =
assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();
return new MySqlBinlogSplit(
BINLOG_SPLIT_ID,
lastSnapshotSplit.getSplitKeyType(),
minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
BinlogOffset.NO_STOPPING_OFFSET,
finishedSnapshotSplitInfos,
tableSchemas);
}