SparkStreaming的Receiver方式和直連方式有什么區別?
Receiver接收固定時間間隔的數據(放在內存中的),使用高級API,自動維護偏移量,達到固定的時間才去進行處理,效率低並且容易丟失數據,靈活性特別差,不好,而且它處理數據的時候,如果某一刻的數據量過大,那么就會造成磁盤溢寫的情況,他通過WALS進行磁盤寫入。
Receiver實現方式:

代碼如下:
object KafkaWC02 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("kafkaWC").setMaster("local[2]") //設置線程數
val ssc = new StreamingContext(conf, Seconds(5))
//設置檢查點
ssc.checkpoint("D:\\data\\checpoint\\checpoint1")
//接下來編寫kafka的配置信息
val zks = "spark01:2181"
//然后是kafka的消費組
val groupId = "gp1"
//Topic的名字 Map的key是Topic名字,第二個參數是線程數
val topics = Map[String, Int]("test02" -> 1)
//創建kafka的輸入數據流,來獲取kafka中的數據
val data = KafkaUtils.createStream(ssc, zks, groupId, topics)
//獲取到的數據是鍵值對的格式(key,value)
//獲取到的數據是 key是偏移量 value是數據
//接下來開始處理數據
val lines = data.flatMap(_._2.split(" "))
val words = lines.map((_, 1))
val res = words.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)
res.print()
//val result = words.reduceByKey(_ + _)
//val res = result.updateStateByKey[Int](updateFunc)
//res.print()
//打印輸出
//result.print()
//啟動程序
ssc.start()
//等待停止
ssc.awaitTermination()
}
//(iterator:Iteratot[(K,Seq[V]),Option[S]]))
//傳過來的值是Key Value類型
//第一個參數,是我們從kafka獲取到的元素,key ,String類型
//第二個參數,是我們進行單詞統計的value值,Int類型
//第三個參數,是我們每次批次提交的中間結果集
val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
iter.map(t=>{
(t._1,t._2.sum+t._3.getOrElse(0))
})
}
}

Direct直連方式,
它使用的是底層API實現Offest我們開發人員管理,這樣的話,它的靈活性特別好。並且可以保證數據的安全性,而且不用擔心數據量過大,因為它有預處理機制,進行提前處理,然后再批次提交任務。
Direct實現方式:

代碼如下:
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
/**
* 重要!!! Direct直連方式
*/
object KafkaDirectWC {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Direct").setMaster("local[2]")
val ssc = new StreamingContext(conf,Duration(5000))
//指定組名
val groupId = "gp01"
//指定消費的topic名字
val topic = "tt"
//指定kafka的Broker地址(SparkStreaming的Task直接連接到Kafka分區上,用的是底層API消費)
val brokerList ="spark:9092"
//接下來我們要自己維護offset了,將offset保存到ZK中
val zkQuorum = "spark:2181"
//創建stream時使用的topic名字集合,SparkStreaming可以同時消費多個topic
val topics:Set[String] = Set(topic)
//創建一個ZkGroupTopicDirs對象,其實是指定往Zk中寫入數據的目錄
// 用於保存偏移量
val TopicDirs = new ZKGroupTopicDirs(groupId,topic)
//獲取zookeeper中的路徑“/gp01/offset/tt/”
val zkTopicPath = s"${TopicDirs.consumerOffsetDir}"
//准備kafka參數
val kafkas = Map(
"metadata.broker.list"->brokerList,
"group.id"->groupId,
//從頭開始讀取數據
"auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString
)
// zookeeper 的host和ip,創建一個client,用於更新偏移量
// 是zookeeper客戶端,可以從zk中讀取偏移量數據,並更新偏移量
val zkClient = new ZkClient(zkQuorum)
//"/gp01/offset/tt/0/10001"
//"/gp01/offset/tt/1/20001"
//"/gp01/offset/tt/2/30001"
val clientOffset = zkClient.countChildren(zkTopicPath)
// 創建KafkaStream
var kafkaStream :InputDStream[(String,String)]= null
//如果zookeeper中有保存offset 我們會利用這個offset作為KafkaStream的起始位置
//TopicAndPartition [/gp01/offset/tt/0/ , 8888]
var fromOffsets:Map[TopicAndPartition,Long] = Map()
//如果保存過offset
if(clientOffset > 0){
//clientOffset 的數量其實就是 /gp01/offset/tt的分區數目
for(i<-0 until clientOffset){
// /gp01/offset/tt/ 0/10001
val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")
// tt/0
val tp = TopicAndPartition(topic,i)
//將不同partition 對應得offset增加到fromoffset中
// tt/0 -> 10001
fromOffsets += (tp->partitionOffset.toLong)
}
// key 是kafka的key value 就是kafka數據
// 這個會將kafka的消息進行transform 最終kafka的數據都會變成(kafka的key,message)這樣的Tuple
val messageHandler = (mmd:MessageAndMetadata[String,String])=>
(mmd.key(),mmd.message())
// 通過kafkaUtils創建直連的DStream
//[String,String,StringDecoder, StringDecoder,(String,String)]
// key value key解碼方式 value的解碼方式 接收數據的格式
kafkaStream = KafkaUtils.createDirectStream
[String,String,StringDecoder,
StringDecoder,(String,String)](ssc,kafkas,fromOffsets,messageHandler)
}else{
//如果未保存,根據kafkas的配置使用最新的或者最舊的offset
kafkaStream = KafkaUtils.createDirectStream
[String,String,StringDecoder,StringDecoder](ssc,kafkas,topics)
}
//偏移量范圍
var offsetRanges = Array[OffsetRange]()
//從kafka讀取的數據,是批次提交的,那么這塊注意下,
// 我們每次進行讀取數據后,需要更新維護一下偏移量
//那么我們開始進行取值
// val transform = kafkaStream.transform{
// rdd=>
// //得到該RDD對應得kafka消息的offset
// // 然后獲取偏移量
// offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// rdd
// }
// val mes = transform.map(_._2)
// 依次迭代DStream中的RDD
kafkaStream.foreachRDD{
//對RDD進行操作 觸發Action
kafkardd=>
offsetRanges = kafkardd.asInstanceOf[HasOffsetRanges].offsetRanges
//下面 你就可以怎么寫都行了,為所欲為
val maps = kafkardd.map(_._2)
maps.foreach(println)
for(o<-offsetRanges){
// /gp01/offset/tt/ 0
val zkpath = s"${TopicDirs.consumerOffsetDir}/${o.partition}"
//將該partition的offset保存到zookeeper中
// /gp01/offset/tt/ 0/88889
ZkUtils.updatePersistentPath(zkClient,zkpath,o.untilOffset.toString)
}
}
// 啟動
ssc.start()
ssc.awaitTermination()
}
}
