使用idea 編碼
package streaming import java.sql.DriverManager import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object SvaeToMysql { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("savetomysql") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(5)) val lines = ssc.socketTextStream("master", 9999) val wordcounts = lines.flatMap(x => x.split(" ")).map((_, 1)).reduceByKey(_ + _) wordcounts.foreachRDD(rdd => rdd.foreachPartition(line => { Class.forName("com.mysql.jdbc.Driver") //獲取mysql連接 val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456") //把數據寫入mysql try { for (row <- line) { val sql = "insert into wordcount(titleName,count)values('" + row._1 + "','" + row._2 + "')" conn.prepareStatement(sql).executeUpdate() } } finally { conn.close() } })) ssc.start() ssc.awaitTermination() } }
在這之前先創建數據庫;
create database test;
create table if not exists wordcount (titleName varchar(100) not null,count int not null);
mvn assembly:assembly 打包上傳到虛擬機上savetomysql.sh,執行sh savetomysql.sh
新開一個對話窗,nc -lk 9999
輸入單詞:

然后進入對應 的數據庫查看數據:
