雖然SparkStreaming已經停止更新,Spark的重點也放到了 Structured Streaming ,但由於Spark版本過低或者其他技術選型問題,可能還是會選擇SparkStreaming。
SparkStreaming對於時間窗口,事件時間雖然支撐較少,但還是可以滿足部分的實時計算場景的,SparkStreaming資料較多,這里也做一個簡單介紹。
一. 什么是Spark Streaming
Spark Streaming在當時是為了與當時的Apache Storm競爭,也讓Spark可以用於流式數據的處理。根據其官方文檔介紹,Spark Streaming有高吞吐量和容錯能力強等特點。Spark Streaming支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數據輸入后可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能保存在很多地方,如HDFS,數據庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。
當然Storm目前已經漸漸淡出,Flink開始大放異彩。
Spark與Storm的對比
二、SparkStreaming入門
Spark Streaming 是 Spark Core API 的擴展,它支持彈性的,高吞吐的,容錯的實時數據流的處理。數據可以通過多種數據源獲取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也可以通過例如 map
,reduce
,join
,window
等的高級函數組成的復雜算法處理。最終,處理后的數據可以輸出到文件系統,數據庫以及實時儀表盤中。事實上,你還可以在 data streams(數據流)上使用 [機器學習] 以及 [圖計算] 算法。
在內部,它工作原理如下,Spark Streaming 接收實時輸入數據流並將數據切分成多個 batch(批)數據,然后由 Spark 引擎處理它們以生成最終的 stream of results in batches(分批流結果)。
Spark Streaming 提供了一個名為 discretized stream 或 DStream 的高級抽象,它代表一個連續的數據流。DStream 可以從數據源的輸入數據流創建,例如 Kafka,Flume 以及 Kinesis,或者在其他 DStream 上進行高層次的操作以創建。在內部,一個 DStream 是通過一系列的 [RDDs] 來表示。
本指南告訴你如何使用 DStream 來編寫一個 Spark Streaming 程序。你可以使用 Scala,Java 或者 Python(Spark 1.2 版本后引進)來編寫 Spark Streaming 程序。
在idea中新建maven項目
引入依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.4</version>
</dependency>
Project Structure —— Global Libraries —— 把scala 添加到 add module
新建Scala Class
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Demo {
//屏蔽日志
Logger.getLogger("org.apache")setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
//local會有問題 最少兩個線程 一個拿數據 一個計算
//val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]")
//時間間隔
val ssc = new StreamingContext(conf,Seconds(1))
//接收數據 處理
//socket demo
val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = value.flatMap(_.split(" "))
val wordsTuple: DStream[(String, Int)] = words.map((_, 1))
val wordcount: DStream[(String, Int)] = wordsTuple.reduceByKey(_ + _)
//觸發action
wordcount.print()
ssc.start()
//保持流的運行 等待程序被終止
ssc.awaitTermination()
}
}
測試
下載一個win10 用的netcat
https://eternallybored.org/misc/netcat/
解壓 在目錄下啟動cmd
輸入
nc -L -p 9999
開始輸入單詞 在idea中驗證接收
原理
初始化StreamingContext
為了初始化一個 Spark Streaming 程序,一個 StreamingContext 對象必須要被創建出來,它是所有的 Spark Streaming 功能的主入口點。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName
參數是展示在集群 UI 界面上的應用程序的名稱
master
是local 或者spark集群的url(mesos yarn)
本地測試可以用local[*] 注意要多於兩個線程
Second(1)定義的是batch interval 批處理間隔 就是間隔多久去拿一次數據
在定義一個 context 之后,您必須執行以下操作。
- 通過創建輸入 DStreams 來定義輸入源。
- 通過應用轉換和輸出操作 DStreams 定義流計算(streaming computations)。
- 開始接收輸入並且使用
streamingContext.start()
來處理數據。 - 使用
streamingContext.awaitTermination()
等待處理被終止(手動或者由於任何錯誤)。 - 使用
streamingContext.stop()
來手動的停止處理。
需要記住的幾點:
- 一旦一個 context 已經啟動,將不會有新的數據流的計算可以被創建或者添加到它。
- 一旦一個 context 已經停止,它不會被重新啟動。
- 同一時間內在 JVM 中只有一個 StreamingContext 可以被激活。
- 在 StreamingContext 上的 stop() 同樣也停止了 SparkContext。為了只停止 StreamingContext,設置
stop()
的可選參數,名叫stopSparkContext
為 false。 - 一個 SparkContext 就可以被重用以創建多個 StreamingContexts,只要前一個 StreamingContext 在下一個StreamingContext 被創建之前停止(不停止 SparkContext)。
Discretized Stream or DStream
Discretized Stream or DStream 是 Spark Streaming 提供的基本抽象。它代表了一個連續的數據流。可能是數據源接收的流,也可能是轉換后的流。
DStream就是多個和時間相關的一系列連續RDD的集合,比如本例就是間隔一秒的一堆RDD的集合
DStream也是有依賴關系的
flatMap 操作也是直接作用在DStream上的,就和作用於RDD一樣 這樣很好理解
我們先來看數據源接收的流 這種叫做Input DStreams 他會通過Receivers接收器去不同的數據源接收數據。
Spark Streaming內置了兩種數據源:
- 基礎的數據源:比如剛才用的socket接收 還有file systems
- 高級的數據源:比如kafka 還有flume kinesis等等
注意本地運行時,不要用local或者local[1],一個線程不夠。放到集群上時分配給SparkStreaming的核數必須大於接收器的數量,留一個核去處理數據。
我們也可以自定義數據源,那我們就需要自己開發一個接收器。
Transformations
在我們接收到Dstreams之后可以進行轉換操作,常見轉換如下:
Transformation(轉換) | Meaning(含義) |
---|---|
map(func) | 利用函數 func 處理原 DStream 的每個元素,返回一個新的 DStream。 |
flatMap(func) | 與 map 相似,但是每個輸入項可用被映射為 0 個或者多個輸出項。。 |
filter(func) | 返回一個新的 DStream,它僅僅包含原 DStream 中函數 func 返回值為 true 的項。 |
repartition(numPartitions) | 通過創建更多或者更少的 partition 以改變這個 DStream 的並行級別(level of parallelism)。 |
union(otherStream) | 返回一個新的 DStream,它包含源 DStream 和 otherDStream 的所有元素。 |
count() | 通過 count 源 DStream 中每個 RDD 的元素數量,返回一個包含單元素(single-element)RDDs 的新 DStream。 |
reduce(func) | 利用函數 func 聚集源 DStream 中每個 RDD 的元素,返回一個包含單元素(single-element)RDDs 的新 DStream。函數應該是相關聯的,以使計算可以並行化。 |
countByValue() | 在元素類型為 K 的 DStream上,返回一個(K,long)pair 的新的 DStream,每個 key 的值是在原 DStream 的每個 RDD 中的次數。 |
reduceByKey(func, [numTasks]) | 當在一個由 (K,V) pairs 組成的 DStream 上調用這個算子時,返回一個新的,由 (K,V) pairs 組成的 DStream,每一個 key 的值均由給定的 reduce 函數聚合起來。注意:在默認情況下,這個算子利用了 Spark 默認的並發任務數去分組。你可以用 numTasks 參數設置不同的任務數。 |
join(otherStream, [numTasks]) | 當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, (V, W)) 對的新 DStream。 |
cogroup(otherStream, [numTasks]) | 當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, Seq[V], Seq[W]) 的 tuples(元組)。 |
transform(func) | 通過對源 DStream 的每個 RDD 應用 RDD-to-RDD 函數,創建一個新的 DStream。這個可以在 DStream 中的任何 RDD 操作中使用。 |
updateStateByKey(func) | 返回一個新的 "狀態" 的 DStream,其中每個 key 的狀態通過在 key 的先前狀態應用給定的函數和 key 的新 valyes 來更新。這可以用於維護每個 key 的任意狀態數據。 |
這里我們特別介紹一下updateStateByKey
我們如果需要對歷史數據進行統計,可能需要去kafka里拿一下之前留存的數據,也可以用updateStateByKey這個方法。
//保存狀態 聚合相同的單詞
val wordcount = wordsTuple.updateStateByKey[Int](
//updateFunction _
(newValues: Seq[Int], runningCount: Option[Int])=> {
val newCount = Some(newValues.sum + runningCount.getOrElse(0))
newCount
}
)
比如剛才的單詞計數,我們只能統計每一次發過來的消息,但是如果希望統計多次消息就需要用到這個,我們要指定一個checkpoint,就是從哪開始算。
//增加成員變量
val checkpointDir = "./ckp"
//在方法中加入checkpoint
ssc.checkpoint(checkpointDir)
val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
value.checkpoint(Seconds(4))//官方建議批次時間的1-5倍
這時候我們建立StreamingContext的方法就要改變了 我們把剛才的創建過程提取成方法。
def creatingFunc():StreamingContext = {
val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint(checkpointDir)
val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
value.checkpoint(Seconds(4))//官方建議批次時間的1-5倍
val words: DStream[String] = value.flatMap(_.split(" "))
val wordsTuple: DStream[(String, Int)] = words.map((_, 1))
//保存狀態 聚合相同的單詞
val wordcount = wordsTuple.updateStateByKey[Int](
//updateFunction _
(newValues: Seq[Int], runningCount: Option[Int])=> {
val newCount = Some(newValues.sum + runningCount.getOrElse(0))
newCount
}
)
//觸發action
wordcount.print()
ssc
}
在mian函數中修改為:
def main(args: Array[String]): Unit = {
val ssc = StreamingContext.getOrCreate(checkpointDir,creatingFunc _)
ssc.start()
//保持流的運行 等待程序被終止
ssc.awaitTermination()
}
這樣就是,如果有checkpoint,程序會在checkpoint中把程序加載回來(程序被保存為二進制),沒有checkpoint的話才會創建。
將目錄下的checkpoint刪除,就可以將狀態刪除。
生產中updateStateByKey由於會將數據備份要慎重使用,可以考慮用hbase,redis等做替代。或者借助kafka做聚合處理。
//如果不用updatestateByKey 可以考慮redis
wordsTuple.foreachRDD(rdd => {
rdd.foreachPartition(i =>
{
//redis
}
)
})
窗口操作
Spark Streaming 也支持 _windowed computations(窗口計算),它允許你在數據的一個滑動窗口上應用 transformation(轉換)。
如上圖顯示,窗口在源 DStream 上 _slides(滑動),任何一個窗口操作都需要指定兩個參數:
- window length(窗口長度) - 窗口的持續時間。
- sliding interval(滑動間隔) - 執行窗口操作的間隔。
比如計算過去30秒的詞頻:
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些常用的窗口操作如下所示,這些操作都需要用到上文提到的兩個參數 - windowLength(窗口長度) 和 slideInterval(滑動的時間間隔)。
Transformation(轉換) | Meaning(含義) |
---|---|
window(windowLength, slideInterval) | 返回一個新的 DStream,它是基於 source DStream 的窗口 batch 進行計算的。 |
countByWindow(windowLength, slideInterval) | 返回 stream(流)中滑動窗口元素的數 |
reduceByWindow(func, windowLength, slideInterval) | 返回一個新的單元素 stream(流),它通過在一個滑動間隔的 stream 中使用 func 來聚合以創建。該函數應該是 associative(關聯的)且 commutative(可交換的),以便它可以並行計算 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 在一個 (K, V) pairs 的 DStream 上調用時,返回一個新的 (K, V) pairs 的 Stream,其中的每個 key 的 values 是在滑動窗口上的 batch 使用給定的函數 func 來聚合產生的。Note(注意): 默認情況下,該操作使用 Spark 的默認並行任務數量(local model 是 2,在 cluster mode 中的數量通過 spark.default.parallelism 來確定)來做 grouping。您可以通過一個可選的 numTasks 參數來設置一個不同的 tasks(任務)數量。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 上述 reduceByKeyAndWindow() 的更有效的一個版本,其中使用前一窗口的 reduce 值逐漸計算每個窗口的 reduce值。這是通過減少進入滑動窗口的新數據,以及 “inverse reducing(逆減)” 離開窗口的舊數據來完成的。一個例子是當窗口滑動時”添加” 和 “減” keys 的數量。然而,它僅適用於 “invertible reduce functions(可逆減少函數)”,即具有相應 “inverse reduce(反向減少)” 函數的 reduce 函數(作為參數 invFunc </ i>)。像在 reduceByKeyAndWindow 中的那樣,reduce 任務的數量可以通過可選參數進行配置。請注意,針對該操作的使用必須啟用 checkpointing. |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 在一個 (K, V) pairs 的 DStream 上調用時,返回一個新的 (K, Long) pairs 的 DStream,其中每個 key 的 value 是它在一個滑動窗口之內的頻次。像 code>reduceByKeyAndWindow 中的那樣,reduce 任務的數量可以通過可選參數進行配置。 |
Join操作
在 Spark Streaming 中可以執行不同類型的 join
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
//也可以用窗口
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
DStreams輸出操作
輸出操作允許將 DStream 的數據推送到外部系統,如數據庫或文件系統。
會觸發所有變換的執行,類似RDD的action操作。有如下操作:
Output Operation | Meaning |
---|---|
print() | 在運行流應用程序的 driver 節點上的DStream中打印每批數據的前十個元素。這對於開發和調試很有用。 |
Python API 這在 Python API 中稱為 pprint()。 | |
saveAsTextFiles(prefix, [suffix]) | 將此 DStream 的內容另存為文本文件。每個批處理間隔的文件名是根據 前綴 和 后綴_:"prefix-TIME_IN_MS[.suffix]"_ 生成的。 |
saveAsObjectFiles(prefix, [suffix]) | 將此 DStream 的內容另存為序列化 Java 對象的 SequenceFiles 。每個批處理間隔的文件名是根據 前綴 和 后綴_:"prefix-TIME_IN_MS[.suffix]"_ 生成的。 |
Python API 這在Python API中是不可用的。 | |
saveAsHadoopFiles(prefix, [suffix]) | 將此 DStream 的內容另存為 Hadoop 文件。每個批處理間隔的文件名是根據 前綴 和 后綴_:"prefix-TIME_IN_MS[.suffix]"_ 生成的。 |
Python API 這在Python API中是不可用的。 | |
foreachRDD(func) | 對從流中生成的每個 RDD 應用函數 func 的最通用的輸出運算符。此功能應將每個 RDD 中的數據推送到外部系統,例如將 RDD 保存到文件,或將其通過網絡寫入數據庫。請注意,函數 func 在運行流應用程序的 driver 進程中執行,通常會在其中具有 RDD 動作,這將強制流式傳輸 RDD 的計算。 |
foreachRDD設計模式使用
dstream.foreachRDD允許將數據發送到外部系統。
但我們不要每次都創建一個連接,解決方案如下:
減少開銷,分區分攤開銷
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
更好的做法是用靜態資源池:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
連接Kafka
Apache Kafka是一個高性能的消息系統,由Scala 寫成。是由Apache 軟件基金會開發的一個開源消息系統項目。
Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2012 年10 月從Apache Incubator 畢業。該項目的目標是為處理實時數據提供一個統一、高通量、低等待(低延時)的平台。
更多kafka相關請查看Kafka入門寶典(詳細截圖版)
Spark Streaming 2.4.4兼容 kafka 0.10.0 或者更高的版本
Spark Streaming在2.3.0版本之前是提供了對kafka 0.8 和 0.10的支持的 ,不過在2.3.0以后對0.8的支持取消了。
Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 | |
---|---|---|
Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
API Maturity | Deprecated | Stable |
Language Support | Scala, Java, Python | Scala, Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit API | No | Yes |
Dynamic Topic Subscription | No | Yes |
Receiver
這里簡單介紹一下對kafka0.8的一種支持方式:基於Receiver
依賴:
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.12
version = 2.4.4
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
這種情況 程序停掉數據會丟失,為了不丟失自己又寫了一份,這種是很多余的。
由於采用了kafka高階api,偏移量offset不可控。
Direct
Kafka 0.10.0版本以后,采用了更好的一種Direct方式,這種我們需要自己維護偏移量offset。
直連方式 並行度會更高 生產環境用的最多,0.8版本需要在zk或者redis等地方自己維護偏移量。我們使用0.10以上版本支持自己設置偏移量,我們只需要自己將偏移量寫回kafka就可以。
依賴
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 2.4.4
kafka 0.10以后 可以將offset寫回kafka 我們不需要自己維護offset了,具體代碼如下:
val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
val ssc = new StreamingContext(conf,Seconds(2))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
//latest none earliest
"auto.offset.reset" -> "earliest",
//自動提交偏移量 false
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//val topics = Array("topicA", "topicB")
val topics = Array("test_topic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
// 與kafka broker不在一個節點上 用不同策略
//在一個節點用 PreferBrokers策略 很少見
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD(rdd => {
//普通的RDD不能強轉HasOffsetRanges 但kafkaRDD有 with這個特性 可以強轉
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//處理數據 計算邏輯
rdd.foreachPartition { iter =>
//一次處理一個分區的數據 獲取這個分區的偏移量
//計算完以后修改偏移量 要開啟事務 類似數據庫 connection -> conn.setAutoCommit(false) 各種操作 conn.commit(); conn.rollback()
//獲取偏移量 如果要自己記錄的話這個
//val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
//println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
//處理數據
iter.foreach(println)
}
//kafka 0.10新特性 處理完數據后 將偏移量寫回kafka
// some time later, after outputs have completed
//kafka有一個特殊的topic 保存偏移量
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
更多Flink,Kafka,Spark等相關技術博文,科技資訊,歡迎關注實時流式計算 公眾號后台回復 “電子書” 下載300頁Flink實戰電子書