先說明:本文基於hadoop 0.20.2版本。
文章來源:http://www.codelast.com/
(1)首先我們需要知道map-reduce的基本原理,這里不說了。其次我們需要知道,在用C++編寫hadoop應用程序時,需要包含三個頭文件:
#include "Pipes.hh"
#include "TemplateFactory.hh"
#include "StringUtils.hh"
這三個文件在hadoop安裝包的 “c++\Linux-amd64-64\include\” 或 “c++\Linux-i386-32\include\” 子目錄下(根據你的操作系統是64位或32位,分別對應不同的目錄)。
既然有頭文件,就需要有對應的實現文件,或者動態/靜態庫,這里我用的是靜態庫 libhadooppipes.a 和 libhadooputils.a 。靜態庫是在Makefile中指定的,后面再說。這里特別提醒一下大家:如果你的hadoop集群不是只有一台服務器,那么如果你編譯時使用了任何動態庫的話,在運行的時候就要保證在別的hadoop服務器上也能找到相應的動態庫,否則就會在hadoop JobTracker的詳細信息中看到找不到動態庫的錯誤提示。
文章來源:http://www.codelast.com/
(2)下面來看看程序:
#include"Pipes.hh"
#include"TemplateFactory.hh"
#include"StringUtils.hh"
class DataCountMap:public HadoopPipes::Mapper {
public:
DataCountMap(HadoopPipes::TaskContext&context){}
void map(HadoopPipes::MapContext&context) {
std::vector<std::string>words=HadoopUtils::splitString(context.getInputValue()," "); // 這里是分割字符串,如前文所述,每一行數據中的各項是以空格來分割的。分割的結果保存到了一個std::vector中
if("kkk"==words[1]) {
context.emit("kkk","1");
} else if("nnn"==words[1]) {
context.emit("nnn","1");
}
}
};
class DataCountReduce:public HadoopPipes::Reducer {
public:
DataCountReduce(HadoopPipes::TaskContext&context){}
void reduce(HadoopPipes::ReduceContext&context)
{
int sum=0;
while(context.nextValue()) {
sum++;
}
context.emit(context.getInputKey(),HadoopUtils::toString(sum));
}
};
int main(int argc,char*argv[])
{
return HadoopPipes::runTask(HadoopPipes::TemplateFactory<DataCountMap, DataCountReduce>());
}
上面的程序挺簡單的,只要你知道了map-reduce的基本原理。
一個map類,一個reduce類,一個執行任務的main函數。
map類對每一行數據進行拆分,當找到我們感興趣的“kkk”或“nnn”時,就生成一條輸出的記錄(emit函數的作用);recude類對map的數據進行匯總,這里只是簡單地計數,所以每次+1。
文章來源:http://www.codelast.com/
(3)有了代碼,我們接着就要編寫相應的Makefile了。我的Makefile如下:
HADOOP_INSTALL = /usr/local/hadoop
INCLUDE_PATH = $(HADOOP_INSTALL)/src/c++/
CC = g++
CXXFLAGS = -Wall -g \
-I${INCLUDE_PATH}pipes/api/hadoop \
-I${INCLUDE_PATH}utils/api/hadoop
LDFLAGS = -ljvm -lhadooppipes -lhadooputils -lpthread
OBJECTS=dz_count.o
dz_count: $(OBJECTS)
$(CC) $(CXXFLAGS) -o $@ $(OBJECTS) $(LDFLAGS)
其中,HADOOP_INSTALL是你的hadoop安裝路徑,其余的 INCLUDE_PATH 等請對照你的目錄做相應更改,最后生成的可執行程序名為dz_count。這里沒有考慮release,因為僅作簡單的說明用。
文章來源:http://www.codelast.com/
(4)有了代碼和Makefile,就可以編譯了。編譯得到可執行程序dz_count。將其上傳到hdfs中:
hadoop fs -put dz_count /my_dir/
其中 “/my_dir/” 是你在hdfs中的目錄。
文章來源:http://www.codelast.com/
(5)下面就可以運行我們的hadoop程序了:
hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input /data/ -output /my_dir/output -program /my_dir/dz_count
其中,-input /data/ 表明你的輸入數據(即你的源數據)所處的hdfs目錄為 /data/,-output /my_dir/output 表明你的輸出文件目錄為 /my_dir/output,“output” 這一級目錄必須不存在(如果存在會報錯),程序運行時會生成它。-program /my_dir/dz_count 表明你要運行的程序為 /my_dir/ 目錄下的 dz_count 程序。
回車之后程序就開始執行,隨后你可以在命令行下看到它的狀態在更新,或者在hadoop JobTracker中也可以觀察到程序的運行狀態。
文章來源:http://www.codelast.com/
(6)等程序執行完后,如果任務沒有失敗的話,我們可以看到,你前面指定的hdfs輸出目錄 /my_dir/output 里生成了一個文件(假設其名為“part-00000”),我們就可以查看執行結果了:
hadoop fs -cat /my_dir/output/part-00000
輸出結果形為:
kkk 178099387
nnn 678219805
表明第二項為“kkk”的數據行共有178099387條,而“nnn”則為678219805條。
文章來源:http://www.codelast.com/
順便再說一點廢話:
(1)如何中止一個hadoop任務?當你在命令行下提交了一個hadoop job后,就算你按Ctrl+C,也不能中止掉那個job,因為它已經被Jobtracker接管了。這時,你要用如下命令中止它:
hadoop job -kill Job_ID
其中,Job_ID就是你提交的job的ID,可以在Jobtracker中查看到。
(2)一些基本概念:
map-reduce過程中,在map時,hadoop會將輸入的數據按一定的大小(例如100M,這個值是可以配置的)分為若干塊來處理,一個塊對應一個map類,也就是說,一個塊只會執行map類的構造函數一次。而每一行記錄則對應一個map()方法,也就是說,一行記錄就會執行一次map()方法。因此,如果你有什么信息需要輸出(例如std::cout)的話,就要注意了:如果在map()方法中輸出,則當輸入數據量很大時,可能就會輸出太多的信息,如果可以在map的構造函數中輸出的話,則輸出的信息會少得多。
在reduce時,對map輸出的同一個key,有一個reduce類,也就是說,無論你的同一個key有多少個value,在reduce的時候只要是同一個key,就會出現在同一個reduce類里,在這個類里的reduce方法中,你用 while (context.nextValue()) 循環可以遍歷所有的value,這樣就可以處理同一個key的N個value了。
正因為在默認情況下,相同key的記錄會落到同一個reducer中,所以,當你的key的數量比你設置的reducer的數量要少的時候,就導致了某些reducer分配不到任何數據,最終輸出的某些文件(part-r-xxxxx)是空文件。如果你設置的reducer數量要少於key的數量(這是最常見的情況),那么就會有多個key落入同一個reducer中被處理,但是,每一次reduce()方法被調用時,其中將只包含一個key,同一個reducer里的多個key就會導致reduce()方法被多次調用。
文章來源:http://www.codelast.com/
這樣,我們就完成了一個完整的C++ hadoop分布式應用程序的編寫。