1 分布式緩存
- Flink提供了一個分布式緩存,類似於hadoop,可以使用戶在並行函數中很方便的讀取本地文件,並把它放在taskmanager節點中,防止task重復拉取。
- 此緩存的工作機制如下:程序注冊一個文件或者目錄(本地或者遠程文件系統,例如hdfs或者s3),通過ExecutionEnvironment注冊緩存文件並為它起一個名稱。當程序執行,Flink自動將文件或者目錄復制到所有taskmanager節點的本地文件系統,僅會執行一次。用戶可以通過這個指定的名稱查找文件或者目錄,然后從taskmanager節點的本地文件系統訪問它
-
2 使用技巧
-
1:注冊一個文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile") -
2:訪問數據
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
-
3 應用案例實戰
3.1 在D盤創建一個文件discache.txt,並進行registerCachedFile
3.2 每一個TaskManager都會存在一份,防止MapTask重復拉取文件。
import org.apache.commons.io.FileUtils import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.configuration.Configuration object BatchDemoDisCacheScala { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //1:注冊文件 env.registerCachedFile("d:\\data\\file\\a.txt","b.txt") val data = env.fromElements("a","b","c","d") val result = data.map(new RichMapFunction[String,String] { override def open(parameters: Configuration): Unit = { super.open(parameters) val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt") val lines = FileUtils.readLines(myFile) val it = lines.iterator() while (it.hasNext){ val line = it.next(); println("line:"+line) } } override def map(value: String) = { value } }) result.print() } }
參考:
https://blog.csdn.net/shenshouniu/article/details/84499655
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/
