043 Syncing Hive data to MySQL


I: Significance

1. Significance

  If Spark can read a Hive table and write it into MySQL, it can replace Sqoop for this kind of export. The program in this post implements exactly that.

 

II: Hive operations

1. Prepare the data

  Start Hive first.

  Otherwise an error is thrown, because some configuration options were set when Hive was integrated with Spark.

  Later, I came across this article, which explains the problem I ran into quite well: https://blog.csdn.net/freedomboy319/article/details/44828337

 

2. Create the department and employee tables

  

-- Create the employee table
create table emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
row format delimited fields terminated by '\t';
load data local inpath '/opt/datas/emp.txt' into table emp;

-- Create the department table
create table dept(
deptno int,
dname string,
loc string
)
row format delimited fields terminated by '\t';
load data local inpath '/opt/datas/dept.txt' into table dept;

 

3. Result
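  Both tables should now be queryable. A quick sanity check from the hive CLI might look like the following sketch (assuming the tables live in the hadoop09 database, which is what the Spark program below reads from):

use hadoop09;              -- the database the Spark program below reads from
show tables;
select * from dept;
select * from emp limit 5;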

  

 

III: The program

1. Outline

  Read the Hive table through a HiveContext, then write the resulting DataFrame into MySQL with the DataFrameWriter jdbc method.

 

2. Prerequisite

  hive-site.xml must be on the application classpath (for example under src/main/resources) so the HiveContext can locate the Hive metastore.
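  A minimal sketch of such a file, assuming a remote metastore service; the host here follows the cluster used elsewhere in this post and 9083 is the Hive default port, both of which are assumptions:

<configuration>
    <property>
        <!-- Assumed: remote metastore on the cluster host; 9083 is the Hive default port -->
        <name>hive.metastore.uris</name>
        <value>thrift://linux-hadoop01.ibeifeng.com:9083</value>
    </property>
</configuration>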

 

3. Required dependencies

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.4</version>
        </dependency>

 

4. The resulting error

Exception in thread "main" java.sql.SQLNonTransientConnectionException: CLIENT_PLUGIN_AUTH is required
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:550)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:537)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:527)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:512)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:480)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:498)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:494)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:72)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1634)
    at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:637)
    at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:351)
    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:224)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:61)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:52)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
    at com.scala.it.HiveToMysql$.main(HiveToMysql.scala:28)
    at com.scala.it.HiveToMysql.main(HiveToMysql.scala)
Caused by: com.mysql.cj.core.exceptions.UnableToConnectException: CLIENT_PLUGIN_AUTH is required
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:54)
    at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73)
    at com.mysql.cj.mysqla.io.MysqlaProtocol.rejectConnection(MysqlaProtocol.java:319)
    at com.mysql.cj.mysqla.authentication.MysqlaAuthenticationProvider.connect(MysqlaAuthenticationProvider.java:207)
    at com.mysql.cj.mysqla.io.MysqlaProtocol.connect(MysqlaProtocol.java:1361)
    at com.mysql.cj.mysqla.MysqlaSession.connect(MysqlaSession.java:132)
    at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:1754)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1624)
    ... 8 more

Cause:

  The mysql-connector-java version is incompatible with the MySQL server (the 6.0.4 driver insists on CLIENT_PLUGIN_AUTH support); switch the driver to version 5.1.17.
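  In the pom.xml this just means changing the version of the dependency shown earlier, for example:

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>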

 

5. The program

package com.scala.it

import java.util.Properties

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveToMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hive-to-mysql")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)
    val (url, username, password) = ("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/hadoop09", "root", "123456")
    val props = new Properties()
    props.put("user", username)
    props.put("password", password)

    // ==================================
    // Step 1: sync the Hive dept table to MySQL
    sqlContext
      .read
      .table("hadoop09.dept") // database.tablename
      .write
      .mode(SaveMode.Overwrite) // overwrite the target table if it already exists
      .jdbc(url, "mysql_dept", props)
  }
}
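  The program above only syncs dept. The emp table created earlier could be exported the same way by adding a second write inside main(); a sketch, where the target table name mysql_emp is just an illustrative choice:

    // Sketch: sync the Hive emp table as well; "mysql_emp" is an illustrative target table name
    sqlContext
      .read
      .table("hadoop09.emp") // database.tablename
      .write
      .mode(SaveMode.Overwrite) // overwrite the target table if it already exists
      .jdbc(url, "mysql_emp", props)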

 

6. Result

  The mysql_dept table now exists in the hadoop09 database on MySQL, containing the same rows as the Hive dept table.
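  Without opening a MySQL console, the result can also be checked from the same program by reading the table back over JDBC; a sketch that would go at the end of main():

    // Verification sketch: read mysql_dept back from MySQL and print its rows
    sqlContext
      .read
      .jdbc(url, "mysql_dept", props)
      .show()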
