Building a Hadoop Cluster and Applying MapReduce


I. Hadoop Cluster Setup and Configuration

1. Node Preparation

Cluster plan:

Hostname     IP             Installed software        Running processes
weekend01    192.168.1.60   jdk, hadoop               NameNode, DFSZKFailoverController
weekend02    192.168.1.61   jdk, hadoop               NameNode, DFSZKFailoverController
weekend03    192.168.1.62   jdk, hadoop               ResourceManager
weekend04    192.168.1.63   jdk, hadoop               ResourceManager
weekend05    192.168.1.64   jdk, hadoop, zookeeper    DataNode, NodeManager, JournalNode, QuorumPeerMain
weekend06    192.168.1.65   jdk, hadoop, zookeeper    DataNode, NodeManager, JournalNode, QuorumPeerMain
weekend07    192.168.1.66   jdk, hadoop, zookeeper    DataNode, NodeManager, JournalNode, QuorumPeerMain

2. Installing the Java Environment

This part describes installing Java and configuring the environment.

Upload the JDK package and the Hadoop-related packages and extract them, as follows:

[hadoop@weekend01 src]$ pwd

/usr/local/src

[hadoop@weekend01 src]$ ls

hadoop-2.6.4 jdk1.8.0_92 zookeeper-3.4.6

hbase-1.2.1 mongodb-linux-x86_64-rhel62-3.2.6 zookeeper-3.4.6.tar.gz
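For reference, a minimal sketch of the extraction step (the archive file names are assumptions and should be adjusted to the packages actually uploaded):

cd /usr/local/src
tar -zxf jdk-8u92-linux-x64.tar.gz   # yields jdk1.8.0_92
tar -zxf hadoop-2.6.4.tar.gz         # yields hadoop-2.6.4
tar -zxf zookeeper-3.4.6.tar.gz      # yields zookeeper-3.4.6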

Grant the hadoop user permission to edit the /etc/profile file (this step is not strictly necessary).

Add the following to /etc/profile:

export JAVA_HOME=/usr/local/src/jdk1.8.0_92

export HADOOP_HOME=/usr/local/src/hadoop-2.6.4

export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH

export HBASE_HOME=/usr/local/src/hbase-1.2.1

export ZK_HOME=/usr/local/src/zookeeper-3.4.6

export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin:$PATH:$ZK_HOME/bin

Save and exit, then run [hadoop@weekend01 ~]$ source /etc/profile (if the changes do not take effect, a reboot will do). Then verify the Java environment and the hadoop command:

[hadoop@weekend01 ~]$ java -version

java version "1.8.0_92"

Java(TM) SE Runtime Environment (build 1.8.0_92-b14)

Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

[hadoop@weekend01 ~]$ hadoop version

Hadoop 2.6.4

Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 5082c73637530b0b7e115f9625ed7fac69f937e6

Compiled by jenkins on 2016-02-12T09:45Z

Compiled with protoc 2.5.0

From source with checksum 8dee2286ecdbbbc930a6c87b65cbc010

This command was run using /usr/local/src/hadoop-2.6.4/share/hadoop/common/hadoop-common-2.6.4.jar

At this point the JDK and Hadoop environments are configured.

3. SSH Configuration

The Hadoop control scripts rely on SSH to perform cluster-wide operations, so passwordless SSH is needed for them to work seamlessly. This part describes the SSH setup.

[hadoop@weekend01 ~]$ ssh-keygen

Just press Enter at each prompt.

Then copy the public key to the node itself and to each of the other six nodes (a loop covering all seven hosts is sketched after the example below), for example:

[hadoop@weekend01 ~]$ ssh-copy-id weekend01
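Repeating this for every host can be scripted. A minimal sketch covering all seven nodes (it assumes the hadoop user exists on each node and uses the hostnames from the cluster plan; each iteration prompts for that node's password once):

for h in weekend01 weekend02 weekend03 weekend04 weekend05 weekend06 weekend07; do
  ssh-copy-id "$h"
done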

Test:

[hadoop@weekend01 ~]$ ssh weekend01

Last login: Tue May 31 16:30:47 2016 from slave2

[hadoop@weekend01 ~]$ exit

logout

Connection to weekend01 closed.

[hadoop@weekend01 ~]$ ssh weekend02

Last login: Tue Nov 8 11:33:42 2016 from master

[hadoop@weekend02 ~]$ exit

logout

Connection to weekend02 closed.

4. Hadoop Configuration

This part describes the changes made to each Hadoop configuration file.

[hadoop@weekend01 hadoop]$ cat hdfs-site.xml
<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/weekend01/namenode/dir</value>
    </property>
    <property>
        <name>dfs.blocksize</name>
        <value>268435456</value>
    </property>
    <property>
        <name>dfs.namenode.handler.count</name>
        <value>100</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/weekend01/datanode/dir</value>
    </property>
</configuration>
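The local directories named in dfs.namenode.name.dir and dfs.datanode.data.dir must exist and be writable by the hadoop user before the NameNode is formatted. A minimal sketch, using the paths from the file above (run with sudo rights on each node that needs them):

sudo mkdir -p /weekend01/namenode/dir /weekend01/datanode/dir
sudo chown -R hadoop:hadoop /weekend01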

[hadoop@weekend01 hadoop]$ cat -n hadoop-env.sh

26 export JAVA_HOME=/usr/local/src/jdk1.8.0_92

[hadoop@weekend01 hadoop]$ cat core-site.xml
<configuration>
    <!-- default file system: the NameNode on weekend01 (an HA nameservice would go here instead) -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://192.168.1.60:9000</value>
    </property>
    <property>
        <name>io.file.buffer.size</name>
        <value>131072</value>
    </property>
    <!-- hadoop temporary directory (currently commented out) -->
    <!--
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/hadoop/app/hadoop-2.4.1/tmp</value>
    </property>
    -->
    <!-- ZooKeeper quorum addresses (currently commented out) -->
    <!--
    <property>
        <name>ha.zookeeper.quorum</name>
        <value>weekend05:2181,weekend06:2181,weekend07:2181</value>
    </property>
    -->
</configuration>

[hadoop@weekend01 hadoop]$ cat mapred-site.xml
<configuration>
    <!-- run MapReduce on the YARN framework -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

[hadoop@weekend01 hadoop]$ cat slaves

weekend05

weekend06

weekend07
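A quick, optional check that every slave listed above is reachable over the passwordless SSH configured earlier (not part of the original session):

for h in weekend05 weekend06 weekend07; do
  ssh "$h" hostname
done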

[hadoop@weekend01 hadoop]$ cat yarn-site.xml
<configuration>
    <!-- Site specific YARN configuration properties -->
    <property>
        <name>yarn.acl.enable</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.admin.acl</name>
        <value>*</value>
    </property>
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>192.168.1.60:8032</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>192.168.1.60:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>192.168.1.60:8031</value>
    </property>
    <property>
        <name>yarn.resourcemanager.admin.address</name>
        <value>192.168.1.60:8033</value>
    </property>
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>192.168.1.60:8088</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>weekend01</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>
</configuration>
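After editing these files on weekend01, the same configuration must be present on every other node. One way to distribute it over the passwordless SSH set up earlier (a minimal sketch; the path is the Hadoop 2.6.4 configuration directory used in this install):

for h in weekend02 weekend03 weekend04 weekend05 weekend06 weekend07; do
  scp /usr/local/src/hadoop-2.6.4/etc/hadoop/*.xml \
      /usr/local/src/hadoop-2.6.4/etc/hadoop/hadoop-env.sh \
      /usr/local/src/hadoop-2.6.4/etc/hadoop/slaves \
      "$h":/usr/local/src/hadoop-2.6.4/etc/hadoop/
done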

5. Testing Hadoop

This part tests the Hadoop cluster.

Format the HDFS file system on weekend01:

[hadoop@weekend01 ~]$ hdfs namenode -format

…..

16/11/08 11:00:37 INFO common.Storage: Storage directory /tmp/hadoop-hadoop/dfs/name has been successfully formatted   # this line indicates that the format succeeded

…….

Then start the ZooKeeper ensemble and the JournalNode process on weekend05, weekend06, and weekend07:

[hadoop@weekend05 ~]$ zkServer.sh start

JMX enabled by default

Using config: /usr/local/src/zookeeper-3.4.6/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[hadoop@weekend05 ~]$ hadoop-daemon.sh start journalnode

starting journalnode, logging to /usr/local/src/hadoop-2.6.4/logs/hadoop-hadoop-journalnode-weekend05.out

[hadoop@weekend05 ~]$ jps   # these processes indicate a successful start

4833 JournalNode

4881 Jps

4779 QuorumPeerMain
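The same two commands also need to be run on weekend06 and weekend07. With passwordless SSH in place, all three nodes can be handled from one place in a loop (a minimal sketch; it assumes the PATH entries from /etc/profile are visible to non-interactive shells, otherwise use the full paths under $ZK_HOME/bin and $HADOOP_HOME/sbin):

for h in weekend05 weekend06 weekend07; do
  ssh "$h" "zkServer.sh start && hadoop-daemon.sh start journalnode"
done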

Then format the ZKFC state on weekend01:

[hadoop@weekend01 ~]$ hdfs zkfc -formatZK

[hadoop@weekend01 ~]$ jps

4420 SecondaryNameNode

5148 Jps

4141 NameNode

[hadoop@weekend03 ~]$ start-yarn.sh

[hadoop@weekend03 ~]$ jps

4837 ResourceManager

4902 Jps

[hadoop@weekend01 ~]$ start-dfs.sh

[hadoop@weekend05 ~]$ jps

4833 JournalNode

4948 DataNode

5530 Jps

4779 QuorumPeerMain

5403 NodeManager

[hadoop@weekend06 ~]$ jps

2464 QuorumPeerMain

5448 NodeManager

2520 JournalNode

4953 DataNode

5581 Jps

[hadoop@weekend07 ~]$ jps

4864 QuorumPeerMain

5713 Jps

5581 NodeManager

5086 DataNode

4974 JournalNode
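As an extra check (not in the original notes), the overall HDFS state can be queried from the command line, and the NameNode web UI is served on its default Hadoop 2.x port:

hdfs dfsadmin -report        # lists live DataNodes and capacity
# NameNode web UI: http://192.168.1.60:50070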


II. MapReduce Application

1. Application Description

Hadoop is used to compute statistics over the data and to remove duplicates. Because the setup is aimed at high availability, it avoids a single point of failure in the cluster and can effectively prevent a cluster crash caused by a NameNode single-point failure.

2. Data Preparation


Open a terminal and enter the following command:

bin/hadoop fs -mkdir hdfsInput

Running this command may produce a safe-mode related error; if it does, use

bin/hadoop dfsadmin -safemode leave

to leave safe mode.

The mkdir command creates an input directory on HDFS; the files used later must be uploaded into this directory before the job can run.
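If no test files exist on the local file system yet, a couple of small input files can be created first (the file names and contents below are only illustrative):

mkdir -p file
echo "Hello World Bye World" > file/myTest1.txt
echo "Hello Hadoop Goodbye Hadoop" > file/myTest2.txt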

Enter the following commands in the terminal, in order:

cd $HADOOP_HOME

bin/hadoop fs -put file/myTest*.txt hdfsInput


3. Design

1) The input is split into splits. Because the test files are small, each file becomes a single split. Each split is then divided by line into <key,value> pairs, where the key is the byte offset of the line (including the line terminator) and the value is the line of text. This step is performed automatically by the MapReduce framework, as shown in the figure below:

[Figure: the input file split into <key,value> pairs]

2) The <key,value> pairs are handed to the user-defined map method, which produces new <key,value> pairs, as shown below:

[Figure: <key,value> pairs produced by the map method]

3) Once the map output is available, the Mapper sorts the pairs by key and runs the Combine step, adding up the values that share the same key; this yields the Mapper's final output, as shown below:

[Figure: sorted and combined Mapper output]

4) The Reducer first sorts the data received from the Mappers and then passes it to the user-defined reduce method, which produces new <key,value> pairs that form WordCount's final output, as shown below:

[Figure: final output of the reduce method]
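Conceptually this is the same flow as the classic shell word-count pipeline. As a quick local sanity check on one of the sample files (the file name is only illustrative), the following produces the same counts: tr plays the role of map, sort groups equal keys, and uniq -c does the summing that reduce performs.

tr -s ' ' '\n' < file/myTest1.txt | sort | uniq -c | sort -nr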

4. Program Code

Two versions of WordCount follow: one written against the old org.apache.hadoop.mapred API and one against the newer org.apache.hadoop.mapreduce API.

package com.felix;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Description: WordCount explained by Felix
 * @author Hadoop Dev Group
 */
public class WordCount {

    /**
     * MapReduceBase: base class that implements the Mapper and Reducer interfaces
     * (its methods are empty stubs).
     * WritableComparable: classes implementing it can be compared with one another;
     * every class used as a key should implement it.
     * Reporter can be used to report the progress of the whole application; it is
     * not used in this example.
     */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        /**
         * LongWritable, IntWritable and Text are Hadoop classes that wrap Java data
         * types. They implement WritableComparable, so they can be serialized for
         * data exchange in a distributed environment; think of them as replacements
         * for long, int and String.
         */
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        /**
         * map method of the Mapper interface:
         * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
         * Maps a single input k/v pair to intermediate k/v pairs. The output pairs do
         * not need to have the same types as the input pair, and an input pair may map
         * to zero or more output pairs.
         * OutputCollector collects the <k,v> pairs emitted by Mapper and Reducer;
         * its collect(k, v) method adds a (k, v) pair to the output.
         */
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        /**
         * JobConf: the map/reduce job configuration class; it describes the map-reduce
         * work to the Hadoop framework. Constructors include JobConf(),
         * JobConf(Class exampleClass), JobConf(Configuration conf), etc.
         */
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");                    // set a user-defined job name
        conf.setOutputKeyClass(Text.class);              // key class for the job output
        conf.setOutputValueClass(IntWritable.class);     // value class for the job output
        conf.setMapperClass(Map.class);                  // Mapper class for the job
        conf.setCombinerClass(Reduce.class);             // Combiner class for the job
        conf.setReducerClass(Reduce.class);              // Reducer class for the job
        conf.setInputFormat(TextInputFormat.class);      // InputFormat implementation
        conf.setOutputFormat(TextOutputFormat.class);    // OutputFormat implementation

        /**
         * InputFormat describes the input of a map-reduce job.
         * FileInputFormat.setInputPaths() sets an array of paths as the job input.
         * FileOutputFormat.setOutputPath() sets the job output directory.
         */
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);                          // run the job
    }
}

The same job can also be written against the newer org.apache.hadoop.mapreduce API:

package com.hadoop.sample;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    // Extends Mapper with input type <Object, Text> and output type <Text, IntWritable>.
    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        // one represents a single occurrence of a word
        private static IntWritable one = new IntWritable(1);
        // word holds the token cut from the line
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // tokenize the input line
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());    // store the token in word
                context.write(word, one);
            }
        }
    }

    // Extends Reducer with input type <Text, IntWritable> and output type <Text, IntWritable>.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // result records the frequency of each word
        private static IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // sum the values of the <key, value-list> pair
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);              // store the count in result
            context.write(key, result);   // collect the result
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // check the command-line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WordCount <in> <out>");
            System.exit(2);
        }
        // configure the job name
        Job job = new Job(conf, "word count");
        // configure the job classes
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
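To submit either of the sources above instead of the bundled example jar, the code has to be compiled and packaged first. A minimal sketch, assuming Hadoop 2.x, that the file WordCount.java holds the com.hadoop.sample version, and an arbitrary jar name:

mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes WordCount.java
jar cf wordcount.jar -C classes .
hadoop jar wordcount.jar com.hadoop.sample.WordCount hdfsInput hdfsOutput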

5. Running the Job and Viewing the Results

Enter the following command in the terminal:

bin/hadoop jar hadoop-examples-1.2.1.jar wordcount hdfsInput hdfsOutput

Note that the example jar shown here comes from a 1.2.1 install, so the file name may differ on your machine; use a * wildcard in place of the version number (on Hadoop 2.x the equivalent jar is share/hadoop/mapreduce/hadoop-mapreduce-examples-<version>.jar):

bin/hadoop jar hadoop-examples-*.jar wordcount hdfsInput hdfsOutput

You should see something like the following:

[Screenshot: console output of the WordCount job]

The hadoop command starts a JVM to run the MapReduce program, picks up the Hadoop configuration automatically, and adds the class path (and its dependencies) to the Hadoop libraries. The output above is the job's run log. From it you can see that the job was assigned the ID job_201202292213_0002, that there were two input files (Total input paths to process : 2), and you can also read the map input/output records (record and byte counts) as well as the reduce input/output records.

Check the contents of the hdfsOutput directory on HDFS.

Enter the following command in the terminal:

bin/hadoop fs -ls hdfsOutput

[Screenshot: listing of the hdfsOutput directory]

The listing shows that three files were generated; the result is in "part-r-00000".

Use the following command to view the contents of the output file:

bin/hadoop fs -cat hdfsOutput/part-r-00000

[Screenshot: contents of part-r-00000]

Note: some of the figures in this article are taken from other blog posts; this article is intended only as lab notes. If this infringes your rights, please email 1255560195@qq.com. Reproduction without permission is prohibited.

