在spark中操作mysql數據 ---- spark學習之七


使用spark的 DataFrame 來操作mysql數據。

DataFrame是比RDD更高一個級別的抽象,可以應用SQL語句進行操作,詳細參考:

https://spark.apache.org/docs/latest/sql-programming-guide.html

 

這里暫時使用spark-shell進行操作,

1.首先,必須要先下載一個mysql的jdbc的驅動

可以從這里下載

2.然后呢,就好辦了。

#具體的啟動spark-shell的方法(帶上mysql的driver)
$~/spark-shell --driver-class-path /path-to-mysql-jar/mysql-connector-java-5.1.34-bin.jar
#定義mysql的信息
val url
="jdbc:mysql://10.181.176.226:3306/geo_info" val prop = new java.util.Properties prop.setProperty("user","geo") prop.setProperty("password","xxxxxx”)
#指定讀取條件,這里 Array("country_code='CN'") 是where過濾條件
val cnFlight = sqlContext.read.jdbc(url,"gps_location",Array("country_code='CN'"),prop)

#然后進行groupby 操作,獲取數據集合 val emailList
= cnFlight.groupBy("gps_city", "user_mail”)
#計算數目,並根據數目進行降序排序 val sorted = emailList.count().orderBy( desc("count") ) #顯示前10條 sorted.show(10) #存儲到文件(這里會有很多分片文件。。。) sorted.rdd.saveAsTextFile("/home/qingpingzhang/data/flight_top”) #存儲到mysql表里
sorted.
write.jdbc(url,"table_name",prop)

 

3.具體文件編寫代碼,然后提交worker也類似,主要是DataFrame的 sqlContext聲明會不一樣。

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 這里如果要用spark-submit,則會有坑,即便你是用sbt的assembly來打包的一個全的jar包:

參考:http://www.iteblog.com/archives/1300

[itelbog@iteblog ~]$  bin/spark-submit --master local[2]     --driver-class-path lib/mysql-connector-java-5.1.35.jar    --class  spark.SparkToJDBC ./spark-test_2.10-1.0.jar

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM