0. 說明
- 本文基於Flink 1.12;
- 本文是在閱讀源碼過程結合自己理解所寫,不一定正確,歡迎大伙留言指出;
1. 集群部署
1.1. 部署方式
Flink集群部署可以簡要的分為以下兩種方式:
- 直接部署在服務器上(物理機、Kubernetes、docker等);
- 結合其他資源調度框架,如on Yarn、Mesos;
1.2. 部署在服務器上
常見於standalone模式。standalone模式是一種獨立的集群模式,可以不依賴外部資源調度系統直接運行,所需資源是直接基於服務器的,由管理員手工啟動。基於Kubernetes、docker的部署方式,其本質是將JobManager和TaskManager docker化,集群本身還是standalone的。
standalone模式可細分為:HA模式和非HA模式。HA模式需要注意:HA必須依賴於共享存儲文件系統,要保證JobManager的元數據信息對所有節點共享。
1.3. 結合其他資源調度框架
Flink集群所需資源是向調度框架申請,調度框架的資源則是基於物理資源。和直接基於物理機得standalone模式相比,基於資源調度框架更具有靈活性,資源也可以復用。此時,任務的可靠性也是通過資源調度框架來實現的。
2.任務提交模式
2.1 session模式
在該模式,多個任務共享一個提前創建好的Flink集群,整個集群的資源在session集群啟動時已固定了。常見的yarn-session模式,就是在先基於yarn啟動一個Flink集群,然后向該集群提交任務。在集群的standalone模式下,通常提交任務也是session模式,只不過整個集群是由管理員手工啟動。
2.2 Per-job模式
在該模式,每個任務在提交時都會創建各自集群,TM、JM的內存分配在啟動任務的時候可以根據需求制定。任務單獨啟動啟動、恢復都是單獨執行的,任務恢復速度快。
2.2 Application模式
在該模式,每個任務在提交時都會創建一個集群。該模式和其他模式的區別在於:其他模式下,任務的main()方法都是在client側運行,會在client側生成JobGraphs,並將JobGraph提交到集群中,可能對client機器的資源消耗過多;Application模式,則在JobManager上運行application的main()函數,可節省資源。
3. Flink任務涉及核心組件
簡單來說,集群部署模式的不同,在於Flink的啟動是由管理員手動啟動還是有資源調度框架啟動;任務提交模式的不同,在於任務與集群的關系或是任務main函數執行的位置不一樣。這些變化中,任務涉及到的核心組件時相同:JobMaster、ResourceManager、TaskExecutor,所以弄清這些核心組件有利於我們在變化中找到不變的。
3.1. JobMaster
JobMaster是負責單個任務的執行。與之容易混淆的是JobManager,JobManager是一個抽象的概念,其作為Flink集群的manager是一個單獨的進程(JVM),由一些service組成的(主要是Dispatcher、JobMaster、ResourceManager)。JobMaster僅是JobManager中的一個組件,JobManager為JobMaster提供任務資源、jar存儲的等服務。值得注意的是,JobMaster在是由JobManagerRunnerImpl啟動的。
- JobGraph
JobGraph的生成是和任務提交的模式相關的,如session模式JobGraph是在client側生成后提交到JobManager。JobMaster啟動之后,JobGraph會被轉換成ExecutionGraph,ExecutionGraph是JobGraph的並行版,兩者的區別在此不詳細展開。 - SlotPool
slotPool是JobMaster中管理資源的組件,在分配資源時會先在slotPool中分配,分配slot的策略有兩種:按照位置優先分配;按照之前已分配的slot分配。若從slotPool申請不到slot,則將請求緩存起來,等連接上ResourceManager獲取slot之后再分配slot。 - SchedulerNG
SchedulerNG會在JobMaster啟動時啟動,主要負責ExecutionGraph的執行。涉及SchedulerNG主要流程如下:
JobMaster#start()->startScheduling()
|
SchedulerBase#startScheduling()
|
| //啟動所有的coordinator
| --startAllOperatorCoordinators()
| //根據部署策略(lazy、Eager)分配資源啟動ExecutionGraph
| --startSchedulingInternal()
其中,OperatorCoordinator代表的是runtime operators,其運行在JobMaster中,一個OperatorCoordinator對應的是一個operator的Job vertex,其和operators的交互是通過operator event。主要負責subTask的重啟、失敗等,以及operator的checkpoint行為。
- HeartbeatServices
主要是為JobMaster和TaskManager、ResourceManager之前的心跳提供服務。 - HighAvailabilityServices
HighAvailabilityServices在JobMaster中主要是獲取ResourceManager信息、checkpoint信息。 - BackPressureStatsTracker
backPressure是指當一個operator的處理速度小於上游下發的速度,數據就會在input buffer里出現積壓,當buffer滿了,數據就會無處可放,Flink將這種情況稱為backPressure。Dispatcher通過JobMaster的BackPressureStatsTracker對每個TM的subTask做跟蹤。涉及的流程如下:
Dispatcher#構造函數 //在集群啟動時已生成Dispatcher實例
|
| //從配置文件獲取tracker的參數配置
JobManagerSharedServices#fromConfiguration()
判斷一個operator是否處於backpressure狀態可以看Task#isBackPressured()
方法。
- LeaderRetrievalService
LeaderRetrievalService獲取當前服務的leader,在JobMaster中,LeaderRetrievalService是負責與ResourceManager鏈接,然后JobMaster會向ResourceManager注冊。
3.2 ResourceManager
ResourceManager是Flink集群內部管理資源(slot)的組件。TaskManager向其提供slot,JobMaster向其請求slot執行任務。與此同時,RM會JobMaster、TaskManager保持心跳,其是心跳請求的發起方,當JobMaster、TaskManager失敗的時候會采取相應的對策。
- ResourceManager的啟動
RM的的啟動時在JobManager的過程啟動的,JobManager的啟動入口是ClusuterEntryPoint,過程如下:
ClusuterEntryPoint#startCluster
|
| //在該過程中會啟動Dispatcher、ResourceManager、 WebMonitorEndpoint
DefaultDispatcherResourceManagerComponentFactory#create
|
RpcEndpoint#start
整個過程如下:
- 在啟動JobManager時,會啟動haServices相關的服務;
- 在DefaultDispatcherResourceManagerComponentFactory#create()方法中,會先通過HA服務獲取leader節點信息;
- 通過RpcEndpoint#start啟動RM,以便和其他組件交互;
- RM中核心組件
RM中如LeaderElectionService 和JobMaster中的作用是相近的,這里不詳細展開。-
slotManager
slotManager維護Flink 集群中一張slot視圖,包括:所有注冊、分配的slot以及待滿足的slot請求(JobMaster中slotPool不能滿足任務需求時發起的slot請求)。為了釋放資源、防止內存泄漏,空閑的TaskManager將會被釋放、超時的slot請求會請求失敗。
slotManager在slot請求過程的角色如下,整個過程大致分為以下幾個步驟:
- 過程1:TM向RM注冊后,向RM上報slot信息,slot信息被保存在RM的slotManager中;
- 過程2:JM會首先向slotPool請求slot,若能,則直接提交任務;若不能滿足,則通過slotPool向RM請求slot資源;
- 過程3:若是RM中的slotManager的slot能滿足JM的請求,則會向TM發起RPC請求申請對應的slot,TM中TaskSlotTable會把slot信息以slotoffer提供JM的slotPool(過程4);若是不能滿足,則RM會向更底層的系統獲取資源。
-
3.3 TaskManager
TaskManager與TaskExecutor的關系類似於JobManager與JobMaster。TaskExecutor負責task的執行。
TaskExecutor中的組件有與RM、JM交互服務,如TaskExecutorToResourceManagerConnection、resourceManagerHeartbeatManager等;有與JM交互的服務,jobManagerHeartbeatManager,這些組件功能與RM和JM中的類似,在這里會嘗試分析TaskSlotTable、JobTable、KvStateService等。
- TaskManager的啟動過程:
TaskManager的啟動入口在TaskManagerRunner。
TaskManagerRunner#main
|
|-- runTaskManager
| //該過程會啟動TaskManager
|-- createTaskExecutorService
| //該過程會從配置中初始化一些服務如:TaskManagerServices、KvStateService,
| //返回一個初始化的TaskExecutor,在初始化的過程中會啟動相應的RPC endpoint
|-- startTaskManager
|---- TaskManagerServices.fromConfiguration //可以重點看看
|-- TaskExecutorService#start
這里我們會着重分析一下runTaskManager:
- 在
createTaskExecutorService->startTaskManager
的過程中會從獲取配置參數,最后會new TaskExecutor
時通過調用父類的構造方法啟動相應的RPC endpoint。在這個過程,我們可以看到taskManagerRunner是通過TaskExecutorToServiceAdapter.createFor()
,僅僅是一個適配器,其本質還是TaskExecutor。 TaskExecutorService#start
會逐步調用RpcEndpoint的start()方法,其RpcServer就是在上一步中初始化的。這里會逐步調用到TaskExecutor#onStart()。
-
TaskSlotTable
TaskSlotTable從不同的維度維護slot信息,如JobID和slot的關系,AllocationID與slot的關系,其具體存儲的方式是map,這樣根據不同的key就可以很快的獲取到slot信息。其中,對於那些已分配但是無法分配到具體JobManage的slot會啟動一個定時任務,若是超時會釋放slot以免內存泄漏。- TaskSlotTable的初始化過程是在
TaskManagerServices#fromConfiguration
方法中被初始化的; - TaskSlotTable的啟動時在
TaskExecutor#startTaskExecutorServices
中,這里對slot明確了free和負責timeout的線程。
TaskSlotTable在slot整個流轉中的作用見上文。
- TaskSlotTable的初始化過程是在
-
KvStateService
KvState的注冊服務,其啟動過程TaskManagerServices#fromConfiguration
方法中,具體過程如下:
public static TaskManagerServices fromConfiguration(){
//......
//從配置文件構造KvStateService
final KvStateService kvStateService =
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
kvStateService.start();
//......
}
// 上面start()方法,最終會調用AbstractServerBase#start,該方法中要是bind到指定地址上。關鍵步驟如下:
private boolean attemptToBind(final int port) throws Throwable {
log.debug("Attempting to start {} on port {}.", serverName, port);
this.queryExecutor = createQueryExecutor();
this.handler = initializeHandler();
final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
final ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Flink " + serverName + " EventLoop Thread %d")
.build();
final NioEventLoopGroup nioGroup =
new NioEventLoopGroup(numEventLoopThreads, threadFactory);
this.bootstrap =
new ServerBootstrap()
.localAddress(bindAddress, port)
.group(nioGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.ALLOCATOR, bufferPool)
.childOption(ChannelOption.ALLOCATOR, bufferPool)
.childHandler(new ServerChannelInitializer<>(handler));
final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
//noinspection ConstantConditions
// (ignore warning here to make this flexible in case the configuration values change)
if (LOW_WATER_MARK > defaultHighWaterMark) {
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
} else { // including (newHighWaterMark < defaultLowWaterMark)
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
}
try {
final ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
final InetSocketAddress localAddress =
(InetSocketAddress) future.channel().localAddress();
serverAddress =
new InetSocketAddress(localAddress.getAddress(), localAddress.getPort());
return true;
}
// the following throw is to bypass Netty's "optimization magic"
// and catch the bind exception.
// the exception is thrown by the sync() call above.
throw future.cause();
} catch (BindException e) {
//.....
}
// any other type of exception we let it bubble up.
return false;
}
- JobTable
JobTable的任務是用來管理一個Job在TaskExecutor上的生命周期。其主要是維護了兩個Map:JobID與Job/Connection、ResourceId與JobID。其中Job接口反映了job和JobMaster的connect。
JobTable的初始化過程也是在TaskManagerServices#fromConfiguration
中,在此僅僅是初始了默認的DefaultJobTable。
JobTable的初始過程在RM向TM請求slot的過程中初始化的,具體過程如下:
@Override
public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final ResourceProfile resourceProfile,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
// TODO: Filter invalid requests from the resource manager by using the
// instance/registration Id
log.info(
"Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId,
jobId,
resourceManagerId);
if (!isConnectedToResourceManager(resourceManagerId)) {
final String message =
String.format(
"TaskManager is not connected to the resource manager %s.",
resourceManagerId);
log.debug(message);
return FutureUtils.completedExceptionally(new TaskManagerException(message));
}
try {
allocateSlot(slotId, jobId, allocationId, resourceProfile);
} catch (SlotAllocationException sae) {
return FutureUtils.completedExceptionally(sae);
}
final JobTable.Job job;
try {
//初始化job
job =
jobTable.getOrCreateJob(
jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
} catch (Exception e) {
// free the allocated slot
try {
taskSlotTable.freeSlot(allocationId);
} catch (SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
onFatalError(slotNotFoundException);
}
// release local state under the allocation id.
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
// sanity check
if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
onFatalError(new Exception("Could not free slot " + slotId));
}
return FutureUtils.completedExceptionally(
new SlotAllocationException("Could not create new job.", e));
}
if (job.isConnected()) {
offerSlotsToJobManager(jobId);
}
return CompletableFuture.completedFuture(Acknowledge.get());
}