SparkStreaming 寫入數據到mysql


使用idea 編碼
package streaming
 
import java.sql.DriverManager
 
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
 
object SvaeToMysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("savetomysql")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
 
    val lines = ssc.socketTextStream("master", 9999)
    val wordcounts = lines.flatMap(x => x.split(" ")).map((_, 1)).reduceByKey(_ + _)
 
    wordcounts.foreachRDD(rdd => rdd.foreachPartition(line => {
      Class.forName("com.mysql.jdbc.Driver")
      //獲取mysql連接
      val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
      //把數據寫入mysql
      try {
        for (row <- line) {
          val sql = "insert into wordcount(titleName,count)values('" + row._1 + "','" + row._2 + "')"
          conn.prepareStatement(sql).executeUpdate()
        }
      } finally {
        conn.close()
      }
    }))
 
    ssc.start()
    ssc.awaitTermination()
 
  }
}

 

 
 
在這之前先創建數據庫;
create database test;
create table if not exists wordcount (titleName varchar(100) not null,count int not null);
mvn assembly:assembly 打包上傳到虛擬機上savetomysql.sh,執行sh savetomysql.sh
新開一個對話窗,nc -lk 9999
輸入單詞:
然后進入對應 的數據庫查看數據:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM