使用spark的 DataFrame 來操作mysql數據。
DataFrame是比RDD更高一個級別的抽象,可以應用SQL語句進行操作,詳細參考:
https://spark.apache.org/docs/latest/sql-programming-guide.html
這里暫時使用spark-shell進行操作,
1.首先,必須要先下載一個mysql的jdbc的驅動
可以從這里下載
2.然后呢,就好辦了。
#具體的啟動spark-shell的方法(帶上mysql的driver)
$~/spark-shell --driver-class-path /path-to-mysql-jar/mysql-connector-java-5.1.34-bin.jar
#定義mysql的信息
val url="jdbc:mysql://10.181.176.226:3306/geo_info" val prop = new java.util.Properties prop.setProperty("user","geo") prop.setProperty("password","xxxxxx”)
#指定讀取條件,這里 Array("country_code='CN'") 是where過濾條件
val cnFlight = sqlContext.read.jdbc(url,"gps_location",Array("country_code='CN'"),prop)
#然后進行groupby 操作,獲取數據集合 val emailList = cnFlight.groupBy("gps_city", "user_mail”)
#計算數目,並根據數目進行降序排序 val sorted = emailList.count().orderBy( desc("count") ) #顯示前10條 sorted.show(10) #存儲到文件(這里會有很多分片文件。。。) sorted.rdd.saveAsTextFile("/home/qingpingzhang/data/flight_top”) #存儲到mysql表里
sorted.write.jdbc(url,"table_name",prop)
3.具體文件編寫代碼,然后提交worker也類似,主要是DataFrame的 sqlContext聲明會不一樣。
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
這里如果要用spark-submit,則會有坑,即便你是用sbt的assembly來打包的一個全的jar包:
參考:http://www.iteblog.com/archives/1300
[itelbog@iteblog ~]$ bin/spark-submit --master local[2] --driver-class-path lib/mysql-connector-java-5.1.35.jar --class spark.SparkToJDBC ./spark-test_2.10-1.0.jar