Flink-1.10.0中的readTextFile解讀
最近在學習Flink,研究了一些東西,在准備自定義一個簡單的監聽文件的source作為練手的時候,遇到了一個問題。就是應該如何在自己的source中決定哪個分區讀取哪個文件?學習過spark的我們知道,source會被切分,然后每個分區讀取自己分區的輸入切片數據即可。那么Flink如何進行輸入分片的切分的呢?我們如果自定義的source需要是一個並行的source時,又該如何實現呢?
帶着這個疑問,查看了Flink-1.10.0的源代碼,查看Flink的readTextFile算子是如何實現的。
首先,使用以下代碼演示一個問題
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> test = env.socketTextStream("localhost", 8888);
System.out.println("test source parallel:\t" + test.getParallelism()); //test的分區數量為1
test.print(); // socket的每一行元素會在不同的分區進行輸出
通過上面的簡單的代碼展示我們可以知道:
- print是一個並行的sink,即使和單並行的source一起使用也會並行的輸出。
- getParallelism方法可以查看DateStream的分區數量。
那么我們來查看分析一下Flink中的readTextFile的源碼吧。
首先,在IDEA中一步步查看readTextFile的實現,前面的方法基本都是檢查參數和補全一些默認參數,最后調用的方法為createFileInput。代碼如下
private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
TypeInformation<OUT> typeInfo,
String sourceName,
FileProcessingMode monitoringMode,
long interval) {
// 檢查參數
Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
"The path monitoring interval cannot be less than " +
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
// 輸入分片構建的函數類
ContinuousFileMonitoringFunction<OUT> monitoringFunction =
new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
// 讀取輸入分片的具體實現類
ContinuousFileReaderOperator<OUT> reader =
new ContinuousFileReaderOperator<>(inputFormat);
/*
* 和我們使用env.addSource一樣,但是后面進跟着調用了一個transform。
* 這里就是整個解析中要重點說明的一點,monitoringFunction中只是負責構建數據切片的
* 到這一步,其實這個source的並行度還是1
*
* 調用transform方法之后,將數據切片中的內容讀取出來,這里的並行度才是配置文件中的並行度
*/
SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
.transform("Split Reader: " + sourceName, typeInfo, reader);
return new DataStreamSource<>(source);
}
為了驗證沒有調用transform之前的並行度,我們可以使用一下代碼進行測試
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String inputPath = "/Users/xxx/test/flink_test";
TextInputFormat format = new TextInputFormat(new Path(inputPath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
format.setFilePath(inputPath);
FileProcessingMode monitoringMode = FileProcessingMode.PROCESS_ONCE;
ContinuousFileMonitoringFunction<String> function = new ContinuousFileMonitoringFunction<>(format, monitoringMode, 12, -1);
// 只構建到addSource這一步,不再進行transform的調用
DataStreamSource<TimestampedFileInputSplit> test = env.addSource(function, "test");
System.out.println("test source parallel:\t" + test.getParallelism());
test.print();
env.execute("user_defind_source");
執行結果如下
test source parallel: 1
10> [8] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 14 + 7
1> [0] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 0 + 7
8> [6] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 0 + 7
6> [5] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 35 + 4
5> [4] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 28 + 7
12> [10] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 28 + 7
11> [9] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 21 + 7
9> [7] file:/Users/xxx/test/flink_test/word.txt mod@ 1582184141000 : 7 + 7
4> [3] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 21 + 7
3> [2] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 14 + 7
2> [1] file:/Users/xxx/test/flink_test/out mod@ 1582184340000 : 7 + 7
可以看出,不調用transform方法的話,其實只是構建出了數據切片而已。數據切片的構建規則仔細讀讀源碼還是可以看懂的,就是根據分區數和文件長度計算的。
讓我們再來看一下ContinuousFileReaderOperator這個類。
/**
* 該類的open方法中,獲取了Flink的getRuntimeContext相關信息
* getRuntimeContext中包含了subtask得索引信息
* 該類中還包含了一個SplitReader內部類,該類繼承了Thread方法
* 其run方法完成了具體的輸入分片的讀取任務
*/
@Override
public void open() throws Exception {
super.open();
checkState(this.reader == null, "The reader is already initialized.");
checkState(this.serializer != null, "The serializer has not been set. " +
"Probably the setOutputType() was not called. Please report it.");
// 將Flink的RuntimeContext取出
this.format.setRuntimeContext(getRuntimeContext());
this.format.configure(new Configuration());
this.checkpointLock = getContainingTask().getCheckpointLock();
// 根據時間特征設置讀者上下文
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.readerContext = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
checkpointLock,
getContainingTask().getStreamStatusMaintainer(),
output,
watermarkInterval,
-1);
// 並初始化拆分讀取線程
this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);
this.restoredReaderState = null;
this.reader.start();
}
/**
* 該方法是實現了OneInputStreamOperator中的接口
* 可以看出該方法就是向SplitReader中的隊列添加新的元素的
*/
@Override
public void processElement(StreamRecord<TimestampedFileInputSplit> element) throws Exception {
reader.addSplit(element.getValue());
}
/**
* SplitReader類的run方法
* readTextFile方法debug來看,執行順序是先執行上面的open()方法,open方法中啟動了下面的run方法
* run方法首先會進入一個循環中進行等待,等待第一個輸入切片的完成,然后就可以開始讀數據了
* 讀取第一個輸入切片的過程中,外部還可以繼續向切片隊列中添加切片。
* this.pendingSplits是一個輸入切片的保存隊列,提供了外部向隊列添加輸入切片的方法
*/
@Override
public void run() {
try {
Counter completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");
this.format.openInputFormat();
while (this.isRunning) {
synchronized (checkpointLock) {
if (currentSplit == null) {
// 從隊列中取數據
currentSplit = this.pendingSplits.poll();
// if the list of pending splits is empty (currentSplit == null) then:
// 1) if close() was called on the operator then exit the while loop
// 2) if not wait 50 ms and try again to fetch a new split to read
// 如果輸入切片為空,則等待50ms之后重復while到這段的內容
if (currentSplit == null) {
if (this.shouldClose) {
isRunning = false;
} else {
checkpointLock.wait(50);
}
continue;
}
}
if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
// recovering after a node failure with an input
// format that supports resetting the offset
((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).
reopen(currentSplit, currentSplit.getSplitState());
} else {
// we either have a new split, or we recovered from a node
// failure but the input format does not support resetting the offset.
this.format.open(currentSplit);
}
// reset the restored state to null for the next iteration
this.currentSplit.resetSplitState();
this.isSplitOpen = true;
}
LOG.debug("Reading split: " + currentSplit);
try {
// 讀取數據,並且將數據放入context中
OT nextElement = serializer.createInstance();
while (!format.reachedEnd()) {
synchronized (checkpointLock) {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
readerContext.collect(nextElement);
} else {
break;
}
}
}
completedSplitsCounter.inc();
} finally {
// close and prepare for the next iteration
synchronized (checkpointLock) {
this.format.close();
this.isSplitOpen = false;
this.currentSplit = null;
}
}
}
} catch (Throwable e) {
getContainingTask().handleAsyncException("Caught exception when processing split: " + currentSplit, e);
} finally {
synchronized (checkpointLock) {
LOG.debug("Reader terminated, and exiting...");
try {
this.format.closeInputFormat();
} catch (IOException e) {
getContainingTask().handleAsyncException(
"Caught exception from " + this.format.getClass().getName() + ".closeInputFormat() : " + e.getMessage(), e);
}
this.isSplitOpen = false;
this.currentSplit = null;
this.isRunning = false;
checkpointLock.notifyAll();
}
}
}
上面的過程我們基本理清了整體的框架,但是還沒有解決自己的疑問,如何確定哪個分區讀哪些輸入切片呢?
其實這個過程並不能在上面的輸入切片構建和真實讀取文件的過程中看出來。那么對於這么一個陌生的系統,我們應該這么梳理呢?其實這個過程的切入點應該在Source構建輸入切片的代碼中查看,一個context.collect(split)方法中。
collect方法不是收集的意思,這個collect是Flink中的方法,該方法主要的作用是將數據發送出去,發送到下游。那么問題來了,這條數據應該發送到哪個分區呢?在readTextFile方法中,是按照輪循的方法,挨個分區進行循環。當然如果我們手動設置該算子的並行度為1時,就會發送到0號分區中。
source發送出去數據切片,split reader接收到數據然后反序列化進行讀取。完整的過程如下:
關於Flink中readTextFile執行流程梳理
首先,readTextFile分成兩個階段,一個Source,一個Split Reader。這兩個階段可以分為多個線程,不一定是2個線程。因為Split Reader的並行度時根據配置文件或者啟動參數來決定的。
Source的執行流程如下,Source的是用來構建輸入切片的,不做數據的讀取操作。這里是按照本地運行模式整理的。
Task.run()
|-- invokable.invoke()
| |-- StreamTask.invoke()
| | |-- beforeInvoke()
| | | |-- init()
| | | | |-- SourceStreamTask.init()
| | | |-- initializeStateAndOpen()
| | | | |-- operator.initializeState()
| | | | |-- operator.open()
| | | | | |-- SourceStreamTask.LegacySourceFunctionThread.run()
| | | | | | |-- StreamSource.run()
| | | | | | | |-- userFunction.run(ctx)
| | | | | | | | |-- ContinuousFileMonitoringFunction.run()
| | | | | | | | | |-- RebalancePartitioner.selectChannel()
| | | | | | | | | |-- RecordWriter.emit()
Split Reader的代碼執行流程如下:
Task.run()
|-- invokable.invoke()
| |-- StreamTask.invoke()
| | |-- beforeInvoke()
| | | |-- init()
| | | | |--OneInputStreamTask.init()
| | | |-- initializeStateAndOpen()
| | | | |-- operator.initializeState()
| | | | | |-- ContinuousFileReaderOperator.initializeState()
| | | | |-- operator.open()
| | | | | |-- ContinuousFileReaderOperator.open()
| | | | | | |-- ContinuousFileReaderOperator.SplitReader.run()
| | |-- runMailboxLoop()
| | | |-- StreamTask.processInput()
| | | | |-- StreamOneInputProcessor.processInput()
| | | | | |-- StreamTaskNetworkInput.emitNext() while循環不停的處理輸入數據
| | | | | | |-- ContinuousFileReaderOperator.processElement()
| | |-- afterInvoke()
這個過程中並沒有去分析state的狀態等其他操作,比較基礎淺顯。主要作為自己掌握多線程調試和Flink源碼閱讀的切入點吧。