分布式緩存
Flink提供了一個分布式緩存,類似於hadoop,可以使用戶在並行函數中很方便的讀取本地文件,並把它放在taskmanager節點中,防止task重復拉取。
此緩存的工作機制如下:程序注冊一個文件或者目錄(本地或者遠程文件系統,例如hdfs或者s3),通過ExecutionEnvironment注冊緩存文件並為它起一個名稱。
當程序執行,Flink自動將文件或者目錄復制到所有taskmanager節點的本地文件系統,僅會執行一次。用戶可以通過這個指定的名稱查找文件或者目錄,然后從taskmanager節點的本地文件系統訪問它。
示例
在ExecutionEnvironment中注冊一個文件:
//獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1:注冊一個文件,可以使用hdfs上的文件 也可以是本地文件進行測試 env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");
在用戶函數中訪問緩存文件或者目錄(這里是一個map函數)。這個函數必須繼承RichFunction,因為它需要使用RuntimeContext讀取數據:
DataSet<String> result = data.map(new RichMapFunction<String, String>() { private ArrayList<String> dataList = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //2:使用文件 File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt"); List<String> lines = FileUtils.readLines(myFile); for (String line : lines) { this.dataList.add(line); System.err.println("分布式緩存為:" + line); } } @Override public String map(String value) throws Exception { //在這里就可以使用dataList System.err.println("使用datalist:" + dataList + "------------" +value); //業務邏輯 return dataList +":" + value; } }); result.printToErr(); }
完整代碼如下,仔細看注釋:
public class DisCacheTest { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1:注冊一個文件,可以使用hdfs上的文件 也可以是本地文件進行測試 //text 中有4個單詞:hello flink hello FLINK env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt"); DataSource<String> data = env.fromElements("a", "b", "c", "d"); DataSet<String> result = data.map(new RichMapFunction<String, String>() { private ArrayList<String> dataList = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //2:使用文件 File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt"); List<String> lines = FileUtils.readLines(myFile); for (String line : lines) { this.dataList.add(line); System.err.println("分布式緩存為:" + line); } } @Override public String map(String value) throws Exception { //在這里就可以使用dataList System.err.println("使用datalist:" + dataList + "------------" +value); //業務邏輯 return dataList +":" + value; } }); result.printToErr(); } }//
輸出結果如下:
[hello, flink, hello, FLINK]:a [hello, flink, hello, FLINK]:b [hello, flink, hello, FLINK]:c [hello, flink, hello, FLINK]:d
公眾號推薦
- 全網唯一一個從0開始幫助Java開發者轉做大數據領域的公眾號~
- 海量【java和大數據的面試題+視頻資料】整理在公眾號,關注后可以下載~
- 更多大數據技術歡迎和作者一起探討~

image