數據在HDFS和關系型數據庫之間的遷移,主要有以下兩種方式
1、按照數據庫要求的文件格式生成文件,然后由數據庫提供的導入工具進行導入
2、采用JDBC的方式進行導入
MapReduce默認提供了DBInputFormat和DBOutputFormat,分別用於數據庫的讀取和數據庫的寫入
1、需求
下面使用DBOutputFormat,將MapReduce處理后的學生信息導入到mysql中
2、數據集
張明明 45
李成友 78
張輝燦 56
王露 56
陳東明 67
陳果 31
李華明 32
張明東 12
李明國 34
陳道亮 35
陳家勇 78
陳旻昊 13
陳潘 78
陳學澄 18
3、實現
package com.buaa; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @ProjectName DBOutputormatDemo * @PackageName com.buaa * @ClassName MysqlDBOutputormatDemo * @Description TODO * @Author 劉吉超 * @Date 2016-05-06 09:15:57 */ @SuppressWarnings({ "unused", "deprecation" }) public class MysqlDBOutputormatDemo extends Configured implements Tool { /** * 實現DBWritable * * TblsWritable需要向mysql中寫入數據 */
public static class TblsWritable implements Writable, DBWritable { String tbl_name; int tbl_age; public TblsWritable() { } public TblsWritable(String name, int age) { this.tbl_name = name; this.tbl_age = age; } @Override public void write(PreparedStatement statement) throws SQLException { statement.setString(1, this.tbl_name); statement.setInt(2, this.tbl_age); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.tbl_name = resultSet.getString(1); this.tbl_age = resultSet.getInt(2); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.tbl_name); out.writeInt(this.tbl_age); } @Override public void readFields(DataInput in) throws IOException { this.tbl_name = in.readUTF(); this.tbl_age = in.readInt(); } public String toString() { return new String(this.tbl_name + " " + this.tbl_age); } } public static class StudentMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { context.write(key, value); } } public static class StudentReducer extends Reducer<LongWritable, Text, TblsWritable, TblsWritable> { @Override protected void reduce(LongWritable key, Iterable<Text> values,Context context) throws IOException, InterruptedException { // values只有一個值,因為key沒有相同的
StringBuilder value = new StringBuilder(); for(Text text : values){ value.append(text); } String[] studentArr = value.toString().split("\t"); if(StringUtils.isNotBlank(studentArr[0])){ /* * 姓名 年齡(中間以tab分割) * 張明明 45 */ String name = studentArr[0].trim(); int age = 0; try{ age = Integer.parseInt(studentArr[1].trim()); }catch(NumberFormatException e){ } context.write(new TblsWritable(name, age), null); } } } public static void main(String[] args) throws Exception { // 數據輸入路徑和輸出路徑
String[] args0 = { "hdfs://ljc:9000/buaa/student/student.txt" }; int ec = ToolRunner.run(new Configuration(), new MysqlDBOutputormatDemo(), args0); System.exit(ec); } @Override public int run(String[] arg0) throws Exception { // 讀取配置文件
Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://172.26.168.2:3306/test", "hadoop", "123"); // 新建一個任務
Job job = new Job(conf, "DBOutputormatDemo"); // 設置主類
job.setJarByClass(MysqlDBOutputormatDemo.class); // 輸入路徑
FileInputFormat.addInputPath(job, new Path(arg0[0])); // Mapper
job.setMapperClass(StudentMapper.class); // Reducer
job.setReducerClass(StudentReducer.class); // mapper輸出格式
job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // 輸入格式,默認就是TextInputFormat // job.setInputFormatClass(TextInputFormat.class); // 輸出格式
job.setOutputFormatClass(DBOutputFormat.class); // 輸出到哪些表、字段
DBOutputFormat.setOutput(job, "student", "name", "age"); // 添加mysql數據庫jar // job.addArchiveToClassPath(new Path("hdfs://ljc:9000/lib/mysql/mysql-connector-java-5.1.31.jar")); // DistributedCache.addFileToClassPath(new Path("hdfs://ljc:9000/lib/mysql/mysql-connector-java-5.1.31.jar"), conf); //提交任務
return job.waitForCompletion(true)?0:1; } }
mr程序很簡單,只是讀取文件內容,在這里我們主要關注的是怎么將mr處理后的結果集導入mysql中的
數據庫中表是student,為student表編寫對應的bean類TblsWritable,該類需要實現Writable接口和DBWritable接口。
1、Writable接口
@Override
public void write(DataOutput out) throws IOException { out.writeUTF(this.tbl_name); out.writeInt(this.tbl_age); } @Override public void readFields(DataInput in) throws IOException { this.tbl_name = in.readUTF(); this.tbl_age = in.readInt(); }
上面兩個方法對應着Writable接口,用對象序列化,這里不再多說,前面文章有介紹
2、DBWritable接口
@Override
public void write(PreparedStatement statement) throws SQLException { statement.setString(1, this.tbl_name); statement.setInt(2, this.tbl_age); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.tbl_name = resultSet.getString(1); this.tbl_age = resultSet.getInt(2); }
上面兩個方法對應着DBWriteable接口。readFields方法負責從結果集中讀取數據庫數據(注意ResultSet的下標是從1開始的),一次讀取查詢SQL中篩選的某一列。Write方法負責將數據寫入到數據庫,將每一行的每一列依次寫入。
最后進行Job的一些配置,具體如下面代碼所示
1 DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://172.26.168.2:3306/test", "hadoop", "123")
上面的配置主要包括以下幾項:
1、數據庫驅動的名稱:com.mysql.jdbc.Driver
2、數據庫URL:jdbc:mysql://172.26.168.2:3306/test
3、用戶名:hadoop
4、密碼:123
還有以下幾項需要配置
1、數據庫表以及每列的名稱:DBOutputFormat.setOutput(job, "student", "name", "age");
2、輸出格式改為:job.setOutputFormatClass(DBOutputFormat.class);
需要提醒的是DBOutputFormat以MapReduce的方式運行,會並行的連接數據庫。在這里需要合適的設置map、reduce的個數,以便將並行連接的數量控制在合理的范圍之內
4、運行效果
5、注意事項
運行項目可能會報如下錯誤
解決方法:
共有3種解決方法,但我喜歡第三種
1、在每個節點下的${HADOOP_HOME}/lib下添加該包。重啟集群,一般是比較原始的方法。
2、把jar包傳到集群上,命令如下
1 hadoop fs -put mysql-connector-java-5.1.31.jar /lib/mysql
在mr程序提交job前,添加如下兩個語句中一個就行
(1)DistributedCache.addFileToClassPath(new Path(“hdfs://ljc:9000/lib/mysql/mysql-connector-java-5.1.31.jar”), conf);
這條語句不推薦使用了,建議使用下面這條語句
(2)job.addArchiveToClassPath(new Path("hdfs://ljc:9000/lib/mysql/mysql-connector-java-5.1.31.jar"));
注意:用這種方式,在本地運行,依然報“java.io.IOException: com.mysql.jdbc.Driver”,但放到hadoop運行環境就可以啦
3、把依賴的jar打到項目中,然后配置MANIFEST.MF文件中Class-Path選項
具體配置,請參考“通過指定manifest.mf文件的打包”