MapReduce 程序mysql JDBC驅動類找不到原因及學習hadoop寫入數據到Mysql數據庫的方法


報錯 :ClassNotFoundException: com.mysql.jdbc.Driver

需求描述:
hadoop需要動態加載個三方jar包(比如mysql JDBC 驅動 包),是在MR結束后,使用hadoop 的filesystem系統讀取HDFS文件,調用JDBC驅動類插入數據庫,但是運行時報錯找不到驅動類。

第一個方法:加到HADOOP_HOME/lib下不可行,集群需要重啟(集群再用,隊列有任務進行中)。
第二個方法:job2.addFileToClassPath(file)和DistributedCache.addFileToClassPath()
以及利用hadoop jar xx.jar -libjars $yourpath/mysql-connector-java-5.0.3-bin.jar <other args.....> 這種原理上是一樣的,
這種方式加入了Mapreduce的classpath( 最后是加載到了每個節點 map container 或reduce container的JVM的classpath中),MR結束后通過filesystem調用JDBC,是不可行的,我的代碼在Diver類中調用,即客戶端拿不到這個類。
第三個方法:所有依賴三方包和MR程序打包到一個jar中,打包完接近180MB。jar包太大了。
第四個方法:依賴的第三方jar打包到lib中,本次任務不需要的jar包可以全刪掉 (這里注意:工程的jar和集群jar版本不一致可能導致沖突)。可以運行,導出的lib也小。
 
總結一下:
1.報錯Diver類里缺少三方包,扔到lib下(不是鼠標拖進lib目錄下,是打包打進去)
2.報錯Map或Reduce缺少三方包,分發到集群即可;可以用上面第二個方法中的2個方法 (job.addFileToClassPath()這個方法好像只有hadoop 2.7版本以上才有)或者直接用-libjars $yourpath/your-jar.jar (一定要寫在其他參數前面)把三方包分發到集群
 
other:
后來在網上搜了一下,發現可以直接使用hadoop的DBOutputFormat 和DBInputFormat類來直接對數據庫進行操作。
Diver類中的設置項:
     conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY,"com.mysql.jdbc.Driver");  
        conf.set(DBConfiguration.URL_PROPERTY,  
                        "jdbc:mysql://x.x.x.x:3306/test");  
        conf.set(DBConfiguration.USERNAME_PROPERTY, "xxx");  
        conf.set(DBConfiguration.PASSWORD_PROPERTY, "xxxx"); 

        //這三行代碼是本機測試用的
//        conf.set(DBConfiguration.URL_PROPERTY,  
//        "jdbc:mysql://localhost:3306/test"); 
//        conf.set(DBConfiguration.PASSWORD_PROPERTY, "xxx"); 
        Job job3 = Job.getInstance(conf,"step3:insertToMySqlDB");
        job3.setJarByClass(ECMerchantMapReduceV2.class);
        job3.setMapOutputKeyClass(DBOutputKey.class);
        job3.setMapOutputValueClass(NullWritable.class);
        job3.setMapperClass(ECMerchantThirdMap.class);
        job3.setOutputFormatClass(DBOutputFormat.class);
        job3.setNumReduceTasks(0);
        job3.addFileToClassPath(new Path("/xxxx/mysql-connector-java-5.0.3-bin.jar"));
        DBOutputFormat.setOutput(job3, "yourtablename", "column1","column2","column3","column4");
        FileInputFormat.addInputPath(job3, new Path(otherArgs[3]));
        if(!job3.waitForCompletion(true)) return ;
public static class DBOutputKey implements WritableComparable<DBOutputKey>,DBWritable {

        private String city = "";
        private String ec="";
        private String account="";
        private Date date2=null;    
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        public DBOutputKey() {
            
        }

        public DBOutputKey(String city, String ec, String account, Date date2) {
            super();
            this.city = city;
            this.ec = ec;
            this.account = account;
            this.date2 = date2;
        }

        public void set(String city, String ec, String account, Date date2) {
            this.city = city;
            this.ec = ec;
            this.account = account;
            this.date2 = date2;
        }


        @Override
        public void readFields(DataInput in) throws IOException {
            this.city = in.readUTF();
            this.ec = in.readUTF();
            this.account = in.readUTF();
            try {
                this.date2 = new Date(format.parse(in.readUTF()).getTime());
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(city);
            out.writeUTF(ec);
            out.writeUTF(account);
            out.writeUTF(date2.toString());
        }

        @Override
        public int compareTo(DBOutputKey other) {
            if (this.city.compareTo(other.city) != 0) {
                return this.city.compareTo(other.city);
            } else if (!this.ec .equals(other.ec) ) {
                return ec .compareTo(other.ec);
            } else if (!this.account.equals(other.account)) {
                return account.compareTo(other.account);
            }else {
                return 0;
            }
        }

        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this);
        }
//下面這2個方法是DBWritable接口的方法
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, city);
            statement.setString(2, ec);
            statement.setString(3, account);
            statement.setDate(4, date2);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.city=resultSet.getString(1);
            this.ec=resultSet.getString(2);
            this.account=resultSet.getString(3);
            this.date2=resultSet.getDate(4);
        }
    }
注意:
1.DBOutputFormat寫入數據庫,是按照key輸出的,沒有value,key要實現WritableComparable和DBWritable接口;

2.數據庫表字段有個為Sql.date類型private Date date2=null,readFields(DataInput in)和write(DataOutput out)輸入輸出流不支持Sql.Date類型,通過文中紅色代碼部分轉換格式,即可使Hadoop序列化的時候支持sql.Date數據類型。
  
    public static class ECMerchantThirdMap extends Mapper<LongWritable, Text, DBOutputKey, NullWritable> {
        DBOutputKey dbKey=new DBOutputKey();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        String date;
        @Override
        protected void setup(Mapper<LongWritable, Text, DBOutputKey, NullWritable>.Context context)
                throws IOException, InterruptedException {
            date=context.getConfiguration().get("eradiusDate");
        }
        public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
            String[] strs = ivalue.toString().split("\\s+");
            if(strs.length!=3) return;
            
            Date date2=null;
            try {
                date2 = new Date(format.parse(date).getTime());
            } catch (ParseException e) {
                e.printStackTrace();
            }
            dbKey.set(strs[0], strs[1], strs[2], date2);
            context.write(dbKey, NullWritable.get());
        }

    }

Map類直接輸出key就可以,不需要reduce類,可以設置reduce數量為0;

在Diver類中 job3.addFileToClassPath( new Path("/xxxx/mysql-connector-java-5.0.3-bin.jar" ));
測試后發現JDBC驅動包在ECMerchantThirdMap這個類寫數據到Mysql數據庫時是可以調用的,
而通過 job3.addFileToClassPath()方法添加JDBC驅動包,在客戶端是無法調用的。
所以該方法是添加jar包到運行Map或Reduce的節點的Classpath中。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM