This article was first published on my Toutiao account as "How does a Flink program execute? Dissecting a simple Flink program through its source code". You can follow the Toutiao account and the WeChat public account "大數據技術和人工智能" (search bigdata_ai_tech on WeChat) for more content, and my CSDN blog as well.
Earlier posts covered how to set up a local Flink environment, how to create a Flink application, and how to build the Flink source code. This article uses the official SocketWindowWordCount example to walk through the basic steps of a typical Flink program.
Example program
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount " +
"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
"type the input text into the command line");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// ------------------------------------------------------------------------
/**
* Data type for words with count.
*/
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
The code above is the SocketWindowWordCount example from the Flink website. It first reads the socket host and port from the command line, then obtains the execution environment, reads data from the socket connection, parses and transforms the data, and finally prints the result.
Every Flink program consists of the same basic parts:
- obtain an execution environment,
- load/create the initial data,
- specify transformations on this data,
- specify where to put the results of the computation,
- trigger the program execution.
The Flink execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Every Flink program starts with this line. It returns an execution environment, which represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment, a LocalStreamEnvironment created by createLocalEnvironment(). This can be seen from its source code:
// Source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
if (contextEnvironmentFactory != null) {
return contextEnvironmentFactory.createExecutionEnvironment();
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof ContextEnvironment) {
return new StreamContextEnvironment((ContextEnvironment) env);
} else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
return new StreamPlanEnvironment(env);
} else {
return createLocalEnvironment();
}
}
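Besides this automatic choice, an environment can also be created explicitly. Below is a minimal sketch; the host name, port, parallelism and jar path are placeholder values for illustration, not taken from the example:

// Explicit local environment: runs the job in the current JVM.
StreamExecutionEnvironment localEnv =
        StreamExecutionEnvironment.createLocalEnvironment(4); // parallelism 4

// Explicit remote environment: submits the job to a running cluster.
// "jobmanager-host", 6123 and the jar path are placeholders.
StreamExecutionEnvironment remoteEnv =
        StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 6123, "/path/to/job-jar-with-user-code.jar");

In most programs, though, getExecutionEnvironment() is preferred, because the same program then runs unchanged both inside the IDE and when submitted to a cluster.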
Getting the input data
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
In this example the source data comes from a socket. A socket connection is created from the given configuration, and a new data stream is produced that contains the strings received (without bound) from that socket, decoded with the system's default character set. When the socket connection is closed, reading the data terminates immediately. Looking at the source code, this builds a SocketTextStreamFunction instance from the given socket configuration and then keeps reading input from the socket connection to feed the data stream.
// Source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
}
The class hierarchy of SocketTextStreamFunction shows that it is a subclass of SourceFunction, and SourceFunction is the basic interface for all streaming data sources in Flink. SourceFunction is defined as follows:
// Source file: org/apache/flink/streaming/api/functions/source/SourceFunction.java
@Public
public interface SourceFunction<T> extends Function, Serializable {
void run(SourceContext<T> ctx) throws Exception;
void cancel();
@Public
interface SourceContext<T> {
void collect(T element);
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
@PublicEvolving
void emitWatermark(Watermark mark);
@PublicEvolving
void markAsTemporarilyIdle();
Object getCheckpointLock();
void close();
}
}
SourceFunction defines two methods, run and cancel, plus the inner SourceContext interface; a minimal hand-written sketch of implementing this contract follows the list below.
- run(SourceContext): implements the data-fetching logic and can forward records to downstream operators through the ctx parameter.
- cancel(): cancels the source. The run method usually contains a loop that keeps producing data; cancel() makes that loop terminate.
- SourceContext: the interface that source functions use to emit elements and possibly watermarks; its type parameter is the type of the elements produced by the source.
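As a toy sketch of this contract (not Flink code; the class name and sleep interval are made up for illustration), a custom source loops in run() until cancel() flips a flag and emits records through the SourceContext:

// A toy source that emits an increasing counter until it is cancelled.
public class CounterSource implements SourceFunction<Long> {

    // Set to false by cancel() from another thread, read in the run() loop.
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (running) {
            // Emit under the checkpoint lock so that emission stays
            // consistent with checkpointing of any source state.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Such a source would be attached to a job with env.addSource(new CounterSource()).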
With the SourceFunction interface in mind, the concrete implementation of SocketTextStreamFunction (mainly its run method) is easy to follow: it keeps reading data from the given hostname and port, splits the stream into individual strings on the newline delimiter, and forwards each record downstream.
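The same pattern, specialized to reading lines from a socket, is sketched below. This is not the real SocketTextStreamFunction (which additionally handles a configurable delimiter, records split across buffers, and reconnection retries); it only illustrates the core idea of connecting, reading line by line and forwarding each record downstream. It needs java.net.Socket, java.io.BufferedReader and java.io.InputStreamReader in addition to Flink's SourceFunction:

// Simplified illustration of a socket line source.
public class SimpleSocketSource implements SourceFunction<String> {

    private final String hostname;
    private final int port;
    private volatile boolean running = true;

    public SimpleSocketSource(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (Socket socket = new Socket(hostname, port);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            String line;
            // readLine() splits the incoming bytes on line breaks,
            // playing the role of the "\n" delimiter in the example.
            while (running && (line = reader.readLine()) != null) {
                ctx.collect(line);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}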
Now back to the socketTextStream method of StreamExecutionEnvironment: it returns a DataStreamSource instance by calling addSource. One may wonder why the text variable in the example is declared as DataStream when the method actually returns a DataStreamSource. The reason is that DataStream is the parent class of DataStreamSource, as the class diagram shows, so the assignment is simply Java polymorphism at work.
DataStream transformations
The DataStreamSource obtained above is then transformed with flatMap, keyBy, timeWindow and reduce:
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
This snippet applies four transformations to the DataStreamSource obtained above: flatMap, keyBy, timeWindow and reduce. The following looks at the flatMap transformation; the reader can explore the other three in the source code. The source of the flatMap method looks like this:
// Source file: org/apache/flink/streaming/api/datastream/DataStream.java
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
This method does two things: it uses reflection (the TypeExtractor) to determine the output type of the flatMap operator, and it creates an operator. The core idea of Flink's stream processing is that records are passed one by one from the input stream through a chain of operators and finally handed to the output stream; each processing step applied to the data is logically an operator (this is what the diagram mapping a Flink program to a streaming dataflow illustrates). The transform call on the last line returns a SingleOutputStreamOperator, which extends the DataStream class and defines some helper methods that make further operations on the stream more convenient. Before returning, transform also registers the transformation with the execution environment.
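Because the output type is determined by reflection, the Java 8 lambda form of the same flatMap usually needs an explicit type hint: the generic type of the Collector is erased from the lambda, so Flink cannot infer the output type on its own. A small sketch, reusing the text stream and WordWithCount class from the example:

DataStream<WordWithCount> counts = text
        .flatMap((String value, Collector<WordWithCount> out) -> {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        })
        // Declare the output type explicitly; without this hint the
        // TypeExtractor cannot recover it from the erased lambda signature.
        .returns(WordWithCount.class);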
Emitting the results
windowCounts.print().setParallelism(1);
Every Flink program starts with a source and ends with a sink. The print method here sinks the computed results to standard output. In real applications, the results are usually sunk to external systems such as Kafka, HBase, a file system or Elasticsearch, either through the connectors listed on the Flink website or through custom connectors. setParallelism sets the parallelism of this sink; the value must be greater than zero.
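As a sketch of what an explicit sink looks like (a toy stand-in, not one of the official connectors; it needs org.apache.flink.streaming.api.functions.sink.SinkFunction on the classpath), print() could be replaced by addSink with a custom SinkFunction:

// Toy sink: write each window result to standard error instead of stdout.
// Real jobs would write to Kafka, HBase, a file system, Elasticsearch, ...
windowCounts
        .addSink(new SinkFunction<WordWithCount>() {
            @Override
            public void invoke(WordWithCount value) {
                System.err.println(value);
            }
        })
        .setParallelism(1);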
Executing the program
env.execute("Socket Window WordCount");
Flink has two execution modes, remote and local, which differ slightly; the following walks through the local mode. First, the source of the execute method:
// Source file: org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
// transform the streaming program into a JobGraph
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
// add (and override) the settings with what the user defined
configuration.addAll(this.configuration);
if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
}
int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build();
if (LOG.isInfoEnabled()) {
LOG.info("Running job on local embedded Flink mini cluster");
}
MiniCluster miniCluster = new MiniCluster(cfg);
try {
miniCluster.start();
configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
return miniCluster.executeJobBlocking(jobGraph);
}
finally {
transformations.clear();
miniCluster.close();
}
}
This method has three parts: transforming the streaming program into a JobGraph, adding (and overriding) the settings defined by the user, and starting a MiniCluster to execute the job. The JobGraph itself is not covered here (the sketch at the end of this article shows how to print the streaming plan it is built from); let's focus on executing the job and follow return miniCluster.executeJobBlocking(jobGraph); into its source:
// Source file: org/apache/flink/runtime/minicluster/MiniCluster.java
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");
final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
(JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));
final JobResult jobResult;
try {
jobResult = jobResultFuture.get();
} catch (ExecutionException e) {
throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
}
try {
return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(job.getJobID(), e);
}
}
The core of this code is the line final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);, which calls the submitJob method of the MiniCluster class. Let's follow that method:
// Source file: org/apache/flink/runtime/minicluster/MiniCluster.java
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
// we have to allow queued scheduling in Flip-6 mode because we need to request slots
// from the ResourceManager
jobGraph.setAllowQueuedScheduling(true);
final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
.thenCombine(
dispatcherGatewayFuture,
(Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
.thenCompose(Function.identity());
return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
The Dispatcher component is responsible for receiving job submissions, persisting them, spawning JobManagers to execute the jobs, and recovering them in case of a master failure. Dispatcher has two implementations: in the local environment a MiniDispatcher is started, while in a cluster environment a StandaloneDispatcher is started, as the class diagram below shows.
The Dispatcher starts a JobManagerRunner and delegates to it the task of starting the JobMaster for this job. The corresponding code:
// Source file: org/apache/flink/runtime/jobmaster/JobManagerRunner.java
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
return jobSchedulingStatusFuture.thenCompose(
jobSchedulingStatus -> {
if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
return jobAlreadyDone();
} else {
return startJobMaster(leaderSessionId);
}
});
}
After a series of nested method calls inside JobMaster, execution finally reaches the following logic:
// Source file: org/apache/flink/runtime/jobmaster/JobMaster.java
private void scheduleExecutionGraph() {
checkState(jobStatusListener == null);
// register self as job status change listener
jobStatusListener = new JobManagerJobStatusListener();
executionGraph.registerJobStatusListener(jobStatusListener);
try {
executionGraph.scheduleForExecution();
}
catch (Throwable t) {
executionGraph.failGlobal(t);
}
}
Here executionGraph.scheduleForExecution(); invokes the start of the ExecutionGraph. In Flink's graph abstractions the ExecutionGraph is the graph that actually gets executed, so at this point the path from submitting a job to actually running it is complete. To recap the flow in the local environment:
- The client calls the execute method;
- MiniCluster does most of the preparatory work and then delegates the job directly to the MiniDispatcher;
- After receiving the job, the Dispatcher instantiates a JobManagerRunner and starts the job with this instance;
- The JobManagerRunner then hands the job over to the JobMaster;
- The JobMaster starts the whole execution graph through the ExecutionGraph, and with that the job is up and running.
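Finally, a small sketch relating back to the first step of execute() (turning the program into a graph): StreamExecutionEnvironment.getExecutionPlan() returns the streaming plan as JSON, which is handy for inspecting the StreamGraph that later becomes the JobGraph. Note that in newer Flink versions building the plan may clear the registered transformations, so check the behavior of your version before combining it with execute() in the same run:

// Print the streaming plan (JSON) that execute() turns into a JobGraph.
// The output can also be pasted into Flink's online plan visualizer.
System.out.println(env.getExecutionPlan());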