Kafka為一個分布式的消息隊列,spark流操作kafka有兩種方式:
一種是利用接收器(receiver)和kafaka的高層API實現。
一種是不利用接收器,直接用kafka底層的API來實現(spark1.3以后引入)。
Receiver方式
基於Receiver方式實現會利用Kakfa的高層消費API,和所有的其他Receivers一樣,接受到的數據會保存到excutors中,然后由spark Streaming 來啟動Job進行處理這些數據。
在默認的配置下,這種方式在失敗的情況下,會丟失數據,如果要保證零數據丟失,需要啟用WAL(Write Ahead Logs)。它同步將接受到數據保存到分布式文件系統上比如HDFS。 所以數據在出錯的情況下可以恢復出來。
使用兩個步驟:
1、添加依賴:spark-streaming-kafka_2.10-1.3.0
2、編程:import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
注意:
- kafka的分區數和Spark的RDD的分區不是一個概念。所以在上述函數中增加特定主題的分區數,僅僅增加了一個receiver中消費topic的線程數,並不難增加spark並行處理數據的數量。
(那是不是多少個paratition最好對應多少個receiver的消費線程啊?)
- 對於不同的group和topic,可以使用多個recivers創建多個DStreams來並行處理數據(如果是同一個topic如何保證數據不被重復消費?)
-
如果啟用了WAL,接收到的數據會被持久化一份到日志中,因此需要將storage_lever設置成StorgeLevel.MEMORY_AND_DISK_SER
開啟:
val conf = new SparkConf()
conf.
set
(
"spark.streaming.receiver.writeAheadLog.enable"
,
"true"
)
val sc= new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint(
"checkpoint"
)
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
//
開啟在強行終止的情況下,數據仍然會丟失,解決辦法:
sys.addShutdownHook({
ssc.stop(
true
,
true
)
)})
3、運行
運行提交代碼的時候,需要添加以下基本Jar包依賴:
--jars lib/spark-streaming-kafka_2.10-1.3.0.jar,
lib/spark-streaming_2.10-1.3.0.jar,
lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,
4、例子
object
KafkaWordCount
{
def main(args:
Array
[
String
]) {
if
(args.length < 4) {
System
.err.println(
"Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>"
)
System
.exit(1)
}
StreamingExamples
.set
StreamingLogLevels
()
val
Array
(zk
Quorum
, group, topics, num
Threads
) = args
val spark
Conf
= new
SparkConf
().set
AppName
(
"KafkaWordCount"
)
val ssc = new
StreamingContext
(spark
Conf
,
Seconds
(2))
//保證元數據恢復,就是
Driver
端掛了之后數據仍然可以恢復
ssc.checkpoint(
"checkpoint"
)
val topic
Map
= topics.split(
","
).map((_,num
Threads
.to
Int
)).to
Map
val lines =
KafkaUtils
.create
Stream
(ssc, zk
Quorum
, group, topic
Map
).map(_._2)
val words = lines.flat
Map
(_.split(
" "
))
val word
Counts
= words.map(x => (x, 1L))
.reduce
ByKeyAndWindow
(_ + _, _ - _,
Minutes
(10),
Seconds
(2), 2)
word
Counts
.print()
ssc.start()
ssc.await
Termination
()
}
}
|
5、圖示:
<接收示意圖>
<元數據恢復>
直接操作方式
不同於Receiver接收數據方式,這種方式定期從kafka的topic下對應的partition中查詢最新偏移量,並在每個批次中根據相應的定義的偏移范圍進行處理。Spark通過調用kafka簡單的消費者API讀取一定范圍的數據。
相比基於Receiver方式有幾個優點:
- 簡單的並發:
不需要創建多個kafka輸入流,然后Union他們,而使用DirectStream,spark Streaming將會創建和kafka分區一樣的RDD的分區數,而且會從kafka並行讀取數據,Spark的分區數和Kafka的分區數是一一對應的關系。
- 高效
第一種實現數據的零丟失是將數據預先保存在WAL中,會復制一遍數據,會導致數據被拷貝兩次:一次是被Kafka復制;另一次是寫入到WAL中,沒有Receiver消除了這個問題。
- 僅一次語義:
Receiver方式讀取kafka,使用的是高層API將偏移量寫入ZK中,雖然這種方法可以通過數據保存在WAL中保證數據的不對,但是可能會因為sparkStreaming和ZK中保存的偏移量不一致而導致數據被消費了多次,
第二種方式不采用ZK保存偏移量,消除了兩者的不一致,保證每個記錄只被Spark Streaming操作一次,即使是在處理失敗的情況下。如果想更新ZK中的偏移量數據,需要自己寫代碼來實現。
1、引入依賴
同第一種方式。
2、編程
import
org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[[key
class
], [value
class
], [key decoder
class
], [value decoder
class
] ](streamingContext, [map of Kafka parameters], [set of topics to consume])
|
如果想獲得每個topic中每個分區的在spark streaming中的偏移量,可以通過以下代碼:
directKafkaStream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
// offsetRanges.length = # of Kafka partitions being consumed
...
}
//例子:
val ssc =
new
StreamingContext(sc, Seconds(
2
))
val kafkaParams = Map(
"zookeeper.connect"
-> zkConnect,
"group.id"
-> kafkaGroupId,
"metadata.broker.list"
->
"10.15.42.23:8092,10.15.42.22:8092"
,
"auto.offset.reset"
->
"smallest"
)
val topics = Set(topic)
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
//KafkaCluster 需要從源碼拷貝,此類是私有類。
directKafkaStream.foreachRDD(
rdd => {
val offsetLists = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val kc =
new
KafkaCluster(kafkaParams)
for
(offsets <- offsetLists) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(kafkaGroupId, Map((topicAndPartition, offsets.untilOffset)))
if
(o.isLeft) {
println(s
"Error updating the offset to Kafka cluster: ${o.left.get}"
)
}
}
}
)
|
3、部署:
同第一種方式。
4、圖示: