一、概念介紹
1、分布式緩存的理解:Hadoop為MapReduce框架提供的一種分布式緩存機制,它會將需要緩存的文件分發到各個執行任務的子節點的機器中,各個節點可以自行讀取本地文件系統上的數據進行處理。
2、符號連接:在原本HDFS文件路徑上加”#filename”來設置符號連接(這樣寫新API會默認創建符號連接,相當於一個快捷方式)
這樣在MapReduce程序中可以直接通過:File file = new File("filename");
來獲得這個文件。
要使用符號連接,需要檢查是否啟用了符號連接,通過 FileSystem.areSymlinksEnabled()的布爾返回值來確定。
如果未開啟,使用 FileSystem.enableSymlinks()方法來開啟符號連接。
二、使用
1、添加分布式緩存:添加分布式緩存,將文件復制到任務節點
舊版本的DistributedCache已經被注解為過時,使用新的API來解決這個問題,如下:
不同文件類型的添加方法:
job.addArchiveToClassPath(archive); // 緩存jar包到task運行節點的classpath中 job.addFileToClassPath(file); // 緩存普通文件到task運行節點的classpath中 job.addCacheArchive(uri); // 緩存壓縮包文件到task運行節點的工作目錄 job.addCacheFile(uri) // 緩存普通文件到task運行節點的工作目錄
舉個具體的例子:
在main方法中,創建job時,添加需要使用到的緩存文件,如果需要使用第二個參數,URI參數那里可以不寫hdfs://node:8020(舉例)
job.addCacheFile(new URI("/foo/bar//fileName.txt#fileName.txt"));//添加hdfs文件做緩存
2、使用分布式緩存
在Mapper類的setup方法中讀取分布式緩存文件:Mapper類主要有4個方法:setup(),map(),cleanup(),run(),run()方法調用其它三個方法,順序是:setup()-map()-cleanup。並且setup()執行且僅
執行一次,主要用來為map()方法做一些准備工作,所以很多初始化的工作盡量放在這里做。Reducer類也是類似的。
setup()方法實現示例如下:
@Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) { BufferedReader br = br = new BufferedReader(new InputStreamReader(new FileInputStream("fileName")));
// 這里的fileName應該是上一步中 # 后面的名字,需要對應 String line; while ((line = br.readLine()) != null) { String[] temp = line.split("\t"); // 具體的處理邏輯 } br.close(); } }
參考:http://www.codeweblog.com/yarn-mapreduce-2-0-%E4%B8%8B%E5%88%86%E5%B8%83%E5%BC%8F%E7%BC%93%E5%AD%98-distributedcache-%E7%9A%84%E6%B3%A8%E6%84%8F%E4%BA%8B%E9%A1%B9/