MapReduce Programming in Practice (2): Storing Word-Count Results in a MySQL Database


Abstract

By working through the process of saving MapReduce results into a MySQL database, you learn an additional way to persist computation results and deepen your understanding of MapReduce.

API documentation: http://hadoop.apache.org/docs/current/api/index.html

Maven central repository: https://mvnrepository.com/repos/central     ##useful for looking up artifacts when editing pom.xml

 

1. Install MySQL on the master host

See: https://www.cnblogs.com/hemomo/p/11942661.html

Create a Maven project named hdfs; those steps are not repeated here.

2. Modify the pom.xml file

The newly added parts are the Hadoop dependencies, the MySQL connector, the jdk.tools dependency, and the <build> section:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.scitc</groupId>
  <artifactId>hdfs</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>hdfs</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.7.5</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>${hadoop.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
     
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency> 
    
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
      <scope>compile</scope>
      <optional>true</optional>
    </dependency>
    
    <dependency>  
      <groupId>jdk.tools</groupId>  
      <artifactId>jdk.tools</artifactId>  
      <version>1.8</version>  
      <scope>system</scope>  
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>  
   </dependency>
   
  </dependencies>
  
  <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    
    <plugin>
             <artifactId>maven-assembly-plugin</artifactId>
             <configuration>
                 <descriptorRefs>
                     <descriptorRef>jar-with-dependencies</descriptorRef>
                 </descriptorRefs>
                 <archive>
                     <manifest>
                         <mainClass></mainClass>
                     </manifest>
                 </archive>
             </configuration>
             <executions>
                 <execution>
                     <id>make-assembly</id>
                     <phase>package</phase>
                     <goals>
                         <goal>single</goal>
                     </goals>
                 </execution>
             </executions>
         </plugin>
    
    </plugins>
</build>
  
</project>

3. Custom data type (WordCountTb)

Hadoop ships with many built-in input/output types, such as LongWritable, Text, IntWritable and NullWritable. Like Java's primitive types, these basic types cannot cover every business requirement, so when they fall short we can define our own input/output types.

Create a new class WordCountTb.java under the package com.scitc.hdfs. The code is as follows:

package com.scitc.hdfs;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class WordCountTb implements Writable, DBWritable {

    // Fields and constructor
    String name;
    int value;

    public WordCountTb(String name, int value) {
        this.name = name;
        this.value = value;
    }

    // Read the field values from a database ResultSet
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.name = resultSet.getString(1);
        this.value = resultSet.getInt(2);
    }

    // Write the field values into the PreparedStatement used by DBOutputFormat
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.name);
        statement.setInt(2, this.value);
    }

    // Hadoop serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(value);
    }

    // Hadoop deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        value = in.readInt();
    }
}

4. Database properties class StaticConstant

The connection constants are defined in a plain class. //Reference: https://blog.csdn.net/rlnlo2pnefx9c/article/details/81277528

Create a new class StaticConstant.java under com.scitc.hdfs. The code is as follows:

package com.scitc.hdfs;

public class StaticConstant {
    public static final String jdbcDriver = "com.mysql.jdbc.Driver";
    public static final String jdbcUrl = "jdbc:mysql://192.168.56.110:3306/test?useUnicode=true&characterEncoding=utf8";
    public static final String jdbcUser = "root";
    public static final String jdbcPassword = "bigData@123";
}
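
Note that DBOutputFormat only inserts into an existing table: the job below expects a table named wordcount with columns name and value in the test database from the JDBC URL above. The original post does not show how that table was created; the following is a minimal sketch in which the class name CreateWordCountTable and the VARCHAR/INT schema are assumptions, reusing the StaticConstant settings:

package com.scitc.hdfs;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical helper (not part of the original project): creates the wordcount table
// that DBOutputFormat.setOutput(job, "wordcount", "name", "value") writes into.
public class CreateWordCountTable {
    public static void main(String[] args) throws Exception {
        Class.forName(StaticConstant.jdbcDriver);
        try (Connection conn = DriverManager.getConnection(StaticConstant.jdbcUrl,
                StaticConstant.jdbcUser, StaticConstant.jdbcPassword);
             Statement stmt = conn.createStatement()) {
            // Assumed schema: one row per word plus its count
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS wordcount (name VARCHAR(255), value INT)");
        }
    }
}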

5. Write the MapReduce class WordCountToDb

Create a new class WordCountToDb.java under com.scitc.hdfs. The code is as follows:

package com.scitc.hdfs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class WordCountToDb {

    static class Maps extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each input line on spaces
            String[] dataArr = value.toString().split(" ");
            if (dataArr.length > 0) {
                // Emit each word as the key with a count of 1
                for (String word : dataArr) {
                    context.write(new Text(word), one);
                }
            }
        }
    }

    static class Reduces extends Reducer<Text, IntWritable, WordCountTb, WordCountTb> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(new WordCountTb(key.toString(), sum), null);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1: Create the Configuration, configure the database with DBConfiguration, and create the job
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, StaticConstant.jdbcDriver,
                StaticConstant.jdbcUrl, StaticConstant.jdbcUser, StaticConstant.jdbcPassword);
        Job job = Job.getInstance(conf, "word-count");

        // 2: Set the jar by class
        job.setJarByClass(WordCountToDb.class);

        // 3: Set the Mapper and Reducer classes
        job.setMapperClass(Maps.class);
        job.setReducerClass(Reduces.class);

        // 4: Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5: Set the final reduce output key/value types
        job.setOutputKeyClass(WordCountTb.class);
        job.setOutputValueClass(WordCountTb.class);

        // 6: Set the input path
        String inputPath = "hdfs://master:9000/he/input/wordcount.txt";
        // If a file path is passed on the command line, use it as the input path
        if (args != null && args.length > 0) {
            inputPath = args[0];
        }
        FileInputFormat.addInputPath(job, new Path(inputPath));

        // 7: Set the database output format, the target table and its columns
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, "wordcount", "name", "value");

        // Submitting locally works without this, but submitting to the cluster fails with
        // "Error: java.io.IOException: com.mysql.jdbc.Driver" unless the driver jar is on the classpath
        job.addArchiveToClassPath(new Path("hdfs://master:9000/lib/mysql/mysql-connector-java-5.1.27.jar"));

        // 8: Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

6. Run the program locally

Running locally makes debugging much easier: you avoid repeatedly packaging the jar and running it on the cluster just to track down errors.

Right-click in the WordCountToDb editor window and choose Run As -> Java Application to run the class.

The Eclipse console output is shown below (screenshot omitted).

Open the wordcount table in the database to view the results:
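
If you prefer to check from code rather than a MySQL client, here is a minimal sketch, again a hypothetical helper (the class name PrintWordCountTable is an assumption) that reuses the StaticConstant settings:

package com.scitc.hdfs;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical helper (not part of the original project): prints the rows of the wordcount table.
public class PrintWordCountTable {
    public static void main(String[] args) throws Exception {
        Class.forName(StaticConstant.jdbcDriver);
        try (Connection conn = DriverManager.getConnection(StaticConstant.jdbcUrl,
                StaticConstant.jdbcUser, StaticConstant.jdbcPassword);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT name, value FROM wordcount ORDER BY value DESC")) {
            while (rs.next()) {
                System.out.println(rs.getString("name") + "\t" + rs.getInt("value"));
            }
        }
    }
}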

7. Package, upload, and run on the cluster

Before running on the cluster, remember to delete the rows that the earlier local run inserted into the wordcount table in MySQL.

1. Package

Right-click the project hdfs >> Run As >> Maven clean

Right-click the project hdfs >> Run As >> Maven install

2. Upload

In the project's target folder, find hdfs-0.0.1-SNAPSHOT.jar, rename it to hdfs1.jar, and upload it to the /opt/data/ directory on master.

3. Run hdfs1.jar with the hadoop jar command

cd /opt/data

hadoop jar hdfs1.jar com.scitc.hdfs.WordCountToDb

##Command syntax: hadoop jar <jar file> <fully qualified class name>
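
Since main() only falls back to the hard-coded hdfs://master:9000/he/input/wordcount.txt when no argument is supplied, you can also pass the input path explicitly on the command line (shown here with that same default path; substitute your own file):

hadoop jar hdfs1.jar com.scitc.hdfs.WordCountToDb hdfs://master:9000/he/input/wordcount.txt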

 

Check the results:

When running on the cluster, a problem appears: Error: java.io.IOException: com.mysql.jdbc.Driver

 

Solution 1:

The pom.xml configures the maven-assembly-plugin.

After mvn install there are two jar files.

One of them, hdfs-0.0.1-SNAPSHOT-jar-with-dependencies.jar, bundles all dependencies.

Running that jar on the cluster therefore works correctly.  ##tested
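
For reference, it is run the same way as before, only with the jar-with-dependencies artifact name:

hadoop jar hdfs-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.scitc.hdfs.WordCountToDb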

However, that jar is over 40 MB, which is rather large.

 

Solution 2 (recommended):

Upload the driver jar to the cluster with the following commands:

hadoop fs -mkdir -p /lib/mysql     ##create the directory

hadoop fs -put mysql-connector-java-5.1.27.jar /lib/mysql        ##upload the driver into /lib/mysql on HDFS

Then, in WordCountToDb.java, add the following line before the job is submitted:

job.addArchiveToClassPath(new Path("hdfs://master:9000/lib/mysql/mysql-connector-java-5.1.27.jar"));

//8: Submit the job
boolean result = job.waitForCompletion(true);

Check the results:

The cluster run completes without problems. The job's HDFS output is 0 bytes, which is expected because the results are written to MySQL instead.

Check the MySQL database (screenshot omitted):

 

 

============================

Troubleshooting:

Problem 1: Running the jar on the cluster fails with: Error: java.io.IOException: com.mysql.jdbc.Driver

Reference for the fix: https://www.cnblogs.com/codeOfLife/p/5464613.html

 

Revision notes:

2020-02-22: added the WordCountToDb code.

