In day-to-day work we sometimes need to read MySQL data as a DataFrame source for downstream Spark processing. Spark ships with built-in support for this: when reading from MySQL we can reuse the table's own schema instead of defining every field ourselves.
Below is my implementation.
1. MySQL connection info:
I keep the MySQL connection info in an external configuration file, which makes later configuration changes easy.
// Example configuration file:
[hdfs@iptve2e03 tmp_lillcol]$ cat job.properties
# MySQL database configuration
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=user
mysql.password=123456
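Loading this file takes only a few lines with java.util.Properties; here is a minimal sketch (the path is illustrative, and the full helper appears as getProPerties in section 3):

import java.io.FileInputStream
import java.util.Properties

// Load job.properties and pull out one value.
val properties = new Properties()
properties.load(new FileInputStream("/tmp/lillcol/job.properties")) // illustrative path
val url = properties.getProperty("mysql.url") // e.g. the JDBC URL above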
2. Required jar dependencies
These are in sbt form; for Maven, adjust accordingly (see the sketch after the list).
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"
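For Maven, each sbt line maps to a dependency block with the same coordinates; for example, the MySQL driver above would be (a sketch of the equivalent coordinates, not a full POM):

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>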
3. Complete implementation
import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author Administrator
 * 2018/10/16-9:18
 */
object TestReadMysql {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""

  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = new HiveContext(sc)

  def main(args: Array[String]): Unit = {
    hdfsPath = args(0)
    proPath = args(1)
    // Read without a filter
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
    dim_sys_city_dict.show(10)

    // Read with a filter
    val dim_sys_city_dict1: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", "city_id=240", proPath)
    dim_sys_city_dict1.show(10)
  }

  /**
   * Load a MySQL table as a DataFrame.
   *
   * @param sqlContext
   * @param tableName name of the MySQL table to read
   * @param proPath   path to the properties file
   * @return the MySQL table as a DataFrame
   */
  def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
    val properties: Properties = getProPerties(proPath)
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", tableName)
      .load()
  }

  /**
   * Load a MySQL table as a DataFrame, with a filter condition.
   *
   * @param sqlContext
   * @param table           name of the MySQL table to read
   * @param filterCondition filter condition (a SQL WHERE clause body)
   * @param proPath         path to the properties file
   * @return the filtered MySQL table as a DataFrame
   */
  def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String): DataFrame = {
    val properties: Properties = getProPerties(proPath)
    // Push the filter down to MySQL by wrapping it in a subquery alias
    val tableName = s"(select * from $table where $filterCondition) as t1"
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", tableName)
      .load()
  }

  /**
   * Load the configuration file.
   *
   * @param proPath path to the properties file
   * @return the loaded Properties
   */
  def getProPerties(proPath: String): Properties = {
    val properties: Properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }
}
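One note on scale: both helpers above pull the table through a single JDBC connection. Spark's JDBC source can also split the read across partitions. Below is a hedged sketch reusing the same properties; the partition column dict_id and its bounds are assumptions here, so tune them to the actual table:

// Sketch: parallel JDBC read. partitionColumn/lowerBound/upperBound/numPartitions
// are standard Spark JDBC options and must be set together.
def readMysqlTablePartitioned(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
  val properties: Properties = getProPerties(proPath)
  sqlContext
    .read
    .format("jdbc")
    .option("url", properties.getProperty("mysql.url"))
    .option("driver", properties.getProperty("mysql.driver"))
    .option("user", properties.getProperty("mysql.username"))
    .option("password", properties.getProperty("mysql.password"))
    .option("dbtable", tableName)
    .option("partitionColumn", "dict_id") // assumed numeric column in the table
    .option("lowerBound", "0")            // assumed min of dict_id
    .option("upperBound", "1000")         // assumed max of dict_id
    .option("numPartitions", "4")         // number of parallel JDBC connections
    .load()
}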
4. Test
def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)
  // Read without a filter
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
  dim_sys_city_dict.show(10)

  // Read with a filter
  val dim_sys_city_dict1: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", "city_id=240", proPath)
  dim_sys_city_dict1.show(10)
}
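For reference, the job can be submitted with something like the following (the jar name, master, and both paths are illustrative assumptions; args(0) is hdfsPath and args(1) is proPath):

spark-submit \
  --class TestReadMysql \
  --master yarn \
  --jars mysql-connector-java-5.1.38.jar \
  test-read-mysql.jar \
  /user/hdfs/tmp_lillcol /tmp/lillcol/job.properties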
5. Results
The data has been masked for confidentiality.
// Result of the unfiltered read
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|dict_id|city_id|city_name|city_code|group_id|group_name|area_code|           bureau_id|sort|bureau_name|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|      1|    249|       **|    **_ab|     100|    **按時|  **-查到|           xcaasd...|  21|   張三公司|
|      2|    240|       **|    **_ab|     300|    **按時|  **-查到|           xcaasd...|  21|   張三公司|
|      3|    240|       **|    **_ab|     100|    **按時|  **-查到|           xcaasd...|  21|   張三公司|
|      4|    242|       **|    **_ab|     300|    **按時|  **-查到|           xcaasd...|  01|   張三公司|
|      5|    246|       **|    **_ab|     100|    **按時|  **-查到|           xcaasd...|  01|   張三公司|
|      6|    246|       **|    **_ab|     300|    **按時|  **-查到|           xcaasd...|  01|   張三公司|
|      7|    248|       **|    **_ab|     200|    **按時|  **-查到|           xcaasd...|  01|   張三公司|
|      8|    242|       **|    **_ab|     400|    **按時|  **-查到|           xcaasd...|  01|   張三公司|
|      9|    247|       **|    **_ab|     200|    **按時|  **-查到|           xcaasd...|  01|   張三公司|
|      0|    243|       **|    **_ab|     400|    **按時|  **-查到|           xcaasd...|  01|   張三公司|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+

// Result of the filtered read (city_id=240)
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|dict_id|city_id|city_name|city_code|group_id|group_name|area_code|           bureau_id|sort|bureau_name|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|      2|    240|       **|    **_JM|     300|    **按時|  **-查到|           xcaasd...|  21|   張三公司|
|      3|    240|       **|    **_ZS|     100|    **按時|  **-查到|           xcaasd...|  21|   張三公司|
|      6|    240|       **|    **_JY|     400|    **按時|  **-查到|           xcaasd...|  01|   張三公司|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
6. Summary
Reading from MySQL is really not hard; it comes down to configuring a few JDBC parameters. I'm noting it down here for future reference.
This post is a summary of day-to-day work. Please credit the source when reposting!