Spark Streaming + Kafka with a MySQL database as the data source

The Scala program below reads the rows of a MySQL table through Spark SQL's JDBC reader and publishes each row as a string message to a Kafka topic, where a Spark Streaming job can then consume them.


package spark

import java.util.{HashMap, Properties}

import org.apache.kafka.clients.producer._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SelectFromOneTable {
  def main(args: Array[String]) {
    val Array(brokers, topic) = Array("localhost:9092", "sun_test")

    // Kafka producer configuration (new producer API, string keys and values)
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    val sparkConf = new SparkConf().setAppName("Spark SQL Test Case").setMaster("local")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)

    // Read the "flow" table from MySQL over JDBC and collect the rows to the driver
    val url = "jdbc:mysql://localhost:3306/sun_test?user=root&password=Sun@123"
    val prop = new Properties()
    val rows = sqlContext.read.jdbc(url, "flow", prop).collect()

    // Publish each row to the Kafka topic as a string message
    for (row <- rows) {
      println(row)
      val message = new ProducerRecord[String, String](topic, null, row.toString())
      producer.send(message)
    }

    producer.close()
    sparkContext.stop()
  }
}
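
The producer above only covers the MySQL-to-Kafka half of the pipeline. Below is a minimal sketch of the consuming side, assuming the same broker and topic and the spark-streaming-kafka 0.8 direct-stream connector (the API behind the org.apache.spark.streaming.kafka package); the object name StreamFromKafka and the 5-second batch interval are illustrative and not part of the original code.

package spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamFromKafka {
  def main(args: Array[String]) {
    // Local streaming context with a 5-second batch interval (illustrative values)
    val sparkConf = new SparkConf().setAppName("Kafka Stream Test").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Same broker and topic used by the producer above
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val topics = Set("sun_test")

    // Direct stream: one RDD partition per Kafka partition, no receiver thread
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Each record is a (key, value) pair; print the row text sent by the producer
    messages.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}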

 

