淺談hadoop中mapreduce的文件分發


近期在做數據分析的時候。須要在mapreduce中調用c語言寫的接口。此時就須要把動態鏈接庫so文件分發到hadoop的各個節點上,原來想自己來做這個分發,大概過程就是把so文件放在hdfs上面,然后做mapreduce的時候把so文件從hdfs下載到本地,但查詢資料后發現hadoop有對應的組件來幫助我們完畢這個操作,這個組件就是DistributedCache,分布式緩存,運用這個東西能夠做到第三方文件的分發和緩存功能,以下具體解釋:


假設我們須要在map之間共享一些數據,假設信息量不大,我們能夠保持在conf中。可是假設我們須要共享一些配置文件,jar包之類的。此時DistributedCache能夠滿足我們的需求,使用DistributedCache的過程例如以下:

1.正確配置被分發的文件的路徑(hdfs上的路徑)

2.在自己定義的mapper或reducer中獲取文件下載到本地后的路徑(linux文件系統路徑)。通常是重寫configure或者重寫setup(新方式)

3.在自己定義的mapper或reducer類中讀取這些文件的內容

以下以代碼說明三個步驟:

1.Configuration conf = new Configuration();

  DistributedCache.addCacheFile(new URI("/user/tinfo/zhangguochen/libJMeshCalc.so"), conf);

  DistributedCache.addCacheArchive(new URI("/user/tinfo/zhangguochen/libJMeshCalc.zip"),conf);
  DistributedCache.addFileToClassPath(new URI("/user/tinfo/zhangguochen/libMeshCal.jar"), conf);

    或者

    conf.set("mapred.cache.files", "/myapp/file");

   conf.set("mapred.cache. archives", "/mayapp/file.zip");

   以上是配置須要分發的hdfs上的文件,可是前提是這些文件必須在hdfs上存在,看源代碼可知道DistributedCache的靜態方法事實上就是封裝了conf.set的動作。

2.在自己的mapper類中,使用DistributedCache獲取下載到本地的文件。大部分情況下這些操作都是重寫configure接口(或者setup),然后把本地文件路徑保存在mapper類的成員變量中。供map方法使用。代碼例如以下:

   private Path[] localFiles;

   public void setup(Context context) {

       localFiles = DistributeCache.getLocalCacheFiles(context.getConfiguration());

       for(Path temp:localFiles) {

            String path = temp.toString();//path就是此文件在本地的路徑

            if(path.contains("myfileName")) {//獲取到自己須要的文件

            }

       }

    }

getLocalCacheFiles返回的是數組(元素類型是Path),數組內容是這個task(map或reduce)所屬的job設定的全部須要被分發的文件,假設設置了     多個文件。能夠遍歷Path數組,用String.contains("KeyWord")來推斷是否是你所須要的文件。


   獲取壓縮包的路徑

   private  File[] inputFiles;

   private Path[] localArchives;

   public void setup(Context context) {

       localArchives = DistributeCache.getLocalCacheArvhives();

       for(Path archive : localArchives) {

            if(archive.toString.contains("mytarName")) {//找到自己須要的文件

                 inputFiles = new File(archive.toString()).listFiles();//獲取壓縮包下的全部 文件

            }

       }

   }

    也能夠用DistributedCache將所使用到的第三方jar包載入到classpath中DistributedCache.addFileToClassPath

  

   通過以上代碼發現假設要使用這些分發到各個節點上的文件操作比較復雜,DistributedCache也提供一種更方便的使用方法。即能夠為每個分發的文件創建一個符號鏈接,然后hadoop就會在當前mapreduce的運行路徑下創建一個到源文件的鏈接,我們就能夠在mapreduce中直接使用這些文件,而不必關心這些文件在本地的路徑。

  演示樣例:

  1.把文件分發到緩存中

   Configuration conf = new Configuration();

   DistributedCache.createSymlink(conf);//創建符號鏈接
   DistributedCache.addCacheFile(new URI("/user/tinfo/zhangguochen/file1#myfile"), conf);//增加分布式緩存,myfile是符號


   2.在mapreduce中使用

    public void setup(Context context) {

       File myfile = new File("myfile");//在這里就能夠直接通過符號myfile使用此文件

    }

   或者用下面方式:

    conf.set("mapred.cache.files", "/data/data#mData");
     conf.set("mapred.cache.archives", "/data/data.zip#mDataZip");
     conf.set("mapred.create.symlink", "yes"); // 是yes。不是true
     DistributedCache.createSymlink(Configuration)
     在map階段,僅僅須要File file = new File("mData");就可以獲得該文件……

下面資料來自網絡。如有雷同,純屬意外

DistributedCache是Hadoop提供的文件緩存工具。它能夠自己主動將指定的文件分發到各個節點上,緩存到本地,供用戶程序讀取使用。它具有下面幾個特點:緩存的文件是僅僅讀的,改動這些文件內容沒有意義;用戶能夠調整文件可見范圍(比方僅僅能用戶自己使用,全部用戶都能夠使用等),進而防止反復拷貝現象;按需拷貝,文件是通過HDFS作為共享數據中心分發到各節點的,且僅僅發給任務被調度到的節點。本文將介紹DistributedCache在Hadoop 1.0和2.0中的用法及實現原理。

Hadoop DistributedCache有下面幾種典型的應用場景:1)分發字典文件。一些情況下Mapper或者Reducer須要用到一些外部字典。比方黑白名單、詞表等;2)map-side join:當多表連接時。一種場景是一個表非常大,一個表非常小。小到足以載入到內存中,這時能夠使用DistributedCache將小表分發到各個節點上,以供Mapper載入使用;3)自己主動化軟件部署:有些情況下,MapReduce需依賴於特定版本號的庫,比方依賴於某個版本號的PHP解釋器,一種做法是讓集群管理員把這個版本號的PHP裝到各個機器上,這通常比較麻煩,還有一種方法是使用DistributedCache分發到各個節點上,程序執行完后,Hadoop自己主動將其刪除。


Hadoop提供了兩種DistributedCache使用方式。一種是通過API。在程序中設置文件路徑,第二種是通過命令行(-files。-archives或-libjars)參數告訴Hadoop,個人建議使用第二種方式。該方式可使用下面三個參數設置文件:

(1)-files:將指定的本地/hdfs文件分發到各個Task的工作文件夾下。不正確文件進行不論什么處理;

(2)-archives:將指定文件分發到各個Task的工作文件夾下,並對名稱后綴為“.jar”、“.zip”。“.tar.gz”、“.tgz”的文件自己主動解壓,默認情況下。解壓后的內容存放到工作文件夾下名稱為解壓前文件名稱的文件夾中,比方壓縮包為dict.zip,則解壓后內容存放到文件夾dict.zip中。為此,你能夠給文件起個別名/軟鏈接,比方dict.zip#dict,這樣,壓縮包會被解壓到文件夾dict中。

(3)-libjars:指定待分發的jar包,Hadoop將這些jar包分發到各個節點上后,會將其自己主動加入到任務的CLASSPATH環境變量中。

hadoop jar xxx.jar -files hdfs://xxx/xx

hadoop jar xxx.jar -libjars hdfs://xxx/xxx.jar,hdfs://xxx/xx2.jar


前面提到。DistributedCache分發的文件是有可見范圍的。有的文件能夠僅僅對當前程序可見,程序執行完后,直接刪除;有的文件僅僅對當前用戶可見(該用戶全部程序都能夠訪問)。有的文件對全部用戶可見。DistributedCache會為每種資源(文件)計算一個唯一ID,以識別每一個資源,從而防止資源反復下載。舉個樣例。假設文件可見范圍是全部用戶。則在每一個節點上,第一個使用該文件的用戶負責緩存該文件,之后的用戶直接使用就可以,無需反復下載。那么。Hadoop是如何區分文件可見范圍的呢?

在Hadoop 1.0版本號中。Hadoop是以HDFS文件的屬性作為標識推斷文件可見性的,須要注意的是,待緩存的文件即使是在Hadoop提交作業的client上。也會首先上傳到HDFS的某一文件夾下,再分發到各個節點上的,因此。HDFS是緩存文件的必經之路。

對於常常使用的文件或者字典。建議放到HDFS上,這樣能夠防止每次反復下載,做法例如以下:

比方將數據保存在HDFS的/dict/public文件夾下。並將/dict和/dict/public兩層文件夾的可運行權限全部打開(在Hadoop中。可運行權限的含義與linux中的不同,該權限僅僅對文件夾有意義,表示能夠查看該文件夾中的子文件夾),這樣,里面全部的資源(文件)便是全部用戶可用的,而且第一個用到的應用程序會將之緩存到各個節點上,之后全部的應用程序無需反復下載。能夠在提交作業時通過下面命令指定:

-files hdfs:///dict/public/blacklist.txt, hdfs:///dict/public/whilelist.txt

假設有多個HDFS集群能夠指定namenode的對外rpc地址:

-files hdfs://host:port/dict/public/blacklist.txt, hdfs://host:port/dict/public/whilelist.txt

DistributedCache會將blacklist.txt和whilelist.txt兩個文件緩存到各個節點的一個公共文件夾下。並在須要時,在任務的工作文件夾下建立一個指向這兩個文件的軟連接。

假設可運行權限沒有打開。則默認僅僅對該應用程序的擁有者可見,該用戶全部應用程序可共享這些文件。

一旦你對/dict/public下的某個文件進行了改動,則下次有作業用到相應文件時,會發現文件被改動過了,進而自己主動又一次緩存文件。

對於一些頻繁使用的字典,不建議存放在client。每次通過-files指定,這種文件,每次都要經歷下面流程:上傳到HDFS上—》緩存到各個節點上—》之后不再使用這些文件,直到被清除,也就是說,這種文件。僅僅會被這次執行的應用程序使用,假設再次執行相同的應用程序,即使文件沒有被改動。也會又一次經歷以上流程。很耗費時間,尤其是字典許多,很大時。

DistributedCache內置緩存置換算法。一旦緩存(文件數目達到一定上限或者文件總大小超過某一上限)滿了之后。會踢除最久沒有使用的文件。

在Hadopo 2.0中。自帶的MapReduce框架仍支持1.0的這樣的DistributedCache使用方式。但DistributedCache本身是由YARN實現的。不再集成到MapReduce中。

YARN還提供了非常多相關編程接口供用戶調用,有興趣的能夠閱讀源碼。

以下介紹Hadoop 2.0中。DistributedCache通過命令行分發文件的基本使用方式:

(1)執行Hadoop自帶的example樣例, dict.txt會被緩存到各個Task的工作文件夾下,因此,直接像讀取本地文件一樣,在Mapper和Reducer中,讀取dict.txt就可以:

1
2
3
4
5
6
bin /Hadoop jar \
share /hadoop/mapreduce/hadoop-mapreduce-examples-2 .2.0.jar \
wordcount \
-files hdfs: ///dict/public/dict .txt \
/test/input \
/test/output

(2)Hadoop Streaming樣例,須要通過-files指定mapper和reducer可運行文件或者腳本文件,這些文件就是通過DistributedCache分發到各個節點上的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/bin/bash
HADOOP_HOME= /opt/yarn-client
INPUT_PATH= /test/input/data
OUTPUT_PATH= /test/output/data
echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME /bin/hadoop fs -rmr $OUTPUT_PATH
 
${HADOOP_HOME} /bin/hadoop jar\
    ${HADOOP_HOME} /share/hadoop/tools/lib/hadoop-streaming-2 .2.0.jar\
   -D mapred.reduce.tasks=2\
   -files mapper,reducer\
   -input $INPUT_PATH\
   -output $OUTPUT_PATH\
   -mapper mapper\
   -reducer reducer

(3)接下給出一個緩存壓縮文件的樣例。如果壓縮文件為dict.zip。里面存的數據為:

1
2
3
4
data /1 .txt
data /2 .txt
mapper.list
reducer.list

通過-archives參數指定dict.zip后。該文件被解壓后,將被緩存(實際上是軟連接)到各個Task的工作文件夾下的dict.zip文件夾下,組織結構例如以下:

1
2
3
4
5
6
dict.zip/
     data/
         1.txt
         2.txt
     mapper.list
     reducer.list

你能夠在Mapper或Reducer程序中。使用類似以下的代碼讀取解壓后的文件:

 

1
2
3
File file2 = read(“dict.zip/data/1.txt”, “r”);
…….
File file3 = read(“dict.zip/mapper.list”, “r”);

假設你想直接將內容解壓到Task工作文件夾下,而不是子文件夾dict.zip中。能夠用“-files”(注意,不要使用-archives,“-files”指定的文件不會被解壓)指定dict.zip,並自己在程序中實現解壓縮:

1
2
3
4
#include <cstdlib>
…….
system (“unzip –q dict.zip”); //C++代碼
……

總之,Hadoop DistributedCache是一個很好用的工具,合理的使用它可以解決許多很困難的問題。 

   總結下面:假設mr程序中須要第三方jar包,能夠通過在程序中使用DistributedCache,也能夠在命令中使用-libjars來實現,可是這些引入的jar都僅僅能夠在mr任務啟動之后來使用。假設你在啟動MR任務之前調用了第三方jar包的類,那這就會有問題,會在啟動任務的時候找不到這個類。

此時能夠使用例如以下方式解決:

   在你的project里面建立一個lib目錄。然后把全部的第三方jar包放到里面去。hadoop會自己主動載入lib依賴里面的jar。 這樣就能夠在mr啟動之前也能夠使用第三方jar了。

   方法調用順序為(以libjars為例): -libjars --->conf.set("tmpjars")--->

DistributedCache.addArchiveToClassPath--->conf.set("mapreduce.job.cache.archives","")

相關文章鏈接:http://blog.csdn.net/xiaolang85/article/details/11782539

                        http://blog.csdn.net/lazy0zz/article/details/7505712

              

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM