Flink分布式緩存Distributed Cache


1 分布式緩存

  • Flink提供了一個分布式緩存,類似於hadoop,可以使用戶在並行函數中很方便的讀取本地文件,並把它放在taskmanager節點中,防止task重復拉取。
  • 此緩存的工作機制如下:程序注冊一個文件或者目錄(本地或者遠程文件系統,例如hdfs或者s3),通過ExecutionEnvironment注冊緩存文件並為它起一個名稱。當程序執行,Flink自動將文件或者目錄復制到所有taskmanager節點的本地文件系統,僅會執行一次。用戶可以通過這個指定的名稱查找文件或者目錄,然后從taskmanager節點的本地文件系統訪問它
  • 2 使用技巧

    • 1:注冊一個文件

        env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")  
    • 2:訪問數據

        File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

 

  3 應用案例實戰

 

3.1 在D盤創建一個文件discache.txt,並進行registerCachedFile

3.2 每一個TaskManager都會存在一份,防止MapTask重復拉取文件。

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration


object BatchDemoDisCacheScala {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._


    //1:注冊文件
    env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")

    val data = env.fromElements("a","b","c","d")

    val result = data.map(new RichMapFunction[String,String] {

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt")
        val lines = FileUtils.readLines(myFile)
        val it = lines.iterator()
        while (it.hasNext){
          val line = it.next();
          println("line:"+line)
        }
      }
      override def map(value: String) = {
        value
      }
    })

    result.print()

  }

}

 

參考:

https://blog.csdn.net/shenshouniu/article/details/84499655

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM