第1章 SparkStreaming概述
1.1 Spark Streaming是什么
Spark Streaming用於流式數據的處理。Spark Streaming支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數據輸入后可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能保存在很多地方,如HDFS,數據庫等。
和Spark基於RDD的概念很相似,Spark Streaming使用離散化流(discretized stream)作為抽象表示,叫作DStream。DStream 是隨時間推移而收到的數據的序列。在內部,每個時間區間收到的數據都作為 RDD 存在,而DStream是由這些RDD所組成的序列(因此得名“離散化”)。所以簡單來將,DStream就是對RDD在實時數據處理場景的一種封裝。
1.2 Spark Streaming的特點
- 易用
- 容錯
- 易整合到Spark體系
1.3 Spark Streaming架構
1.3.1 架構圖
- 整體架構圖
- SparkStreaming架構圖
1.3.2 背壓機制
Spark 1.5以前版本,用戶如果要限制Receiver的數據接收速率,可以通過設置靜態配制參數“spark.streaming.receiver.maxRate”的值來實現,此舉雖然可以通過限制接收速率,來適配當前的處理能力,防止內存溢出,但也會引入其它問題。比如:producer數據生產高於maxRate,當前集群處理能力也高於maxRate,這就會造成資源利用率下降等問題。
為了更好的協調數據接收速率與資源處理能力,1.5版本開始Spark Streaming可以動態控制數據接收速率來適配集群數據處理能力。背壓機制(即Spark Streaming Backpressure): 根據JobScheduler反饋作業的執行信息來動態調整Receiver數據接收率。
通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機制,默認值false,即不啟用。
第2章 Dstream入門
2.1 WordCount案例實操
- 需求:使用netcat工具向3333端口不斷的發送數據,通過SparkStreaming讀取端口數據並統計不同單詞出現的次數
1) 添加依賴
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>3.0.1</version> </dependency>
2)添加 log4j.properties
log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
3) 編寫代碼
package com.yuange.sparkstreaming package com.yuange.sparkstreaming import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @作者:袁哥 * @時間:2021/7/2 17:02 */ object WordCount { def main(args: Array[String]): Unit = { // ①創建編程入口,StreamingContext val streamingContext = new StreamingContext("local[2]","wc",Seconds(4)) //②創建編程模型: DStream 根據不同的數據源創建不同的DS val ds: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop103",3333) // ③調用DS中的方法進行運算 val result: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _) // ④調用行動算子,例如輸出,打印等 result.print(1000) // ⑤真正的計算,會在啟動了app之后運行 streamingContext.start() // ⑥流式應用,需要一直運行(類似agent) 不能讓main運行完,阻塞main streamingContext.awaitTermination() } }
4) 啟動程序並通過netcat發送數據:
nc -lk 3333
2.2 WordCount解析
Discretized Stream是Spark Streaming的基礎抽象,代表持續性的數據流和經過各種Spark原語操作后的結果數據流。在內部實現上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔內的數據。
對數據的操作也是按照RDD為單位來進行的
計算過程由Spark Engine來完成
第3章 DStream創建
3.1 RDD隊列
3.1.1 用法及說明
測試過程中,可以通過使用ssc.queueStream(queueOfRDDs)來創建DStream,每一個推送到這個隊列中的RDD,都會作為一個DStream處理。
3.1.2 案例實操
- 需求:循環創建幾個RDD,將RDD放入隊列。通過SparkStream創建Dstream,計算WordCount
1) 編寫代碼
package com.yuange.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable /** * @作者:袁哥 * @時間:2021/7/2 19:26 */ object RDDStream { def main(args: Array[String]): Unit = { //初始化Spark配置信息 val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream") //初始化SparkStreamingContext val ssc = new StreamingContext(conf,Seconds(4)) //創建RDD隊列 val rddQueue = new mutable.Queue[RDD[Int]]() //創建QueueInputDStream val inputStream = ssc.queueStream(rddQueue,oneAtATime = false) //處理隊列中的RDD數據 val reducedStream = inputStream.map((_,1)).reduceByKey(_ + _) //打印結果 reducedStream.print() //啟動任務 ssc.start() //循環創建並向RDD隊列中放入RDD for (i <- 1 to 4){ rddQueue += ssc.sparkContext.makeRDD(1 to 300,10) Thread.sleep(1500) } ssc.awaitTermination() } }
2) 結果展示
3.2 自定義數據源
3.2.1 用法及說明
需要繼承Receiver,並實現onStart、onStop方法來自定義數據源采集。
3.2.2 案例實操
需求:自定義數據源,實現監控某個端口號,獲取該端口號內容。
1) 自定義數據源
package com.yuange.sparkstreaming import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver /** * @作者:袁哥 * @時間:2021/7/2 19:42 */ class CustomerReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ //最初啟動的時候,調用該方法,作用為:讀數據並將數據發送給Spark override def onStart(): Unit = { new Thread("Socket Receiver") { override def run() { receive() } }.start() } //讀數據並將數據發送給Spark def receive(): Unit = { //創建一個Socket var socket: Socket = new Socket(host,port) //定義一個變量,用來接收端口傳過來的數據 var input: String = null //創建一個BufferedReader用於讀取端口傳來的數據 val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8)) //讀取數據 input = reader.readLine() //當receiver沒有關閉並且輸入數據不為空,則循環發送數據給Spark while (!isStopped() && input != null) { store(input) input = reader.readLine() } //跳出循環則關閉資源 reader.close() socket.close() } override def onStop(): Unit = {} }
2) 使用自定義的數據源采集數據
package com.yuange.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @作者:袁哥 * @時間:2021/7/2 19:54 */ object FileStream { def main(args: Array[String]): Unit = { //初始化Spark配置信息 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount") //初始化SparkStreamingContext val ssc = new StreamingContext(sparkConf,Seconds(4)) //創建自定義的receiver的Streaming val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop103", 3333)) //將每行數據做切分,形成一個個單詞 val wordStream = lineStream.flatMap(_.split("\t")) //將單詞映射為元組 val wordAndOneStream = wordStream.map((_,1)) //6.將相同的單詞次數做統計 val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _) //7.打印 wordAndCountStream.print() //8.啟動SparkStreamingContext ssc.start() ssc.awaitTermination() } }
3.3 Kafka數據源
ReceiverAPI(Receiver 模式):需要一個專門的Executor去接收數據,然后發送給其他的Executor做計算。存在的問題,接收數據的Executor和計算的Executor速度會有所不同,特別在接收數據的Executor速度大於計算的Executor速度,會導致計算數據的節點內存溢出。早期版本中提供此方式,當前版本不適用
DirectAPI(Direct模式(推薦)):是由計算的Executor來主動消費Kafka的數據,速度由自身控制
官方文檔:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
3.3.1 Direct模式官網版
1)代碼
package com.yuange.kafka import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010._ /** * @作者:袁哥 * @時間:2021/7/3 14:00 */ object KafkaDirectModeDemo { def main(args: Array[String]): Unit = { val streamingContext = new StreamingContext("local[2]","wc",Seconds(5)) //配置消費者信息 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", //集群地址 "key.deserializer" -> classOf[StringDeserializer], //Key-value的反序列化器 "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "yuange", //消費者組id "auto.offset.reset" -> "latest", //消費者id(可選) "enable.auto.commit" -> "true" //是否自動提交offset,如果要自動提交,多久自動提交1次 ) //指定消費主題 val topics = Array("topicA") // 使用提供的API,從kafka中獲取一個DS val ds = KafkaUtils.createDirectStream[String, String]( streamingContext, /* * 位置策略: 指 Task允許的Executor 和 kafka的broker 的網絡拓撲關系 * 大部分情況, LocationStrategies.PreferConsistent * PreferConsistent : 大部分情況,公司用 * PreferBrokers: executor和broker在同一台機器 * PreferFixed: 自定義 */ LocationStrategies.PreferConsistent, /* * 消費策略 * 獨立消費者: 不需要借助Kafka集群保存Offset * assign * 非獨立消費者(大部分): 需要kafka集群,采取分配策略為消費者組的每個消費者分配分區! * 需要借助kafka集群保存offset * subscribe */ ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) val ds1 = ds.map(recored => recored.value()) ds1.print() streamingContext.start() streamingContext.awaitTermination() } }
2)啟動kafka生產者線程生產數據
bin/kafka-console-producer.sh --broker-list hadoop104:9092 --topic topicA
3)查看結果
3.3.2 Direct模式演示丟失數據
1)代碼
package com.yuange.kafka import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} /** * @作者:袁哥 * @時間:2021/7/3 22:32 * * 數據處理語義: * at least once : 至少一次,可以是 1次或多次,存在重復處理! * at most once(沒人用) : 至多一次,可以是0次或1次, 存在丟數據風險! * eaxctly once : 精確一次,不多不少 * at least once + 去重 = eaxctly once * 目前是自動提交offset,因此程序運行到49時,只是從kafka消費到了數據,還沒有進行計算,就已經自動提交了offset! * 無論發生什么情況,都只能從提交的offset后,再消費,如果程序異常,異常期間的那批數據,也無法在程序重啟后,再次消費到! */ object KafkaDirectLoseDataTest { def main(args: Array[String]): Unit = { val streamingContext = new StreamingContext("local[2]","wc",Seconds(5)) //配置消費者信息 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", //集群地址 "key.deserializer" -> classOf[StringDeserializer], //Key-value的反序列化器 "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "yuange", //消費者組id "auto.offset.reset" -> "latest", //消費者id(可選) "enable.auto.commit" -> "true" //是否自動提交offset,如果要自動提交,多久自動提交1次 ) //指定消費主題 val topics = Array("topicA") // 使用提供的API,從kafka中獲取一個DS val ds = KafkaUtils.createDirectStream[String, String]( streamingContext, /* * 位置策略: 指 Task允許的Executor 和 kafka的broker 的網絡拓撲關系 * 大部分情況, LocationStrategies.PreferConsistent * PreferConsistent : 大部分情況,公司用 * PreferBrokers: executor和broker在同一台機器 * PreferFixed: 自定義 */ LocationStrategies.PreferConsistent, /* * 消費策略 * 獨立消費者-->assign:不需要借助Kafka集群保存Offset * 非獨立消費者(大部分)-->subscribe:需要kafka集群,采取分配策略為消費者組的每個消費者分配分區!需要借助kafka集群保存offset */ ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) //程序的運算邏輯 val ds1 = ds.map(record =>{ //模擬異常 if (record.value().equals("d")) 1 / 0 record.value() }) ds1.print() streamingContext.start() streamingContext.awaitTermination() } }
2)測試
bin/kafka-console-producer.sh --topic topicA --broker-list hadoop102:9092
3)結果
3.3.3 Direct模式使用Checkpoint維護offset
1)代碼
package com.yuange.kafka import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} /** * @作者:袁哥 * @時間:2021/7/3 22:40 * 不能自動提交offset,需要自己維護offset! * 如何自己維護? * 三種方式:①Checkpoints ②Kafka itself ③Your own data store * Checkpoints: checkpoint本質是一個持久化的文件系統!將kafka的偏移量存儲在 spark提供的ck目錄中,下次程序重啟時,會從ck目錄獲取上次消費的offset,繼續消費! * 操作: ①設置ck目錄:streamingContext.checkpoint("kafkack") * ②設置故障的時候,讓Driver從ck目錄恢復: * def getActiveOrCreate( * checkpointPath: String, //ck目錄 * creatingFunc: () => StreamingContext // 一個空參的函數,要求返回StreamingContext,函數要求把計算邏輯也放入次函數 * ): StreamingContext * ③取消自動提交offset:"enable.auto.commit" -> "false" * Checkpoints的弊端: * ①一般的異常,會catch住,繼續運行,不給你異常后,從異常位置繼續往后消費的機會 * ②重啟后,會從上次ck目錄記錄的時間戳,一直按照 slide時間,提交Job,到啟動的時間 * ③會產生大量的小文件 * 結論:不推薦使用!因為不能保證精確一次! */ object KafkaDirectCKTest { val ckPath:String ="kafkack" def main(args: Array[String]): Unit = { def creatingStreamingContextFunc(): StreamingContext = { val streamingContext = new StreamingContext("local[2]","wc",Seconds(5)) //設置ck目錄 streamingContext.checkpoint(ckPath) //配置消費者信息 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", //集群地址 "key.deserializer" -> classOf[StringDeserializer], //Key-value的反序列化器 "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "yuange", //消費者組id "auto.offset.reset" -> "latest", //消費者id(可選) "enable.auto.commit" -> "false" //是否自動提交offset,如果要自動提交,多久自動提交1次 ) //指定消費主題 val topics = Array("topicA") // 使用提供的API,從kafka中獲取一個DS val ds = KafkaUtils.createDirectStream[String, String]( streamingContext, /* * 位置策略: 指 Task允許的Executor 和 kafka的broker 的網絡拓撲關系 * 大部分情況, LocationStrategies.PreferConsistent * PreferConsistent : 大部分情況,公司用 * PreferBrokers: executor和broker在同一台機器 * PreferFixed: 自定義 */ LocationStrategies.PreferConsistent, /* * 消費策略 * 獨立消費者-->assign:不需要借助Kafka集群保存Offset * 非獨立消費者(大部分)-->subscribe:需要kafka集群,采取分配策略為消費者組的每個消費者分配分區!需要借助kafka集群保存offset */ ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) //程序的運算邏輯 val ds1 = ds.map(record =>{ //模擬異常 if (record.value().equals("d")) throw new UnknownError("故障了!求你停下來吧!") record.value() }) ds1.print() streamingContext } // 要么直接創建,要么從ck目錄中恢復一個StreamingContext val context: StreamingContext = StreamingContext.getActiveOrCreate(ckPath,creatingStreamingContextFunc) context.start() context.awaitTermination() } }
2)測試
bin/kafka-console-producer.sh --topic topicA --broker-list hadoop102:9092
3)結果
4)將模擬異常的代碼注釋掉,然后再次啟動,發現重啟后,程序會從上次ck目錄記錄的時間戳,一直按照 slide時間,提交Job,到啟動的時間
3.3.4 Direct模式使用Kafka維護offset
1)代碼
package com.yuange.kafka import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @作者:袁哥 * @時間:2021/7/5 8:19 * 借助kafka提供的api,手動將offset存儲到kafka的__consumer_offsets中 * ①取消自動提交offset * ②獲取當前消費到的這批數據的offset信息 * ③進行計算和輸出 * ④計算和輸出完全完成后,再手動提交offset **/ object KafkaDirectStoreOffsetToKafka { def main(args: Array[String]): Unit = { val streamingContext = new StreamingContext("local[2]","wc",Seconds(5)) //配置消費者信息 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", //集群地址 "key.deserializer" -> classOf[StringDeserializer], //Key-value的反序列化器 "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "yuange", //消費者組id "auto.offset.reset" -> "latest", //消費者id(可選) "enable.auto.commit" -> "false" //是否自動提交offset,如果要自動提交,多久自動提交1次 ) //指定消費主題 val topics = Array("topicA") // 使用提供的API,從kafka中獲取一個DS val ds = KafkaUtils.createDirectStream[String, String]( streamingContext, /* * 位置策略: 指 Task允許的Executor 和 kafka的broker 的網絡拓撲關系 * 大部分情況, LocationStrategies.PreferConsistent * PreferConsistent : 大部分情況,公司用 * PreferBrokers: executor和broker在同一台機器 * PreferFixed: 自定義 */ LocationStrategies.PreferConsistent, /* * 消費策略 * 獨立消費者-->assign:不需要借助Kafka集群保存Offset * 非獨立消費者(大部分)-->subscribe:需要kafka集群,采取分配策略為消費者組的每個消費者分配分區!需要借助kafka集群保存offset */ ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) ds.foreachRDD { rdd => //判斷當前RDD是不是KafkaRDD,如果是,獲取其中的偏移量信息 val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //在Driver端中的foreachRDD,只有RDD的算子,才在Executor端運行 //計算邏輯:如果有冪等操作,此時是 精確一次,如果沒有冪等操作,此時就是 最少一次 rdd.map(record => { //模擬異常 //if (record.value().equals("d")) 1 / 0 // 將數據寫到redis或hbase,保證寫N次,結果不變 (record.value(),1) }).reduceByKey(_ + _).collect().foreach(println(_)) //手動提交offset ds.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } streamingContext.start() streamingContext.awaitTermination() } }
2)測試
3)結果
3.3.5 Direct模式自己維護offset
1)創建數據庫
CREATE TABLE `offsets` ( `groupid` varchar(100) DEFAULT NULL, `topic` varchar(100) DEFAULT NULL, `partitionid` int(5) DEFAULT NULL, `offset` bigint(11) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `wordcount` ( `word` varchar(200) NOT NULL, `count` int(11) DEFAULT NULL, PRIMARY KEY (`word`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2)在idea中新建db.properties
jdbc.datasource.size=10 jdbc.url=jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true jdbc.user=root jdbc.password=root123 jdbc.driver.name=com.mysql.jdbc.Driver kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
3)新建JDBCUtil 工具類
package com.yuange.utils import java.sql.Connection import com.alibaba.druid.pool.DruidDataSourceFactory import javax.sql.DataSource /** * @作者:袁哥 * @時間:2021/7/5 9:11 */ object JDBCUtil { // 創建連接池對象 var dataSource:DataSource = init() // 連接池的初始化 def init():DataSource = { val paramMap = new java.util.HashMap[String, String]() paramMap.put("driverClassName", PropertiesUtil.getValue("jdbc.driver.name")) paramMap.put("url", PropertiesUtil.getValue("jdbc.url")) paramMap.put("username", PropertiesUtil.getValue("jdbc.user")) paramMap.put("password", PropertiesUtil.getValue("jdbc.password")) paramMap.put("maxActive", PropertiesUtil.getValue("jdbc.datasource.size")) // 使用Druid連接池對象 DruidDataSourceFactory.createDataSource(paramMap) } // 從連接池中獲取連接對象 def getConnection(): Connection = { dataSource.getConnection } def main(args: Array[String]): Unit = { println(getConnection()) } }
4)新建PropertiesUtil 工具類
package com.yuange.utils import java.util.ResourceBundle /** * @作者:袁哥 * @時間:2021/7/5 9:12 */ object PropertiesUtil { // 綁定配置文件 // ResourceBundle專門用於讀取配置文件,所以讀取時,不需要增加擴展名 // 國際化 = I18N => Properties val resourceFile: ResourceBundle = ResourceBundle.getBundle("db") def getValue( key : String ): String = { resourceFile.getString(key) } def main(args: Array[String]): Unit = { println(getValue("jdbc.user")) } }
5)代碼
package com.yuange.kafka import java.sql.{Connection, PreparedStatement, ResultSet} import com.yuange.utils.JDBCUtil import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable /** * @作者:袁哥 * @時間:2021/7/5 8:36 * 自己維護offset:①取消自動提交offset * ②在程序開始計算之前,先從 mysql中讀取上次提交的offsets * ③基於上次提交的offsets,構造一個DS,這個DS從上次提交的offsets位置向后消費的數據流 * def SubscribePattern[K, V]( * pattern: ju.regex.Pattern, * kafkaParams: collection.Map[String, Object], * offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] * ④需要將結果,收集到Driver端,和offsets信息,組合為一個事務,一起寫入數據庫(若成功,提交,失敗,就回滾!) * Mysql中表的設計: * 運算的結果:單詞統計 --> result(單詞 varchar, count int) * offsets:offsets(group_id varchar, topic varchar, partition int , offset bigint) * 精准一次,借助事務 */ object KafkaDirectStoreOffsetToMysql { val groupId:String ="yuange" val streamingContext = new StreamingContext("local[2]", "wc", Seconds(5)) def main(args: Array[String]): Unit = { // 消費者的配置 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> groupId, "auto.offset.reset" -> "latest", "enable.auto.commit" -> "false" ) // 查詢之前已經在mysql中保存的offset val offsetsMap: Map[TopicPartition, Long] = readHitoryOffsetsFromMysql(groupId) //指定消費主題 val topics = Array("topicA") // 基於上次提交的offsets,構造一個DS,這個DS從上次提交的offsets位置向后消費的數據流 val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( streamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetsMap) ) ds.foreachRDD { rdd => //消費到了數據 if (!rdd.isEmpty()){ val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges offsetRanges.foreach(println) //計算邏輯 val result: Array[(String, Int)] = rdd.map(record => (record.value(), 1)).reduceByKey(_ + _).collect() // 開啟事務,和offsets一起寫入mysql writeResultAndOffsetsToMysql(result,offsetRanges) } } streamingContext.start() streamingContext.awaitTermination() } //從Mysql讀取歷史偏移量 def readHitoryOffsetsFromMysql(groupId: String) : Map[TopicPartition, Long] = { val offsetsMap: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]() var conn:Connection=null var ps:PreparedStatement=null var rs:ResultSet=null val sql:String= """ |SELECT | `topic`,`partitionid`,`offset` |FROM `offsets` |WHERE `groupid`=? | |""".stripMargin try { conn=JDBCUtil.getConnection() ps=conn.prepareStatement(sql) ps.setString(1,groupId) rs= ps.executeQuery() while(rs.next()){ val topic: String = rs.getString("topic") val partitionid: Int = rs.getInt("partitionid") val offset: Long = rs.getLong("offset") val topicPartition = new TopicPartition(topic, partitionid) offsetsMap.put(topicPartition,offset) } }catch { case e:Exception => e.printStackTrace() throw new RuntimeException("查詢偏移量出錯!") }finally { if (rs != null){ rs.close() } if (ps != null){ ps.close() } if (conn != null){ conn.close() } } //將可變map轉為不可變map offsetsMap.toMap } //在一個事務中,寫入結果和偏移量 def writeResultAndOffsetsToMysql(result: Array[(String, Int)], offsetRanges: Array[OffsetRange]): Unit = { val sql1:String = """ |INSERT INTO | `wordcount` VALUES(?,?) |ON DUPLICATE KEY UPDATE `count`= COUNT + VALUES(COUNT) | |""".stripMargin val sql2:String = """ |INSERT INTO | `offsets` VALUES(?,?,?,?) | ON DUPLICATE KEY UPDATE `offset`= VALUES(OFFSET) | |""".stripMargin var conn:Connection=null var ps1:PreparedStatement=null var ps2:PreparedStatement=null try { conn=JDBCUtil.getConnection() //取消事務的自動提交 ,只有取消了自動提交,才能將多次寫操作組合為一個事務,手動提交 conn.setAutoCommit(false) ps1=conn.prepareStatement(sql1) ps2=conn.prepareStatement(sql2) for ((word, count) <- result) { ps1.setString(1,word) ps1.setInt(2,count) ps1.addBatch() } //一批insert執行一次 ps1.executeBatch() //模擬異常 //1 / 0 for (offsetRange <- offsetRanges) { ps2.setString(1,groupId) ps2.setString(2,offsetRange.topic) ps2.setInt(3,offsetRange.partition) ps2.setLong(4,offsetRange.untilOffset) ps2.addBatch() } ps2.executeBatch() //手動提交事務 conn.commit() }catch { case e:Exception => e.printStackTrace() //回滾事務 conn.rollback() //重啟app ,暫時以停止代替 streamingContext.stop(true) }finally { if (ps1 != null){ ps1.close() } if (ps2 != null){ ps2.close() } if (conn != null){ conn.close() } } } }
6)測試
bin/kafka-console-producer.sh --topic topicA --broker-list hadoop102:9092
7)結果
第4章 DStream轉換
DStream上的操作與RDD的類似,分為Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各種Window相關的原語。
4.1 無狀態轉化操作
無狀態轉化操作就是把簡單的RDD轉化操作應用到每個批次上,也就是轉化DStream中的每一個RDD。部分無狀態轉化操作列在了下表中。注意,針對鍵值對的DStream轉化操作(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。
需要記住的是,盡管這些函數看起來像作用在整個流上一樣,但事實上每個DStream在內部是由許多RDD(批次)組成,且無狀態轉化操作是分別應用到每個RDD上的。
例如:reduceByKey()會歸約每個時間區間中的數據,但不會歸約不同區間之間的數據。
4.1.1 Transform
Transform允許DStream上執行任意的RDD-to-RDD函數。即使這些函數並沒有在DStream的API中暴露出來,通過該函數可以方便的擴展Spark API。該函數每一批次調度一次。其實也就是對DStream中的RDD應用轉換。
package com.yuange.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @作者:袁哥 * @時間:2021/7/2 21:34 */ object Transform { def main(args: Array[String]): Unit = { //創建SparkConf val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount") //創建StreamingContext val ssc = new StreamingContext(sparkConf,Seconds(4)) //創建DStream val ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop103",3333) //轉化為RDD ds.transform(rdd => { val words: RDD[String] = rdd.flatMap(_.split("\t")) val wordAndOne: RDD[(String,Int)] = words.map((_ , 1)) val values: RDD[(String,Int)] = wordAndOne.reduceByKey(_ + _) values }) //打印 ds.print() //啟動 ssc.start() ssc.awaitTermination() } }
4.1.2 Join
兩個流之間的join需要兩個流的批次大小一致,這樣才能做到同時觸發計算。計算過程就是對當前批次的兩個流中各自的RDD進行join,與兩個RDD的join效果相同。
package com.yuange.sparkstreaming import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @作者:袁哥 * @時間:2021/7/2 21:51 */ object JoinTest { def main(args: Array[String]): Unit = { val streamingContext = new StreamingContext("local[*]", "wc", Seconds(5)) val ds1: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop103", 3333) val ds2: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop103", 3334) //轉換為K-V類型 val ds3: DStream[(String, Int)] = ds1.map((_, 1)) val ds4: DStream[(String, Int)] = ds2.map((_, 2)) // 只能Join同一個批次的兩個RDD中的數據 val ds5: DStream[(String, (Int, Int))] = ds3.join(ds4) ds5.print(1000) // ⑤真正的計算,會在啟動了app之后運行 streamingContext.start() // ⑥流式應用,需要一直運行(類似agent) 不能讓main運行完,阻塞main streamingContext.awaitTermination() } }
4.2 有狀態轉化操作
4.2.1 UpdateStateByKey
UpdateStateByKey原語用於記錄歷史記錄,有時,我們需要在DStream中跨批次維護狀態(例如流計算中累加wordcount)。針對這種情況,updateStateByKey()為我們提供了對一個狀態變量的訪問,用於鍵值對形式的DStream。給定一個由(鍵,事件)對構成的 DStream,並傳遞一個指定如何根據新的事件更新每個鍵對應狀態的函數,它可以構建出一個新的 DStream,其內部數據為(鍵,狀態) 對。
updateStateByKey() 的結果會是一個新的DStream,其內部的RDD 序列是由每個時間區間對應的(鍵,狀態)對組成的。
updateStateByKey操作使得我們可以在用新信息進行更新時保持任意的狀態。為使用這個功能,需要做下面兩步:
1. 定義狀態,狀態可以是一個任意的數據類型。
2. 定義狀態更新函數,用此函數闡明如何使用之前的狀態和來自輸入流的新值對狀態進行更新。
使用updateStateByKey需要對檢查點目錄進行配置,會使用檢查點來保存狀態。
1) 編寫代碼
package com.yuange.sparkstreaming import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @作者:袁哥 * @時間:2021/7/2 22:30 */ object UpdateStateByKeyTest { def main(args: Array[String]): Unit = { // 定義更新狀態方法,參數values為當前批次單詞頻度,state為以往批次單詞頻度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val ssc = new StreamingContext("local[*]","UpdateStateByKeyTest",Seconds(5)) ssc.checkpoint("./ck") val word = ssc.socketTextStream("hadoop103",3333) // 使用updateStateByKey來更新狀態,統計從運行開始以來單詞總的次數 val stateDS = word.flatMap(_.split(" ")).map(r => (r,1)).updateStateByKey[Int](updateFunc) stateDS.print() ssc.start() ssc.awaitTermination() } }
2) 啟動程序並向3333端口發送數據
nc -lk 3333
Hello World
Hello Scala
3) 結果展示
4.2.2 WindowOperations
Window Operations可以設置窗口的大小和滑動窗口的間隔來動態的獲取當前Steaming的允許狀態。所有基於窗口的操作都需要兩個參數,分別為窗口時長以及滑動步長。
- 窗口時長:計算內容的時間范圍;
- 滑動步長:隔多久觸發一次計算。
注意:這兩者都必須為采集周期大小的整數倍。
WordCount第三版:3秒一個批次,窗口12秒,滑步6秒。
package com.yuange.sparkstreaming import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @作者:袁哥 * @時間:2021/7/2 22:41 */ object WindowOperations { def main(args: Array[String]): Unit = { val ssc = new StreamingContext("local[*]","WindowOperations",Seconds(3)) ssc.checkpoint("./ck") val word = ssc.socketTextStream("hadoop103",3333) val word2 = word.flatMap(_.split(" ")).map(r => (r,1)).reduceByKeyAndWindow((a: Int,b: Int) => (a+b),Seconds(12),Seconds(6)) word2.print() ssc.start() ssc.awaitTermination() } }
關於Window的操作還有如下方法:
(1)window(windowLength, slideInterval): 基於對源DStream窗化的批次進行計算返回一個新的Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一個滑動窗口計數流中的元素個數;
(3)reduceByWindow(func, windowLength, slideInterval): 通過使用自定義函數整合滑動區間流元素來創建一個新的單元素流;
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 當在一個(K,V)對的DStream上調用此函數,會返回一個新(K,V)對的DStream,此處通過對滑動窗口中批次數據使用reduce函數來整合每個key的value值。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 這個函數是上述函數的變化版本,每個窗口的reduce值都是通過用前一個窗的reduce值來遞增計算。通過reduce進入到滑動窗口數據並”反向reduce”離開窗口的舊數據來實現這個操作。一個例子是隨着窗口滑動對keys的“加”“減”計數。通過前邊介紹可以想到,這個函數只適用於”可逆的reduce函數”,也就是這些reduce函數有相應的”反reduce”函數(以參數invFunc形式傳入)。如前述函數,reduce任務的數量通過可選參數來配置。
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow( {(x, y) => x + y}, {(x, y) => x - y}, Seconds(30), Seconds(10)) //加上新進入窗口的批次中的元素 //移除離開窗口的老批次中的元素 //窗口時長// 滑動步長
countByWindow()和countByValueAndWindow()作為對數據進行計數操作的簡寫。countByWindow()返回一個表示每個窗口中元素個數的DStream,而countByValueAndWindow()返回的DStream則包含窗口中每個值的個數。
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()} val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
第5章 DStream輸出
輸出操作指定了對流數據經轉化操作得到的數據所要執行的操作(例如把結果推入外部數據庫或輸出到屏幕上)。與RDD中的惰性求值類似,如果一個DStream及其派生出的DStream都沒有被執行輸出操作,那么這些DStream就都不會被求值。如果StreamingContext中沒有設定輸出操作,整個context就都不會啟動。
輸出操作如下:
- print():在運行流程序的驅動結點上打印DStream中每一批次數據的最開始10個元素。這用於開發和調試。在Python API中,同樣的操作叫print()。
- saveAsTextFiles(prefix, [suffix]):以text文件形式存儲這個DStream的內容。每一批次的存儲文件名基於參數中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。
- saveAsObjectFiles(prefix, [suffix]):以Java對象序列化的方式將Stream中的數據保存為 SequenceFiles . 每一批次的存儲文件名基於參數中的為"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
- saveAsHadoopFiles(prefix, [suffix]):將Stream中的數據保存為 Hadoop files. 每一批次的存儲文件名基於參數中的為"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。
- foreachRDD(func):這是最通用的輸出操作,即將函數 func 用於產生於 stream的每一個RDD。其中參數傳入的函數func應該實現將每一個RDD中數據推送到外部系統,如將RDD存入文件或者通過網絡將其寫入數據庫。
通用的輸出操作foreachRDD(),它用來對DStream中的RDD運行任意計算。這和transform() 有些類似,都可以讓我們訪問任意RDD。在foreachRDD()中,可以重用我們在Spark中實現的所有行動操作。比如,常見的用例之一是把數據寫到諸如MySQL的外部數據庫中。
注意:
1) 連接不能寫在driver層面(序列化)
2) 如果寫在foreach則每個RDD中的每一條數據都創建,得不償失;
3) 增加foreachPartition,在分區創建(獲取)。
第6章 優雅關閉
流式任務需要7*24小時執行,但是有時涉及到升級代碼需要主動停止程序,但是分布式程序,沒辦法做到一個個進程去殺死,所有配置優雅的關閉就顯得至關重要了。
使用外部文件系統來控制內部程序關閉。
1)MonitorStop
package com.yuange.sparkstreaming import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.streaming.{StreamingContext, StreamingContextState} /** * @作者:袁哥 * @時間:2021/7/3 11:28 */ class MonitorStop(ssc: StreamingContext) extends Runnable{ override def run(): Unit = { val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration(),"atguigu") while (true) { try{ Thread.sleep(5000) }catch { case e: Exception => { e.printStackTrace() } } val state: StreamingContextState = ssc.getState val bool: Boolean = fs.exists(new Path("hdfs://hadoop102:8020/stopSpark")) if (bool){ if (state == StreamingContextState.ACTIVE){ ssc.stop(stopSparkContext = true,stopGracefully = true) System.exit(0) } } } } }
2)SparkTest
package com.yuange.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @作者:袁哥 * @時間:2021/7/3 11:38 */ object SparkTest { def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = { val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int],status: Option[Int]) => { //當前批次的內容 val sum: Int = values.sum //取出狀態信息中上一次狀態 val lastStatu: Int = status.getOrElse(0) Some(sum + lastStatu) } val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest") //設置優雅的關閉 sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true") val ssc = new StreamingContext(sparkConf,Seconds(5)) ssc.checkpoint("./ck") val word: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop103",3333) val word2 = word.flatMap(_.split(" ")).map((_,1)).updateStateByKey(update) word2.print() ssc } def main(args: Array[String]): Unit = { val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck",() => createSSC()) new Thread(new MonitorStop(ssc)).start() ssc.start() ssc.awaitTermination() } }