Flink中支持的BLOB文件類型
-
jar包
被user classloader使用的jar包
-
高負荷RPC消息
1. RPC消息長度超出了akka.framesize的大小 2. 在HA摸式中,利用底層分布式文件系統分發單個高負荷RPC消息,比如: TaskDeploymentDescriptor,給多個接受對象。 3. 失敗導致重新部署過程中復用RPC消息
-
TaskManager的日志文件
為了在web ui上展示taskmanager的日志
按存儲特性又分為兩類
-
PERMANENT_BLOB
生命周期和job的生命周期一致,並且是可恢復的。會上傳到BlobStore分布式文件系統中。
-
TRANSIENT_BLOB
生命周期由用戶自行管理,並且是不可恢復的。不會上傳到BlobStore分布式文件系統中。
架構圖
BlobStore
BLOB底層存儲,支持多種實現`HDFS`,`S3`,`FTP`等,HA中使用BlobStore進行文件的恢復。
BlobServer
* 提供了基於jobId和BlobKey進行文件上傳和下載的方法
* 本地文件系統的讀寫,基於`<path>/<jobId>/<BlobKey>`目錄結構
* HA 分布式文件系統的讀寫,基於`<path>/<jobId>/<BlobKey>`目錄結構
* 負責本地文件系統和分布式文件系統的清理工作
* 先存儲到本地文件系統中,然后如果需要的話再存儲到分布式文件系統中
* 下載請求優先使用本地文件系統中的文件
* 進行HA恢復中,下載分布式系統中的文件到本地文件系統中
BlobClient
* 基於jobId和BlobKey對BlobServer中的文件進行本地文件緩存
* 本地文件的讀寫,基於`<path>/<jobId>/<BlobKey>`目錄結構
* 優先使用本地文件系統中的文件,然后嘗試從HA分布式文件中獲取,最后才嘗試從BlobServer中下載
* 負責本地文件系統的清理工作
LibraryCacheManager
橋接task的classloader和緩存的庫文件,其`registerJob`,`registerTask`會構建並緩存job,task運行需要的classloader
示例解析:standalone模式中的jar包管理
JobManager會創建BlobStore、BlobServer、BlobLibraryCacheManager具體過程見JobManager的createJobManagerComponents
方法
try {
blobServer = new BlobServer(configuration, blobStore)
blobServer.start()
instanceManager = new InstanceManager()
scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
libraryCacheManager =
new BlobLibraryCacheManager(
blobServer,
ResolveOrder.fromString(classLoaderResolveOrder),
alwaysParentFirstLoaderPatterns)
instanceManager.addInstanceListener(scheduler)
}
TaskManager注冊到Jobmanager后會創建BlobCacheService、BlobLibraryCacheManager具體過程見TaskManager的associateWithJobManager
方法
try {
val blobcache = new BlobCacheService(
address,
config.getConfiguration(),
highAvailabilityServices.createBlobStore())
blobCache = Option(blobcache)
libraryCacheManager = Some(
new BlobLibraryCacheManager(
blobcache.getPermanentBlobService,
config.getClassLoaderResolveOrder(),
config.getAlwaysParentFirstLoaderPatterns))
}
JobClient在向集群提交job的過程中會調用JobSubmissionClientActor的tryToSubmitJob
方法進而調用JobGraph對象的uploadUserJars
方法
try {
jobGraph.uploadUserJars(blobServerAddress, clientConfig);
} catch (IOException exception) {
getSelf().tell(
decorateMessage(new JobManagerMessages.JobResultFailure(
new SerializedThrowable(
new JobSubmissionException(
jobGraph.getJobID(),
"Could not upload the jar files to the job manager.",
exception)
)
)),
ActorRef.noSender());
return null;
}
LOG.info("Submit job to the job manager {}.", jobManager.path());
jobManager.tell(
decorateMessage(
new JobManagerMessages.SubmitJob(
jobGraph,
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
getSelf());
public void uploadUserJars(
InetSocketAddress blobServerAddress,
Configuration blobClientConfig) throws IOException {
if (!userJars.isEmpty()) {
List<PermanentBlobKey> blobKeys = BlobClient.uploadJarFiles(
blobServerAddress, blobClientConfig, jobID, userJars);
for (PermanentBlobKey blobKey : blobKeys) {
if (!userJarBlobKeys.contains(blobKey)) {
userJarBlobKeys.add(blobKey);
}
}
}
}
然后在JobManager的submitJob
方法中會調用BlobLibraryCacheManager的registerJob
創建並緩存該job的classloader
try {
libraryCacheManager.registerJob(
jobGraph.getJobID, jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths)
}
catch {
case t: Throwable =>
throw new JobSubmissionException(jobId,
"Cannot set up the user code libraries: " + t.getMessage, t)
}
val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
TaskManager在執行Task時,首先會調用LibraryCacheManager的registerTask
從BlobServer下載相應的jar包並創建classloader
blobService.getPermanentBlobService().registerJob(jobId);
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
LOG.info("Loading JAR files for task {}.", this);
userCodeClassLoader = createUserCodeClassloader();
private ClassLoader createUserCodeClassloader() throws Exception {
long startDownloadTime = System.currentTimeMillis();
// triggers the download of all missing jar files from the job manager
libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
LOG.debug("Getting user code class loader for task {} at library cache manager took {} milliseconds",
executionId, System.currentTimeMillis() - startDownloadTime);
ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);
if (userCodeClassLoader == null) {
throw new Exception("No user code classloader available.");
}
return userCodeClassLoader;
}
涉及到的相關配置
參數 | 默認值 | 描述 |
---|---|---|
high-availability.storageDir | 無 | HA BlobStore根目錄 |
blob.storage.directory | <java.io.tmpdir> | BlobServer 本地文件根目錄 |
blob.fetch.num-concurrent | 50 | BlobServer fetch文件的最大並行度 |
blob.fetch.backlog | 1000 | 允許最大的排隊等待鏈接數 |
blob.service.cleanup.interval | 3600 | BlobServer cleanup 線程運行的間隔 |
blob.fetch.retries | 5 | 從BlobServer下載文件錯誤重試次數 |
blob.server.port | 0 | BlobServer端口范圍 |
blob.offload.minsize | 1024 * 1024 | 運行通過BlobServer傳遞的最小消息大小 |
classloader.resolve-order | child-first | classloader類加載順序 |