Spark Streaming 讀取 Kafka 中數據


一、什么是 Spark Streaming

  1、SparkStreaming 是 Spark核心API 的擴展。可實現可伸縮、高吞吐、容錯機制的實時流處理。

   如圖,數據可從 Kafka、Flume、HDFS 等多種數據源獲得,最后將數據推送到 HDFS、數據庫 或者 Dashboards 上面。

        

 

 

    2、SparkStreaming 接收到實時的數據,然后按照時間段將實時數據分成多個批次,經過Spark處理引擎的數據處理,最后按照批次輸出。

           

 

 

   3、SparkStreaming 提供了一個高抽象的離散流或者叫做 DStream,它相當於連續的數據流。

    外部數據不斷的涌入,數據按照自定義的時間將數據進行切片,每個時間段內的數據是連續的,時間段與時間段之間的關系史相互獨立的。這就是 離散流

      DStreamRDD 的序列化,DStream 可以看作一組 RDD 的集合。

          

 

   4、在DStream上執行的任何操作都轉換為對基礎RDD的操作。

 

二、需要的maven包

<!-- Spark Streaming -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

<!-- config的統一配置 -->
<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.3</version>
</dependency>

<!-- json格式轉化 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
    </dependency>

 

三、項目參數的配置文件

  首先在 src/resources 路徑下創建一個 application.conf 文件,上面 maven 文件中添加的 config 依賴默認會讀取 src/resources 路徑下的 application.conf 文件。

applicaltion.conf

kafka.topic = "topicName"
kafka.group.id = "Your group"
kafka.broker.list = "your Kafka's host:port"

redis.host = "redisIP, no port"
redis.db=1

   Redis 會有 16個庫,這里面 redis.db=1,代表我會將數據存入到redis的1庫中。

   然后我會創建一個 Object,通過 ConfigFactory 來獲取 application 中的參數的值.

object ParamsConf {
  private lazy val conf = ConfigFactory.load() 

  val topics = conf.getString("kafka.topic").split(",")
  val groupId = conf.getString("kafka.group.id")
  val brokers = conf.getString("kafka.broker.list")
  val redisHost = conf.getString("redis.host")
  val redisDB = conf.getInt("redis.db")

    
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
}

   這樣項目就會很方便的通過這個工具類來獲取到統一的參數配置了。

  

四、向 Kafka 傳入 Demo 數據

   通過 KafkaProducer 創建與 Kafka 之間的連接,KafkaProducer 在創建對象的時候需要傳入生產者的配置參數。通過讀 Source Code 都可以查到對應的參數配置

val prop = new Properties()
// 序列化用到的 key、value
prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
// kafka的地址,在上面的配置里
prop.put("bootstrap.servers", ParamsConf.brokers)
prop.put("request.required.acks", "1")

val topic = ParamsConf.topics(0)

val producer = new KafkaProducer[String, String](prop)

  其中 request.required.acks 是 Kafka 的發送確認,有 -1、0、1 三個級別。-1 代表 producer 會獲得所有同步replicas 都收到數據的確認,才會發下一條消息;0代表 producer 不等待確認消息,producer中有message就會發給broker;1代表 獲得leader replica已經接收了數據的確認信息。

  之后利用循環編造數據,並將數據發送給 Kafka

val random = new Random()
val dateFormat = FastDateFormat.getInstance("yyyyMMddHHmmss")

for(i <- 1 to 100) {
      val time = dateFormat.format(new Date())
      val userId = random.nextInt(1000).toString
      val courseid = random.nextInt(500).toString
      val fee = random.nextInt(400).toString
      val result = Array("0","1") // 0 表示未成功;1 表示成功
      val flag = result(random.nextInt(2)).toString
      var orderId = UUID.randomUUID().toString()

      val map = new util.HashMap[String, Object]()
      map.put("time",time)
      map.put("userId",userId)
      map.put("courseid",courseid)
      map.put("fee",fee)
      map.put("result",result)
      map.put("flag",flag)
      map.put("orderId",orderId)

      val json = new JSONObject(map)

  producer.send(new ProducerRecord[String, String](topic, i.toString, json.toString()))
    }

  

五、 Spark Streaming 對接 Kafka

  初始化 SparkStreaming 程序,首先需要創建 StreamingContext。StreamingContext的創建方式:

val conf = new SparkConf().setMaster("local[2]").setAppName("hunterV2")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
...
... Do Something...
...
ssc.start()
ssc.awaitTermination()

  在邏輯代碼之后要調用 ssc.start() 來開始實時處理數據。ssc.awaitTermination() 方法也是必需的, 來等待應用程序的終止,也可以用 ssc.stop() 來終止程序。或者就是讓它持續不斷的運行進行計算

  通過 KafkaUtils 以直連方式拉取數據,這種方式不會修改數據的偏移量,需要手動的更新

val stream = KafkaUtils.createDirectStream(ssc,
                       LocationStrategies.PreferConsistent,
                       ConsumerStrategies.Subscribe[String, String](ParamsConf.topics, ParamsConf.kafkaParams))

  createDirectStream 有三個參數,查看 Source Code 

def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V]
)

  前兩個參數較明確,第三個參數中則是要添加 kafka 的 topic 以及 Kafka 的信息,在 ParamsConf 中已經定義好了,這個也可以在官網文檔中可以找到。(https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html)

  拉取數據之后,SparkStreaming 提供的 stream 變量是 DStream 類型數據。DStream 是一組 RDD 的序列,DStream 任何操作都轉換為對基礎 RDD 的操作。因此需要用到 DStream 的 action -- foreachRDD.

stream.foreachRDD( rdd => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ...
  ... Do SomeThing
  ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})

  在邏輯代碼的前后有兩行 關於 offsetRanges 的語句,這是 kafka 自帶的用作管理 offset 的語句。第一句是讀取當前的偏移量的數據,邏輯執行成功之后,最后一句是將偏移量提交到 Kafka 上。(https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself)

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM