使用DBOutputFormat把MapReduce產生的結果集導入到mysql中


數據在HDFS和關系型數據庫之間的遷移,主要有以下兩種方式

  1、按照數據庫要求的文件格式生成文件,然后由數據庫提供的導入工具進行導入

  2、采用JDBC的方式進行導入

MapReduce默認提供了DBInputFormat和DBOutputFormat,分別用於數據庫的讀取和數據庫的寫入

 

1、需求

  下面使用DBOutputFormat,將MapReduce處理后的學生信息導入到mysql中

2、數據集

  張明明    45
  李成友    78
  張輝燦    56
  王露      56
  陳東明    67
  陳果      31
  李華明    32
  張明東    12
  李明國    34
  陳道亮    35
  陳家勇    78     
  陳旻昊    13
  陳潘      78
  陳學澄    18

3、實現

package com.buaa; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @ProjectName DBOutputormatDemo * @PackageName com.buaa * @ClassName MysqlDBOutputormatDemo * @Description TODO * @Author 劉吉超 * @Date 2016-05-06 09:15:57 */ @SuppressWarnings({ "unused", "deprecation" }) public class MysqlDBOutputormatDemo extends Configured implements Tool { /** * 實現DBWritable * * TblsWritable需要向mysql中寫入數據 */  
    public static class TblsWritable implements Writable, DBWritable { String tbl_name; int tbl_age; public TblsWritable() { } public TblsWritable(String name, int age) { this.tbl_name = name; this.tbl_age = age; } @Override public void write(PreparedStatement statement) throws SQLException { statement.setString(1, this.tbl_name); statement.setInt(2, this.tbl_age); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.tbl_name = resultSet.getString(1); this.tbl_age = resultSet.getInt(2); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.tbl_name); out.writeInt(this.tbl_age); } @Override public void readFields(DataInput in) throws IOException { this.tbl_name = in.readUTF(); this.tbl_age = in.readInt(); } public String toString() { return new String(this.tbl_name + " " + this.tbl_age); } } public static class StudentMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { context.write(key, value); } } public static class StudentReducer extends Reducer<LongWritable, Text, TblsWritable, TblsWritable> { @Override protected void reduce(LongWritable key, Iterable<Text> values,Context context) throws IOException, InterruptedException { // values只有一個值,因為key沒有相同的
            StringBuilder value = new StringBuilder(); for(Text text : values){ value.append(text); } String[] studentArr = value.toString().split("\t"); if(StringUtils.isNotBlank(studentArr[0])){ /* * 姓名 年齡(中間以tab分割) * 張明明 45 */ String name = studentArr[0].trim(); int age = 0; try{ age = Integer.parseInt(studentArr[1].trim()); }catch(NumberFormatException e){ } context.write(new TblsWritable(name, age), null); } } } public static void main(String[] args) throws Exception { // 數據輸入路徑和輸出路徑
        String[] args0 = { "hdfs://ljc:9000/buaa/student/student.txt" }; int ec = ToolRunner.run(new Configuration(), new MysqlDBOutputormatDemo(), args0); System.exit(ec); } @Override public int run(String[] arg0) throws Exception { // 讀取配置文件
        Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://172.26.168.2:3306/test", "hadoop", "123"); // 新建一個任務
        Job job = new Job(conf, "DBOutputormatDemo"); // 設置主類
        job.setJarByClass(MysqlDBOutputormatDemo.class); // 輸入路徑
        FileInputFormat.addInputPath(job, new Path(arg0[0])); // Mapper
        job.setMapperClass(StudentMapper.class); // Reducer
        job.setReducerClass(StudentReducer.class); // mapper輸出格式
        job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // 輸入格式,默認就是TextInputFormat // job.setInputFormatClass(TextInputFormat.class); // 輸出格式
        job.setOutputFormatClass(DBOutputFormat.class); // 輸出到哪些表、字段
        DBOutputFormat.setOutput(job, "student", "name", "age"); // 添加mysql數據庫jar // job.addArchiveToClassPath(new Path("hdfs://ljc:9000/lib/mysql/mysql-connector-java-5.1.31.jar")); // DistributedCache.addFileToClassPath(new Path("hdfs://ljc:9000/lib/mysql/mysql-connector-java-5.1.31.jar"), conf); //提交任務
        return job.waitForCompletion(true)?0:1; } }

  mr程序很簡單,只是讀取文件內容,在這里我們主要關注的是怎么將mr處理后的結果集導入mysql中的

  數據庫中表是student,為student表編寫對應的bean類TblsWritable,該類需要實現Writable接口和DBWritable接口。

  1、Writable接口

