Flink的分布式緩存


分布式緩存

Flink提供了一個分布式緩存,類似於hadoop,可以使用戶在並行函數中很方便的讀取本地文件,並把它放在taskmanager節點中,防止task重復拉取。
此緩存的工作機制如下:程序注冊一個文件或者目錄(本地或者遠程文件系統,例如hdfs或者s3),通過ExecutionEnvironment注冊緩存文件並為它起一個名稱。
當程序執行,Flink自動將文件或者目錄復制到所有taskmanager節點的本地文件系統,僅會執行一次。用戶可以通過這個指定的名稱查找文件或者目錄,然后從taskmanager節點的本地文件系統訪問它。

示例

在ExecutionEnvironment中注冊一個文件:

//獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1:注冊一個文件,可以使用hdfs上的文件 也可以是本地文件進行測試 env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt"); 

在用戶函數中訪問緩存文件或者目錄(這里是一個map函數)。這個函數必須繼承RichFunction,因為它需要使用RuntimeContext讀取數據:

DataSet<String> result = data.map(new RichMapFunction<String, String>() { private ArrayList<String> dataList = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //2:使用文件 File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt"); List<String> lines = FileUtils.readLines(myFile); for (String line : lines) { this.dataList.add(line); System.err.println("分布式緩存為:" + line); } } @Override public String map(String value) throws Exception { //在這里就可以使用dataList System.err.println("使用datalist:" + dataList + "------------" +value); //業務邏輯 return dataList +":" + value; } }); result.printToErr(); } 

完整代碼如下,仔細看注釋:


public class DisCacheTest { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1:注冊一個文件,可以使用hdfs上的文件 也可以是本地文件進行測試 //text 中有4個單詞:hello flink hello FLINK env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt"); DataSource<String> data = env.fromElements("a", "b", "c", "d"); DataSet<String> result = data.map(new RichMapFunction<String, String>() { private ArrayList<String> dataList = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //2:使用文件 File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt"); List<String> lines = FileUtils.readLines(myFile); for (String line : lines) { this.dataList.add(line); System.err.println("分布式緩存為:" + line); } } @Override public String map(String value) throws Exception { //在這里就可以使用dataList System.err.println("使用datalist:" + dataList + "------------" +value); //業務邏輯 return dataList +":" + value; } }); result.printToErr(); } }// 

輸出結果如下:

[hello, flink, hello, FLINK]:a [hello, flink, hello, FLINK]:b [hello, flink, hello, FLINK]:c [hello, flink, hello, FLINK]:d 

公眾號推薦

  • 全網唯一一個從0開始幫助Java開發者轉做大數據領域的公眾號~
  • 海量【java和大數據的面試題+視頻資料】整理在公眾號,關注后可以下載~
  • 更多大數據技術歡迎和作者一起探討~
 
image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM