Flink BLOB架構


Flink中支持的BLOB文件類型

  • jar包

      被user classloader使用的jar包
    
  • 高負荷RPC消息

      1. RPC消息長度超出了akka.framesize的大小
      2. 在HA摸式中,利用底層分布式文件系統分發單個高負荷RPC消息,比如: TaskDeploymentDescriptor,給多個接受對象。
      3. 失敗導致重新部署過程中復用RPC消息
    
  • TaskManager的日志文件

      為了在web ui上展示taskmanager的日志
    

按存儲特性又分為兩類

  • PERMANENT_BLOB

      生命周期和job的生命周期一致,並且是可恢復的。會上傳到BlobStore分布式文件系統中。
    
  • TRANSIENT_BLOB

      生命周期由用戶自行管理,並且是不可恢復的。不會上傳到BlobStore分布式文件系統中。
    

架構圖

BlobStore

BLOB底層存儲,支持多種實現`HDFS`,`S3`,`FTP`等,HA中使用BlobStore進行文件的恢復。

BlobServer

* 提供了基於jobId和BlobKey進行文件上傳和下載的方法 
* 本地文件系統的讀寫,基於`<path>/<jobId>/<BlobKey>`目錄結構
* HA 分布式文件系統的讀寫,基於`<path>/<jobId>/<BlobKey>`目錄結構
* 負責本地文件系統和分布式文件系統的清理工作
* 先存儲到本地文件系統中,然后如果需要的話再存儲到分布式文件系統中
* 下載請求優先使用本地文件系統中的文件
* 進行HA恢復中,下載分布式系統中的文件到本地文件系統中

BlobClient

* 基於jobId和BlobKey對BlobServer中的文件進行本地文件緩存
* 本地文件的讀寫,基於`<path>/<jobId>/<BlobKey>`目錄結構
* 優先使用本地文件系統中的文件,然后嘗試從HA分布式文件中獲取,最后才嘗試從BlobServer中下載
* 負責本地文件系統的清理工作

LibraryCacheManager

橋接task的classloader和緩存的庫文件,其`registerJob`,`registerTask`會構建並緩存job,task運行需要的classloader

示例解析:standalone模式中的jar包管理

  JobManager會創建BlobStore、BlobServer、BlobLibraryCacheManager具體過程見JobManager的createJobManagerComponents方法

    try {
      blobServer = new BlobServer(configuration, blobStore)
      blobServer.start()
      instanceManager = new InstanceManager()
      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
      libraryCacheManager =
        new BlobLibraryCacheManager(
          blobServer,
          ResolveOrder.fromString(classLoaderResolveOrder),
          alwaysParentFirstLoaderPatterns)

      instanceManager.addInstanceListener(scheduler)
    }

  TaskManager注冊到Jobmanager后會創建BlobCacheService、BlobLibraryCacheManager具體過程見TaskManager的associateWithJobManager方法

    try {
      val blobcache = new BlobCacheService(
        address,
        config.getConfiguration(),
        highAvailabilityServices.createBlobStore())
      blobCache = Option(blobcache)
      libraryCacheManager = Some(
        new BlobLibraryCacheManager(
          blobcache.getPermanentBlobService,
          config.getClassLoaderResolveOrder(),
          config.getAlwaysParentFirstLoaderPatterns))
    }

  JobClient在向集群提交job的過程中會調用JobSubmissionClientActor的tryToSubmitJob方法進而調用JobGraph對象的uploadUserJars方法

				try {
					jobGraph.uploadUserJars(blobServerAddress, clientConfig);
				} catch (IOException exception) {
					getSelf().tell(
						decorateMessage(new JobManagerMessages.JobResultFailure(
							new SerializedThrowable(
								new JobSubmissionException(
									jobGraph.getJobID(),
									"Could not upload the jar files to the job manager.",
									exception)
							)
						)),
						ActorRef.noSender());

					return null;
				}

				LOG.info("Submit job to the job manager {}.", jobManager.path());

				jobManager.tell(
					decorateMessage(
						new JobManagerMessages.SubmitJob(
							jobGraph,
							ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
					getSelf());
	public void uploadUserJars(
			InetSocketAddress blobServerAddress,
			Configuration blobClientConfig) throws IOException {
		if (!userJars.isEmpty()) {
			List<PermanentBlobKey> blobKeys = BlobClient.uploadJarFiles(
				blobServerAddress, blobClientConfig, jobID, userJars);

			for (PermanentBlobKey blobKey : blobKeys) {
				if (!userJarBlobKeys.contains(blobKey)) {
					userJarBlobKeys.add(blobKey);
				}
			}
		}
	}

  然后在JobManager的submitJob方法中會調用BlobLibraryCacheManager的registerJob創建並緩存該job的classloader

        try {
          libraryCacheManager.registerJob(
            jobGraph.getJobID, jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths)
        }
        catch {
          case t: Throwable =>
            throw new JobSubmissionException(jobId,
              "Cannot set up the user code libraries: " + t.getMessage, t)
        }

        val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)

  TaskManager在執行Task時,首先會調用LibraryCacheManager的registerTask從BlobServer下載相應的jar包並創建classloader

		blobService.getPermanentBlobService().registerJob(jobId);

		// first of all, get a user-code classloader
		// this may involve downloading the job's JAR files and/or classes
		LOG.info("Loading JAR files for task {}.", this);

		userCodeClassLoader = createUserCodeClassloader();
	private ClassLoader createUserCodeClassloader() throws Exception {
		long startDownloadTime = System.currentTimeMillis();

		// triggers the download of all missing jar files from the job manager
		libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);

		LOG.debug("Getting user code class loader for task {} at library cache manager took {} milliseconds",
				executionId, System.currentTimeMillis() - startDownloadTime);

		ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);
		if (userCodeClassLoader == null) {
			throw new Exception("No user code classloader available.");
		}
		return userCodeClassLoader;
	}

涉及到的相關配置

參數 默認值 描述
high-availability.storageDir HA BlobStore根目錄
blob.storage.directory <java.io.tmpdir> BlobServer 本地文件根目錄
blob.fetch.num-concurrent 50 BlobServer fetch文件的最大並行度
blob.fetch.backlog 1000 允許最大的排隊等待鏈接數
blob.service.cleanup.interval 3600 BlobServer cleanup 線程運行的間隔
blob.fetch.retries 5 從BlobServer下載文件錯誤重試次數
blob.server.port 0 BlobServer端口范圍
blob.offload.minsize 1024 * 1024 運行通過BlobServer傳遞的最小消息大小
classloader.resolve-order child-first classloader類加載順序


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM