Abstract
By implementing the process of saving MapReduce results into a MySQL database, we learn several techniques for persisting computation results and deepen our understanding of MapReduce.
API documentation: http://hadoop.apache.org/docs/current/api/index.html
Maven repository: https://mvnrepository.com/repos/central ##used to look up artifacts when editing pom.xml
1. Install MySQL on the master host
See: https://www.cnblogs.com/hemomo/p/11942661.html
Create a Maven project named hdfs; the steps are not repeated here.
2. Modify pom.xml
The full pom.xml after the additions (the Hadoop dependencies, the MySQL connector dependency, and the build plugins) is shown below:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.scitc</groupId>
  <artifactId>hdfs</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>hdfs</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.7.5</hadoop.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>${hadoop.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
      <scope>compile</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
3. Custom data type (WordCountTb)
Hadoop ships with many input/output types such as LongWritable, Text, IntWritable, and NullWritable. Like Java's primitive types, these basic types cannot cover every business requirement, so we can define our own input/output type instead.
Create a new class WordCountTb.java under com.scitc.hdfs.
The code is as follows:
package com.scitc.hdfs;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
public class WordCountTb implements Writable, DBWritable {

    // Fields and constructors
    String name;
    int value;

    // No-argument constructor, so that Hadoop can create instances of this type by reflection
    public WordCountTb() {
    }

    public WordCountTb(String name, int value) {
        this.name = name;
        this.value = value;
    }

    // Read the field values from a row of the database table
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.name = resultSet.getString(1);
        this.value = resultSet.getInt(2);
    }

    // Write the field values into the prepared INSERT statement
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.name);
        statement.setInt(2, this.value);
    }

    // Writable serialization used when Hadoop moves records between tasks
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        value = in.readInt();
    }
}
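The two Writable methods are how Hadoop serializes WordCountTb records, so the field order in write(DataOutput) and readFields(DataInput) must match. A minimal local check that round-trips one record through these methods could look like the following sketch; the helper class is my addition, not part of the original project, and it relies on the no-argument constructor shown above.
package com.scitc.hdfs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper, not part of the job: serializes a WordCountTb and
// reads it back to confirm that the two Writable methods agree on field order.
public class WordCountTbCheck {
    public static void main(String[] args) throws IOException {
        WordCountTb original = new WordCountTb("hello", 3);

        // Serialize the record the same way Hadoop would
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance (uses the no-argument constructor)
        WordCountTb copy = new WordCountTb();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.name + " = " + copy.value); // expected output: hello = 3
    }
}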
4. Database property class StaticConstant
The connection constants are defined in a plain class. //Reference: https://blog.csdn.net/rlnlo2pnefx9c/article/details/81277528
Create a new class StaticConstant.java under com.scitc.hdfs.
The code is as follows:
package com.scitc.hdfs;

public class StaticConstant {
    public static final String jdbcDriver = "com.mysql.jdbc.Driver";
    public static final String jdbcUrl = "jdbc:mysql://192.168.56.110:3306/test?useUnicode=true&characterEncoding=utf8";
    public static final String jdbcUser = "root";
    public static final String jdbcPassword = "bigData@123";
}
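The job below writes its results into a wordcount table (columns name and value) in the test database that jdbcUrl points to, so that table must exist before running. A one-off sketch that creates it through these constants is shown here; the helper class and the VARCHAR(255)/INT column types are my assumptions, chosen to match WordCountTb's String/int fields.
package com.scitc.hdfs;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// One-off helper (not part of the original post): creates the wordcount table
// that DBOutputFormat writes into, using the constants defined above.
public class CreateWordCountTable {
    public static void main(String[] args) throws Exception {
        Class.forName(StaticConstant.jdbcDriver);
        try (Connection conn = DriverManager.getConnection(
                 StaticConstant.jdbcUrl, StaticConstant.jdbcUser, StaticConstant.jdbcPassword);
             Statement stmt = conn.createStatement()) {
            // Column names must match those passed to DBOutputFormat.setOutput;
            // the column types are assumptions matching WordCountTb's fields
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS wordcount ("
                    + "name VARCHAR(255), "
                    + "value INT)");
        }
    }
}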
5. Write the MapReduce class WordCountToDb
Create a new class WordCountToDb.java under com.scitc.hdfs.
The code is as follows:
package com.scitc.hdfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class WordCountToDb {

    static class Maps extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each input line on spaces
            String[] dataArr = value.toString().split(" ");
            if (dataArr.length > 0) {
                // Emit each word as the map output key with a count of 1
                for (String word : dataArr) {
                    context.write(new Text(word), one);
                }
            }
        }
    }

    static class Reduces extends Reducer<Text, IntWritable, WordCountTb, WordCountTb> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts for this word and write one WordCountTb record per word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(new WordCountTb(key.toString(), sum), null);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1: Create a Configuration, configure the database connection via DBConfiguration, and create the job
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, StaticConstant.jdbcDriver,
                StaticConstant.jdbcUrl, StaticConstant.jdbcUser, StaticConstant.jdbcPassword);
        Job job = Job.getInstance(conf, "word-count");
        // 2: Set the jar by locating this class
        job.setJarByClass(WordCountToDb.class);
        // 3: Set the Mapper and Reducer classes
        job.setMapperClass(Maps.class);
        job.setReducerClass(Reduces.class);
        // 4: Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5: Set the final reduce output key/value types
        job.setOutputKeyClass(WordCountTb.class);
        job.setOutputValueClass(WordCountTb.class);
        // 6: Set the input path
        String inputPath = "hdfs://master:9000/he/input/wordcount.txt";
        // If a file path is passed on the command line, use it as the input path instead
        if (args != null && args.length > 0) {
            inputPath = args[0];
        }
        FileInputFormat.addInputPath(job, new Path(inputPath));
        // 7: Set the database output format, the target table, and its columns
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, "wordcount", "name", "value");
        // Local runs work without the next line, but cluster runs fail with
        // Error: java.io.IOException: com.mysql.jdbc.Driver
        job.addArchiveToClassPath(new Path("hdfs://master:9000/lib/mysql/mysql-connector-java-5.1.27.jar"));
        // 8: Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
6. Run the program locally
Running locally makes debugging much easier and saves the round trips of packaging the job and running it on the cluster just to track down errors.
Right-click in the editor for the WordCountToDb class and choose Run As -> Java Application to run it.
The Eclipse console output is as follows:
Open the wordcount table in the database to check the result:
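Besides opening the table in a MySQL client, a small JDBC sketch can print the stored counts; this helper is again my addition, reusing the constants from StaticConstant. The same connection can also run DELETE FROM wordcount to clear the table before the cluster run in the next step.
package com.scitc.hdfs;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical helper, not part of the original post: prints the rows that
// the job inserted into the wordcount table.
public class ShowWordCountResult {
    public static void main(String[] args) throws Exception {
        Class.forName(StaticConstant.jdbcDriver);
        try (Connection conn = DriverManager.getConnection(
                 StaticConstant.jdbcUrl, StaticConstant.jdbcUser, StaticConstant.jdbcPassword);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT name, value FROM wordcount")) {
            while (rs.next()) {
                System.out.println(rs.getString("name") + "\t" + rs.getInt("value"));
            }
        }
    }
}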
7. Package, upload, and run on the cluster
Before running, delete the rows that the local run inserted into the MySQL wordcount table.
1. Package
Right-click the project hdfs >> Run As >> Maven clean
Right-click the project hdfs >> Run As >> Maven install
2. Upload
In the project's target folder, find hdfs-0.0.1-SNAPSHOT.jar, rename it to hdfs1.jar, and upload it to /opt/data/ on master.
3. Run hdfs1.jar with the hadoop jar command
cd /opt/data
hadoop jar hdfs1.jar com.scitc.hdfs.WordCountToDb
##Syntax: hadoop jar <jar file> <fully qualified main class>
Check the result:
Running on the cluster fails with: Error: java.io.IOException: com.mysql.jdbc.Driver
Solution 1:
The pom.xml configures the maven-assembly-plugin,
so mvn install produces two jar files.
One of them, hdfs-0.0.1-SNAPSHOT-jar-with-dependencies.jar, bundles all dependencies,
so running that jar on the cluster also works. ##verified
But that jar is over 40 MB, which is too large.
Solution 2 (recommended):
Upload the MySQL driver jar to the cluster with the following commands:
hadoop fs -mkdir -p /lib/mysql ##create the directory
hadoop fs -put mysql-connector-java-5.1.27.jar /lib/mysql ##upload the driver to /lib/mysql on HDFS
In WordCountToDb.java, add the following line before the job-submission code:
job.addArchiveToClassPath(new Path("hdfs://master:9000/lib/mysql/mysql-connector-java-5.1.27.jar"));
// 8: Submit the job and wait for completion
boolean result = job.waitForCompletion(true);
Check the result:
Check the execution result on the cluster: everything is fine; the output is 0 bytes because the results are written to MySQL.
Check the MySQL database:
============================
Problems encountered:
Problem 1: running the jar on the cluster reports: Error: java.io.IOException: com.mysql.jdbc.Driver
Reference for the fix: https://www.cnblogs.com/codeOfLife/p/5464613.html
Revision notes:
2020-02-22: added the WordCountToDb code