@Override  
public void write(DataOutput out) throws IOException { out.writeUTF(this.tbl_name); out.writeInt(this.tbl_age); } @Override public void readFields(DataInput in) throws IOException { this.tbl_name = in.readUTF(); this.tbl_age = in.readInt(); }

 上面兩個方法對應着Writable接口,用對象序列化,這里不再多說,前面文章有介紹

  2、DBWritable接口

@Override  
public void write(PreparedStatement statement) throws SQLException { statement.setString(1, this.tbl_name); statement.setInt(2, this.tbl_age); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.tbl_name = resultSet.getString(1); this.tbl_age = resultSet.getInt(2); }

  上面兩個方法對應着DBWriteable接口。readFields方法負責從結果集中讀取數據庫數據(注意ResultSet的下標是從1開始的),一次讀取查詢SQL中篩選的某一列。Write方法負責將數據寫入到數據庫,將每一行的每一列依次寫入。

  最后進行Job的一些配置,具體如下面代碼所示

 1 DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://172.26.168.2:3306/test", "hadoop", "123") 

  上面的配置主要包括以下幾項:

    1、數據庫驅動的名稱:com.mysql.jdbc.Driver

    2、數據庫URL:jdbc:mysql://172.26.168.2:3306/test

    3、用戶名:hadoop

    4、密碼:123

  還有以下幾項需要配置

    1、數據庫表以及每列的名稱:DBOutputFormat.setOutput(job, "student", "name", "age");

    2、輸出格式改為:job.setOutputFormatClass(DBOutputFormat.class);

  需要提醒的是DBOutputFormat以MapReduce的方式運行,會並行的連接數據庫。在這里需要合適的設置map、reduce的個數,以便將並行連接的數量控制在合理的范圍之內

4、運行效果

  image

5、注意事項

 運行項目可能會報如下錯誤

   image

 解決方法:

   共有3種解決方法,但我喜歡第三種

   1、在每個節點下的${HADOOP_HOME}/lib下添加該包。重啟集群,一般是比較原始的方法。

   2、把jar包傳到集群上,命令如下

    1 hadoop fs -put mysql-connector-java-5.1.31.jar /lib/mysql 

   在mr程序提交job前,添加如下兩個語句中一個就行

    (1)DistributedCache.addFileToClassPath(new Path(“hdfs://ljc:9000/lib/mysql/mysql-connector-java-5.1.31.jar”), conf);

    這條語句不推薦使用了,建議使用下面這條語句

    (2)job.addArchiveToClassPath(new Path("hdfs://ljc:9000/lib/mysql/mysql-connector-java-5.1.31.jar"));

   注意:用這種方式,在本地運行,依然報“java.io.IOException: com.mysql.jdbc.Driver”,但放到hadoop運行環境就可以啦

   3、把依賴的jar打到項目中,然后配置MANIFEST.MF文件中Class-Path選項

   image

   具體配置,請參考“通過指定manifest.mf文件的打包

如果,您認為閱讀這篇博客讓您有些收獲,不妨點擊一下右下角的【推薦】。
如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
如果,您對我的博客所講述的內容有興趣,請繼續關注我的后續博客,我是【劉超★ljc】。

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。

實現代碼及數據:下載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM