hadoop(c++實現)


先說明:本文基於hadoop 0.20.2版本。

 

文章來源:http://www.codelast.com/

 

(1)首先我們需要知道map-reduce的基本原理,這里不說了。其次我們需要知道,在用C++編寫hadoop應用程序時,需要包含三個頭文件:

 

#include "Pipes.hh"

 

#include "TemplateFactory.hh"

 

#include "StringUtils.hh"

 

這三個文件在hadoop安裝包的 “c++\Linux-amd64-64\include\” 或 “c++\Linux-i386-32\include\” 子目錄下(根據你的操作系統是64位或32位,分別對應不同的目錄)。

 

既然有頭文件,就需要有對應的實現文件,或者動態/靜態庫,這里我用的是靜態庫 libhadooppipes.a 和 libhadooputils.a 。靜態庫是在Makefile中指定的,后面再說。這里特別提醒一下大家:如果你的hadoop集群不是只有一台服務器,那么如果你編譯時使用了任何動態庫的話,在運行的時候就要保證在別的hadoop服務器上也能找到相應的動態庫,否則就會在hadoop JobTracker的詳細信息中看到找不到動態庫的錯誤提示。

 

文章來源:http://www.codelast.com/

 

(2)下面來看看程序:

 

#include"Pipes.hh"

#include"TemplateFactory.hh"

#include"StringUtils.hh"

 

class DataCountMap:public HadoopPipes::Mapper {

public:

  DataCountMap(HadoopPipes::TaskContext&context){}

  void map(HadoopPipes::MapContext&context) {

    std::vector<std::string>words=HadoopUtils::splitString(context.getInputValue()," ");    // 這里是分割字符串,如前文所述,每一行數據中的各項是以空格來分割的。分割的結果保存到了一個std::vector中

    if("kkk"==words[1]) {

      context.emit("kkk","1");

    } else if("nnn"==words[1]) {

      context.emit("nnn","1");

    }

  }

};

 

class DataCountReduce:public HadoopPipes::Reducer {

public:

  DataCountReduce(HadoopPipes::TaskContext&context){}

  void reduce(HadoopPipes::ReduceContext&context)

  {

    int sum=0;

    while(context.nextValue()) {

      sum++;

    }

    context.emit(context.getInputKey(),HadoopUtils::toString(sum));

  }

};

 

int main(int argc,char*argv[])

{

  return HadoopPipes::runTask(HadoopPipes::TemplateFactory<DataCountMap, DataCountReduce>());

}

上面的程序挺簡單的,只要你知道了map-reduce的基本原理。

 

一個map類,一個reduce類,一個執行任務的main函數。

 

map類對每一行數據進行拆分,當找到我們感興趣的“kkk”或“nnn”時,就生成一條輸出的記錄(emit函數的作用);recude類對map的數據進行匯總,這里只是簡單地計數,所以每次+1。

 

文章來源:http://www.codelast.com/

 

(3)有了代碼,我們接着就要編寫相應的Makefile了。我的Makefile如下:

 

HADOOP_INSTALL = /usr/local/hadoop

 

INCLUDE_PATH = $(HADOOP_INSTALL)/src/c++/

 

 

 

CC              = g++

 

CXXFLAGS = -Wall -g \

 

                  -I${INCLUDE_PATH}pipes/api/hadoop \

 

                  -I${INCLUDE_PATH}utils/api/hadoop

 

LDFLAGS = -ljvm -lhadooppipes -lhadooputils -lpthread

 

 

 

OBJECTS=dz_count.o

 

dz_count: $(OBJECTS)

 

$(CC) $(CXXFLAGS) -o $@ $(OBJECTS) $(LDFLAGS)

 

 

 

其中,HADOOP_INSTALL是你的hadoop安裝路徑,其余的 INCLUDE_PATH 等請對照你的目錄做相應更改,最后生成的可執行程序名為dz_count。這里沒有考慮release,因為僅作簡單的說明用。

 

文章來源:http://www.codelast.com/

 

(4)有了代碼和Makefile,就可以編譯了。編譯得到可執行程序dz_count。將其上傳到hdfs中:

 

hadoop fs -put dz_count /my_dir/

 

其中 “/my_dir/” 是你在hdfs中的目錄。

 

文章來源:http://www.codelast.com/

 

(5)下面就可以運行我們的hadoop程序了:

 

hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input /data/ -output /my_dir/output -program /my_dir/dz_count

 

其中,-input /data/ 表明你的輸入數據(即你的源數據)所處的hdfs目錄為 /data/,-output /my_dir/output 表明你的輸出文件目錄為 /my_dir/output,“output” 這一級目錄必須不存在(如果存在會報錯),程序運行時會生成它。-program /my_dir/dz_count 表明你要運行的程序為 /my_dir/ 目錄下的 dz_count 程序。

 

回車之后程序就開始執行,隨后你可以在命令行下看到它的狀態在更新,或者在hadoop JobTracker中也可以觀察到程序的運行狀態。

 

文章來源:http://www.codelast.com/

 

(6)等程序執行完后,如果任務沒有失敗的話,我們可以看到,你前面指定的hdfs輸出目錄 /my_dir/output 里生成了一個文件(假設其名為“part-00000”),我們就可以查看執行結果了:

 

hadoop fs -cat /my_dir/output/part-00000

 

輸出結果形為:

 

kkk   178099387

 

nnn   678219805

 

表明第二項為“kkk”的數據行共有178099387條,而“nnn”則為678219805條。

 

文章來源:http://www.codelast.com/

 

順便再說一點廢話:

 

(1)如何中止一個hadoop任務?當你在命令行下提交了一個hadoop job后,就算你按Ctrl+C,也不能中止掉那個job,因為它已經被Jobtracker接管了。這時,你要用如下命令中止它:

 

hadoop job -kill Job_ID

 

其中,Job_ID就是你提交的job的ID,可以在Jobtracker中查看到。

 

(2)一些基本概念:

 

map-reduce過程中,在map時,hadoop會將輸入的數據按一定的大小(例如100M,這個值是可以配置的)分為若干塊來處理,一個塊對應一個map類,也就是說,一個塊只會執行map類的構造函數一次。而每一行記錄則對應一個map()方法,也就是說,一行記錄就會執行一次map()方法。因此,如果你有什么信息需要輸出(例如std::cout)的話,就要注意了:如果在map()方法中輸出,則當輸入數據量很大時,可能就會輸出太多的信息,如果可以在map的構造函數中輸出的話,則輸出的信息會少得多。

 

在reduce時,對map輸出的同一個key,有一個reduce類,也就是說,無論你的同一個key有多少個value,在reduce的時候只要是同一個key,就會出現在同一個reduce類里,在這個類里的reduce方法中,你用 while (context.nextValue()) 循環可以遍歷所有的value,這樣就可以處理同一個key的N個value了。

正因為在默認情況下,相同key的記錄會落到同一個reducer中,所以,當你的key的數量比你設置的reducer的數量要少的時候,就導致了某些reducer分配不到任何數據,最終輸出的某些文件(part-r-xxxxx)是空文件。如果你設置的reducer數量要少於key的數量(這是最常見的情況),那么就會有多個key落入同一個reducer中被處理,但是,每一次reduce()方法被調用時,其中將只包含一個key,同一個reducer里的多個key就會導致reduce()方法被多次調用。

 

文章來源:http://www.codelast.com/

 

這樣,我們就完成了一個完整的C++ hadoop分布式應用程序的編寫。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM