MapReduce中的分布式緩存使用
@(Hadoop)
簡介
DistributedCache是Hadoop為MapReduce框架提供的一種分布式緩存機制,它會將需要緩存的文件分發到各個執行任務的子節點的機器中,各個節點可以自行讀取本地文件系統上的數據進行處理。
符號鏈接
可以同在原本HDFS文件路徑上+”#somename”來設置符號連接(相當於一個快捷方式)
這樣在MapReduce程序中可以直接通通過:
File file = new File("somename");
來獲得這個文件
緩存在本地的目錄設置
以下為默認值:
<property>
<name>mapred.local.dir</name>
<value>${hadoop.tmp.dir}/mapred/localdir/filecache</value>
</property>
<property>
<name>local.cache.size</name>
<value>10737418240</value>
</property>
應用場景
1.分發第三方庫(jar,so等)
2.共享一些可以裝載進內存的文件
3.進行類似join連接時,小表的分發
使用方式
舊版本的DistributedCache已經被注解為過時,以下為Hadoop-2.2.0以上的新API接口,測試的Hadoop版本為2.7.2。
Job job = Job.getInstance(conf);
//將hdfs上的文件加入分布式緩存
job.addCacheFile(new URI("hdfs://url:port/filename#symlink"));
由於新版API中已經默認創建符號連接,所以不需要再調用setSymlink(true)方法了,可以通過
System.out.println(context.getSymlink());
來查看是否開啟了創建符號連接。
之后在map/reduce函數中可以通過context來訪問到緩存的文件,一般是重寫setup方法來進行初始化:
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
String path = context.getLocalCacheFiles()[0].getName();
File itermOccurrenceMatrix = new File(path);
FileReader fileReader = new FileReader(itermOccurrenceMatrix);
BufferedReader bufferedReader = new BufferedReader(fileReader);
String s;
while ((s = bufferedReader.readLine()) != null) {
//TODO:讀取每行內容進行相關的操作
}
bufferedReader.close();
fileReader.close();
}
}
得到的path為本地文件系統上的路徑。
這里的getLocalCacheFiles方法也被注解為過時了,只能使用context.getCacheFiles方法,和getLocalCacheFiles不同的是,getCacheFiles得到的路徑是HDFS上的文件路徑,如果使用這個方法,那么程序中讀取的就不再試緩存在各個節點上的數據了,相當於共同訪問HDFS上的同一個文件。
可以直接通過符號連接來跳過getLocalCacheFiles獲得本地的文件。
單機安裝的hadoop沒有通過,提示找不到該文件,待在集群上進行測試。
注意事項
1.需要分發的文件必須是存儲在HDFS上了
2.文件只讀
3.不緩存太大的文件,執行task之前對進行文件的分發,影響task的啟動速度
作者:@小黑
