推薦系統的在線部分往往使用spark-streaming實現,這是一個很重要的環節。
在線流程的實時數據一般是從kafka獲取消息到spark streaming
spark連接kafka兩種方式在面試中會經常被問到,說明這是重點,下面為大家介紹一下這兩種方法:
第一種方式:Receiver模式 又稱kafka高級api模式
效果:SparkStreaming中的Receivers,恰好kafka有發布、訂閱,然而:這種方式企業不常用,說明有bug,不符合企業需求。因為:接收到的數據存儲在Executor,會出現數據漏處理或者多處理狀況。
簡單的理解就是kafka把消息全部封裝好,提供給spark去調用,本來kafka的消息分布在不同的partition上面,相當於做了一步數據合並,在發送給spark,故spark可以設置executor個數去消費這部分數據,效率相對慢一些。
代碼實例:
object ReceiverKafkaWordCount { Logger.getLogger("org").setLevel(Level.ERROR) def main(args: Array[String]): Unit = { val Array(brokers, topics) = Array(Conf.KAFKA_BROKER, Conf.TEST_TOPIC) // Create context with 2 second batch interval val conf = new SparkConf() .setMaster("local") .setAppName("OnlineStreamHobby") //設置本程序名稱 // .set("auto.offset.reset","smallest") val ssc = new StreamingContext(conf, Seconds(2)) // 從kafka取數據 val kafkaParams: Map[String, String] = Map[String, String]( // "auto.offset.reset" -> "smallest", //自動將偏移重置為最早的偏移 "zookeeper.connect" -> Conf.ZK_HOST, // "bootstrap.servers" -> Common.KAFKA_BROKER_LIST, "group.id" -> "test" ) val numThreads = 1 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val fact_streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2).map(_._2) // fact_streaming.print() val words = fact_streaming.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.print() ssc.checkpoint(".") //啟動spark並設置執行時間 ssc.start() ssc.awaitTermination() } }
第二種方式:Direct模式 又稱kafka低級API模式
效果:每次到topic的每個partition依據偏移量進行獲取數據,拉取數據以后進行處理,可以實現高可用
解釋:在spark 1.3中引入了這種新的無接收器“直接”方法,以確保更強大的端到端保證。這種方法不是使用接收器來接收數據,而是定期查詢kafka在每個topic+分partition中的最新偏移量,並相應地定義要在每個批次中處理的偏移量范圍。當處理數據的作業啟動時,Kafka簡單的客戶API用於讀取Kafka中定義的偏移范圍(類似於從文件系統讀取文件)。請注意,此功能在Spark 1.3中為Scala和Java API引入
簡單的理解就是spark直接從kafka底層中的partition直接獲取消息,相對於Receiver模式少了一步,效率更快。但是這樣一來spark中的executor的工作的個數就為kafka中的partition一致,設置再多的executor都不工作,同時偏移量也需要自己維護。
代碼示例:
object DirectTest { def main(args: Array[String]) { val conf = new SparkConf().setAppName("kafka direct test").setMaster("local") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(10)) //kafka基本參數,yourBrokers你的brokers集群 val kafkaParams = Map("metadata.broker.list" -> Conf.KAFKA_BROKER) val topic = "test" val customGroup = "testGroup" //新建一個zkClient,zk是你的zk集群,和broker一樣,也是"IP:端口,IP端口..." /** *如果你使用val zkClient = new ZKClient(zk)新建zk客戶端, *在后邊讀取分區信息的文件數據時可能會出現錯誤 *org.I0Itec.zkclient.exception.ZkMarshallingError: * java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) .. *那么使用我的這個新建方法就可以了,指定讀取數據時的序列化方式 **/ val zkClient = new ZkClient(Conf.ZK_HOST, Integer.MAX_VALUE, 10000,ZKStringSerializer) //獲取zk下該消費者的offset存儲路徑,一般該路徑是/consumers/test_spark_streaming_group/offsets/topic_name val topicDirs = new ZKGroupTopicDirs(customGroup, topic) val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}") //設置第一批數據讀取的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() var directKafkaStream : InputDStream[(String,String)] = null //如果zk下有該消費者的offset信息,則從zk下保存的offset位置開始讀取,否則從最新的數據開始讀取(受auto.offset.reset設置影響,此處默認) if (children > 0) { //將zk下保存的該主題該消費者的每個分區的offset值添加到fromOffsets中 for (i <- 0 until children) { val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/$i") val tp = TopicAndPartition(topic, i) //將不同 partition 對應的 offset 增加到 fromOffsets 中 fromOffsets += (tp -> partitionOffset.toLong) println("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@") val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic,mmd.message()) directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String,String)](ssc, kafkaParams, fromOffsets, messageHandler) } }else{ directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic)) } /** *上邊已經實現從zk上保存的值開始讀取數據 *下邊就是數據處理后,再講offset值寫會到zk上 */ //用於保存當前offset范圍 var offsetRanges: Array[OffsetRange] = Array.empty val directKafkaStream1 = directKafkaStream.transform { rdd => //取出該批數據的offset值 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.map(_._2) directKafkaStream1.foreachRDD(rdd=>{ //數據處理完畢后,將offset值更新到zk集群 for (o <- offsetRanges) { val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString) } rdd.foreach(println) }) ssc.start() ssc.awaitTermination() } }