先來一段到處都有的原理(出處到處都有,就不注明了)
Streaming和Kafka整合有兩種方式--Receiver和Direct,簡單理解為:Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節點上獲取數據
Receiver:
1、Kafka中topic的partition與Spark中RDD的partition是沒有關系的,因此,在KafkaUtils.createStream()中,提高partition的數量,只會增加Receiver的數量,也就是讀取Kafka中topic partition的線程數量,不會增加Spark處理數據的並行度。
2、可以創建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver並行接收數據。
3、如果基於容錯的文件系統,比如HDFS,啟用了預寫日志機制,接收到的數據都會被復制一份到預寫日志中。因此,在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。
Direct:
1、簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream,然后對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
2、高性能:如果要保證零數據丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka自己本身就有高可靠的機制會對數據復制一份,而這里又會復制一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數據的復制,那么就可以通過Kafka的副本進行恢復。
3、一次且僅一次的事務機制:基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。由於數據消費偏移量是保存在checkpoint中,因此,如果后續想使用kafka高級API消費數據,需要手動的更新zookeeper中的偏移量
本來說網上搜一搜就解決了代碼問題,但是大部分都是Spark1.X 的,對應的Kafka的 createDirectStream 的傳參方式不一樣(也可能是我學的太淺),所以仿照spark1.X的寫了個。
接下來直接上代碼
package com.kafka
import scala.collection.JavaConversions._
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object KafkaZookeeperCheckPoint {
// ZK client
val client = {
val client = CuratorFrameworkFactory
.builder
.connectString("bigdata:2181,bigdata:2182,bigdata:2183")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("mykafka")
.build()
client.start()
client
}
// offset 路徑起始位置
val Globe_kafkaOffsetPath = "/kafka/offsets"
// 路徑確認函數 確認ZK中路徑存在,不存在則創建該路徑
def ensureZKPathExists(path: String)={
if (client.checkExists().forPath(path) == null) {
client.create().creatingParentsIfNeeded().forPath(path)
}
}
// 保存 新的 offset
def storeOffsets(offsetRange: Array[OffsetRange], groupName:String) = {
for (o <- offsetRange){
val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"
// 向對應分區第一次寫入或者更新Offset 信息
println("---Offset寫入ZK------\nTopic:" + o.topic +", Partition:" + o.partition + ", Offset:" + o.untilOffset)
client.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
}
}
def getFromOffset(topic: Array[String], groupName:String):(Map[TopicPartition, Long], Int) = {
// Kafka 0.8和0.10的版本差別,0.10 為 TopicPartition 0.8 TopicAndPartition
var fromOffset: Map[TopicPartition, Long] = Map()
val topic1 = topic(0).toString
// 讀取ZK中保存的Offset,作為Dstrem的起始位置。如果沒有則創建該路徑,並從 0 開始Dstream
val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}"
// 檢查路徑是否存在
ensureZKPathExists(zkTopicPath)
// 獲取topic的子節點,即 分區
val childrens = client.getChildren().forPath(zkTopicPath)
// 遍歷分區
val offSets: mutable.Buffer[(TopicPartition, Long)] = for {
p <- childrens
}
yield {
// 遍歷讀取子節點中的數據:即 offset
val offsetData = client.getData().forPath(s"$zkTopicPath/$p")
// 將offset轉為Long
val offSet = java.lang.Long.valueOf(new String(offsetData)).toLong
// 返回 (TopicPartition, Long)
(new TopicPartition(topic1, Integer.parseInt(p)), offSet)
}
println(offSets.toMap)
if(offSets.isEmpty){
(offSets.toMap, 0)
} else {
(offSets.toMap, 1)
}
}
// if (client.checkExists().forPath(zkTopicPath) == null){
//
// (null, 0)
// }
// else {
// val data = client.getData.forPath(zkTopicPath)
// println("----------offset info")
// println(data)
// println(data(0))
// println(data(1))
// val offSets = Map(new TopicPartition(topic1, 0) -> 7332.toLong)
// println(offSets)
// (offSets, 1)
// }
//
// }
def createMyZookeeperDirectKafkaStream(ssc:StreamingContext, kafkaParams:Map[String, Object], topic:Array[String],
groupName:String ):InputDStream[ConsumerRecord[String, String]] = {
// get offset flag = 1 表示基於已有的offset計算 flag = 表示從頭開始(最早或者最新,根據Kafka配置)
val (fromOffsets, flag) = getFromOffset(topic, groupName)
var kafkaStream:InputDStream[ConsumerRecord[String, String]] = null
if (flag == 1){
// 加上消息頭
//val messageHandler = (mmd:MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
println(fromOffsets)
kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffsets))
println(fromOffsets)
println("中斷后 Streaming 成功!")
} else {
kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(topic, kafkaParams))
println("首次 Streaming 成功!")
}
kafkaStream
}
def main(args: Array[String]): Unit = {
val processInterval = 5
val brokers = "bigdata:9092,bigdata:9093,bigdata:9094"
val topics = Array("zkKafka")
val conf = new SparkConf().setMaster("local[2]").setAppName("kafka checkpoint zookeeper")
// kafka params
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "zk_group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val ssc = new StreamingContext(conf, Seconds(processInterval))
val messages = createMyZookeeperDirectKafkaStream(ssc, kafkaParams, topics, "zk_group")
messages.foreachRDD((rdd) => {
if (!rdd.isEmpty()){
println("###################:"+rdd.count())
}
// 存儲新的offset
storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "zk_group")
})
ssc.start()
ssc.awaitTermination()
}
}
遇到的幾個坑:
一、怎么傳入 Offset,這個糾結了好久。后面是看 createDirectStream的源代碼看到 consumerStrategy.Subscribe 這個方法還有第三個參數,就是Offset

二、無法遍歷topic的子節點(分區)

解決方案:
import scala.collection.JavaConversions._
這個是因為個人水平問題,不多說
三、TopicPartition和TopicAndPartition
spark2.X貌似只有TopicPartition了,這個方法也是看源碼找到的,搜一下就有,就不截圖了。
然后,實例化的時候,因為我的Topic是Array,在實例化TopicPartition的時候,需要先toString。不能TopicPartition(topic.toString, .... ,在里面toString會造成,實例化后的topic顯示為亂碼(這塊不熟,諒解)。

如有問題,歡迎指正~
下次將Offset寫入到Hbase的代碼和遇到的坑寫出來
最后,源碼真的是個好東西~
