第1章 Spark Streaming 概述1.1 什么是 Spark Streaming1.2 為什么要學習 Spark Streaming1.3 Spark 與 Storm 的對比第2章 運行 Spark Streaming第3章 架構與抽象第4章 Spark Streaming 解析4.1 初始化 StreamingContext4.2 什么是 DStreams4.3 DStream 的輸入4.3.1 基本數據源4.3.2 高級數據源4.4 DStream 的轉換4.4.1 無狀態轉化操作4.4.2 有狀態轉化操作4.4.3 重要操作4.5 DStream 的輸出4.6 累加器和廣播變量4.7 DataFrame ans SQL Operations4.8 Caching / Persistence4.9 不間斷運行 7x24 小時4.9.1 檢查點機制4.9.2 驅動器程序容錯4.9.3 工作節點容錯4.9.4 接收器容錯4.9.5 處理保證4.10 性能考量
第1章 Spark Streaming 概述
1.1 什么是 Spark Streaming
![]()
Spark Streaming 類似於 Apache Storm,用於流式數據的處理。根據其官方文檔介紹,Spark Streaming 有高吞吐量和容錯能力強等特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入后可以用 Spark 的高度抽象,如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。
![]()
和 Spark 基於 RDD 的概念很相似,Spark Streaming 使用離散化流(discretized stream)作為抽象表示,叫作 DStream。DStream 是隨時間推移而收到的數據的序列。在內部,每個時間區間收到的數據都作為 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(因此得名“離散化”)。
![]()
DStream 可以從各種輸入源創建,比如 Flume、Kafka 或者 HDFS。創建出來的 DStream 支持兩種操作,一種是轉化操作(transformation),會生成一個新的 DStream,另一種是輸出操作(output operation),可以把數據寫入外部系統中。DStream 提供了許多與 RDD 所支持的操作相類似的操作支持,還增加了與時間相關的新操作,比如滑動窗口。
Spark Streaming 的關鍵抽象
![]()
DStream:Discretized Stream 離散化流
![]()
1.2 為什么要學習 Spark Streaming
1、易用
2、容錯
3、易整合到 Spark 體系![]()
1.3 Spark 與 Storm 的對比
第2章 運行 Spark Streaming
通過 IDEA 編寫程序
pom.xml 加入以下依賴:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<!-- provided 表示編譯期可用,運行期不可用 -->
<!--<scope>provided</scope>-->
</dependency>
示例代碼如下:
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("hadoop102", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val results = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
results.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
ssc.stop()
}
}
安裝 Netcat 后,參考文章鏈接:https://www.cnblogs.com/chenmingjun/p/10785438.html
先啟動 Netcat,然后通過 Netcat 發送數據:
$ nc -l -p 9999 #監聽9999端口
hello world #運行 jar 包后,發送測試數據
再按照 Spark Core 中的方式進行打包,並將程序上傳到Spark機器上。並運行:
/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.WorldCount /opt/software/sparkjars/networdcount-jar-with-dependencies.jar
注意:如果程序運行時,log 日志太多,可以將 spark 的 conf 目錄下的 log4j 文件里面的日志級別改成 WARN。
第3章 架構與抽象
Spark Streaming 使用“微批次”的架構,把流式計算當作一系列連續的小規模批處理來對待。Spark Streaming 從各種輸入源中讀取數據,並把數據分組為小的批次。新的批次按均勻的時間間隔創建出來。在每個時間區間開始的時候,一個新的批次就創建出來,在該區間內收到的數據都會被添加到這個批次中。在時間區間結束時,批次停止增長。時間區間的大小是由批次間隔這個參數決定的。批次間隔一般設在 500 毫秒到幾秒之間,由應用開發者配置。每個輸入批次都形成一個 RDD,以 Spark 作業的方式處理並生成其他的 RDD。處理的結果可以以批處理的方式傳給外部系統。高層次的架構如下圖所示:
![]()
Spark Streaming 的編程抽象是離散化流,也就是 DStream。它是一個 RDD 序列,每個 RDD 代表數據流中一個時間片內的數據。
![]()
Spark Streaming 在 Spark 的驅動器程序 -- 工作節點的結構的執行過程如下圖所示。Spark Streaming 為每個輸入源啟動對應的接收器。接收器以任務的形式運行在應用的執行器進程中,從輸入源收集數據並保存為 RDD。它們收集到輸入數據后會把數據復制到另一個執行器進程來保障容錯性(默認行為)。數據保存在執行器進程的內存中,和緩存 RDD 的方式一樣。驅動器程序中的 StreamingContext 會周期性地運行 Spark 作業來處理這些數據,把數據與之前時間區間中的 RDD 進行整合。
![]()
第4章 Spark Streaming 解析
4.1 初始化 StreamingContext
源碼:
import org.apache.spark._
import org.apache.spark.streaming._
// 可以通過 ssc.sparkContext 來訪問 SparkContext
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
// 或者通過已經存在的 SparkContext 來創建 StreamingContext
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
初始化完 Context 之后:
1)定義消息輸入源來創建 DStreams。
2)定義 DStreams 的轉化操作和輸出操作。
3)通過 streamingContext.start() 來啟動消息采集和處理.
4)等待程序終止,可以通過 streamingContext.awaitTermination() 來設置。
5)通過 streamingContext.stop() 來手動終止處理程序。
注意:
StreamingContext 一旦啟動,對 DStreams 的操作就不能修改了。
在同一時間一個 JVM 中只有一個 StreamingContext 可以啟動。
stop() 方法將同時停止 SparkContext,可以傳入參數 stopSparkContext 用於只停止 StreamingContext。
4.2 什么是 DStreams
Discretized Stream 是 Spark Streaming 的基礎抽象,代表持續性的數據流和經過各種 Spark 原語操作后的結果數據流。在內部實現上,DStream 是一系列連續的 RDD 來表示。每個 RDD 含有一段時間間隔內的數據,如下圖:
![]()
對數據的操作也是按照 RDD 為單位來進行的,如下圖:
![]()
計算過程由 Spark Engine 來完成,如下圖:
![]()
4.3 DStream 的輸入
Spark Streaming 原生支持一些不同的數據源。一些“核心”數據源已經被打包到 Spark Streaming 的 Maven 工件中,而其他的一些則可以通過 spark-streaming-kafka 等附加工件獲取。每個接收器都以 Spark 執行器程序中一個長期運行的任務的形式運行,因此會占據分配給應用的 CPU 核心。此外,我們還需要有可用的 CPU 核心來處理數據。這意味着如果要運行多個接收器,就必須至少有和接收器數目相同的核心數,還要加上用來完成計算所需要的核心數。例如,如果我們想要在流計算應用中運行 10 個接收器,那么至少需要為應用分配 11 個 CPU 核心。所以如果在本地模式運行,不要使用 local 或者 local[1]。
4.3.1 基本數據源
文件數據源(實際開發中這種方式用的比較少)
Socket 數據流前面的例子已經看到過。
文件數據流:能夠讀取所有 HDFS API 兼容的文件系統文件,通過 fileStream 方法進行讀取。
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming 將會監控 dataDirectory 目錄並不斷處理移動進來的文件,注意:目前不支持嵌套目錄。
1)文件需要有相同的數據格式。
2)文件進入 dataDirectory 的方式需要通過移動或者重命名來實現。
3)一旦文件移動進目錄,則不能再修改,即便修改了也不會讀取新的數據。
如果文件比較簡單,則可以使用 streamingContext.textFileStream(dataDirectory) 方法來讀取文件。文件流不需要接收器,不需要單獨分配 CPU 核。
Hdfs 讀取實例:(需要提前在 HDFS 上建好目錄)
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(1))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@4eb3b690
scala> val lines = ssc.textFileStream("hdfs://hadoop102:9000/data/")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@14c7ab73
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@125bc00d
scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@4a3363c9
scala> wordCounts.print()
scala> ssc.start()
上傳文件到 HDFS 進行測試:
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -mkdir /data/
[atguigu@hadoop102 hadoop-2.7.2]$ ls
bin data etc include input lib libexec LICENSE.txt logs NOTICE.txt README.txt safemode.sh sbin share wcinput wcoutput
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put ./LICENSE.txt /data/
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put ./README.txt /data/
獲取計算結果:
-------------------------------------------
Time: 1504665716000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665717000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665718000 ms
-------------------------------------------
(227.7202-1,2)
(created,2)
(offer,8)
(BUSINESS,11)
(agree,10)
(hereunder,,1)
(“control”,1)
(Grant,2)
(2.2.,2)
(include,11)
...
-------------------------------------------
Time: 1504665740000 ms
-------------------------------------------
(under,1)
(Technology,1)
(distribution,2)
(http://hadoop.apache.org/core/,1)
(Unrestricted,1)
(740.13),1)
(check,1)
(have,1)
(policies,1)
(uses,1)
...
-------------------------------------------
Time: 1504665741000 ms
-------------------------------------------
自定義數據源(實際開發中用的較多)
通過繼承 Receiver,並實現 onStart、onStop 方法來自定義數據源采集。
// Receiver 需要提供一個類型參數,該類型參數是 Receiver 接收到的數據的類型
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
override def onStart(): Unit = {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
// 定義一個新的線程去執行 receive() 方法
override def run() {
receive()
}
}.start()
}
override def onStop(): Unit = {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/**
* Create a socket connection and receive data until receiver is stopped
*/
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
// 獲取 Socket 的輸入對象
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
// 當 Receiver 沒有停止並且 userInput 不為空
while (!isStopped && userInput != null) {
// 通過 store() 方法將獲取到的 userInput 提交給 Spark 框架
store(userInput)
// 再獲取下一條
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
可以通過 streamingContext.receiverStream(<instance of custom receiver>) 來使用自定義的數據采集源。
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = lines.flatMap(_.split(" "))
...
模擬 Spark 內置的 Socket 鏈接,全部源碼如下:
package com.atguigu.streaming
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
// Receiver 需要提供一個類型參數,該類型參數是 Receiver 接收到的數據的類型
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
override def onStart(): Unit = {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
// 定義一個新的線程去執行 receive() 方法
override def run() {
receive()
}
}.start()
}
override def onStop(): Unit = {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/**
* Create a socket connection and receive data until receiver is stopped
*/
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
// 獲取 Socket 的輸入對象
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
// 當 Receiver 沒有停止並且 userInput 不為空
while (!isStopped && userInput != null) {
// 通過 store() 方法將獲取到的 userInput 提交給 Spark 框架
store(userInput)
// 再獲取下一條
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
object CustomReceiverDemo {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.receiverStream(new CustomReceiver("hadoop102", 9999))
// Split each line into words
val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
// ssc.stop()
}
}
先啟動 Netcat,然后通過 Netcat 發送數據:
$ nc -l -p 9999 #監聽9999端口
hello world #運行 jar 包后,發送測試數據
按照 Spark Core 中的方式進行打包,並將程序上傳到Spark機器上。並運行:
/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.CustomReceiverDemo /opt/software/sparkjars/sparkstreaming_customerReceiver-1.0-SNAPSHOT-jar-with-dependencies.jar
輸出結果截圖:
RDD 隊列(用在 Spark Streaming 與 RDD 的結合時,即混合程序)
測試過程中,可以通過使用 streamingContext.queueStream(queueOfRDDs) 來創建 DStream,每一個推送到這個隊列中的 RDD,都會作為一個 DStream 處理。
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object QueueRdd {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd")
val ssc = new StreamingContext(conf, Seconds(1))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
// 創建 RDD 隊列
val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
// Create the QueueInputDStream and use it do some processing
// 創建 QueueInputDStream
val inputStream = ssc.queueStream(rddQueue)
// 處理隊列中的 RDD 數據
val mappedStream = inputStream.map(x => (x % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
// 打印結果
reducedStream.print()
// 啟動計算
ssc.start()
// Create and push some RDDs into
for (i <- 1 to 30) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
// 通過程序停止 StreamingContext 的運行
// ssc.stop()
}
}
}
運行jar 包
/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.QueueRdd /opt/software/sparkjars/sparkstreaming_queueRdd-1.0-SNAPSHOT-jar-with-dependencies.jar
輸出結果如下:
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class com.atguigu.streaming.QueueRdd /opt/software/sparkjars/sparkstreaming_queueRdd-1.0-SNAPSHOT-jar-with-dependencies.jar
19/04/28 20:30:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1556454615000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)
-------------------------------------------
Time: 1556454616000 ms
-------------------------------------------
-------------------------------------------
Time: 1556454617000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)
-------------------------------------------
Time: 1556454618000 ms
-------------------------------------------
......
4.3.2 高級數據源
除核心數據源外,還可以用附加數據源接收器來從一些知名數據獲取系統中接收的數據,這些接收器都作為 Spark Streaming 的組件進行獨立打包了。它們仍然是 Spark 的一部分,不過你需要在構建文件中添加額外的包才能使用它們。現有的接收器包括 Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及 ZeroMQ。可以通過添加與 Spark 版本匹配 的 Maven 工件 spark-streaming-[projectname]_2.10 來引入這些附加接收器。
Apache Kafka
在工程中需要引入 Maven 工件 spark- streaming-kafka_2.10 來使用它。包內提供的 KafkaUtils 對象可以在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息創建出 DStream。由於 KafkaUtils 可以訂閱多個主題,因此它創建出的 DStream 由成對的主題和消息組成。要創建出一個流數據,需要使用 StreamingContext 實例、一個由逗號隔開的 ZooKeeper 主機列表字符串、消費者組的名字(唯一名字),以及一個從主題到針對這個主題的接收器線程數的映射表來調用 createStream() 方法。
import org.apache.spark.streaming.kafka._
...
// 創建一個從主題到接收器線程數的映射表
val topics = List(("pandas", 1), ("logs", 1)).toMap
val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics)
topicLines.map(_._2)
下面我們進行一個實例,演示 SparkStreaming 如何從 Kafka 讀取消息,以及如何通過連接池方法把消息處理完成后再寫回 Kafka:
pom.xml 需要加入的依賴如下:
<!-- 用來提供對象連接池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<!-- 用來連接 Kafka 的工具類 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
kafka Connection Pool 程序:
package com.atguigu.streaming
import java.util.Properties
import org.apache.commons.pool2.impl.DefaultPooledObject
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
// 自定義的樣例類(是池化的對象)
case class KafkaProducerProxy(brokerList: String,
producerConfig: Properties = new Properties,
defaultTopic: Option[String] = None,
producer: Option[KafkaProducer[String, String]] = None) {
type Key = String
type Val = String
require(brokerList == null || !brokerList.isEmpty, "Must set broker list")
private val p = producer getOrElse {
val props: Properties = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
new KafkaProducer[String, String](props)
}
// 把我要發送的消息包裝成了 ProducerRecord
private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {
val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic")))
require(!t.isEmpty, "Topic must not be empty")
key match {
case Some(k) => new ProducerRecord(t, k, value)
case _ => new ProducerRecord(t, value)
}
}
def send(key: Key, value: Val, topic: Option[String] = None) {
// 調用 KafkaProducer 對象的 send 方法來發送消息
p.send(toMessage(value, Option(key), topic))
}
def send(value: Val, topic: Option[String]) {
send(null, value, topic)
}
def send(value: Val, topic: String) {
send(null, value, Option(topic))
}
def send(value: Val) {
send(null, value, None)
}
def shutdown(): Unit = p.close()
}
abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable {
def newInstance(): KafkaProducerProxy
}
class BaseKafkaProducerFactory(brokerList: String,
config: Properties = new Properties,
defaultTopic: Option[String] = None)
extends KafkaProducerFactory(brokerList, config, defaultTopic) {
override def newInstance() = new KafkaProducerProxy(brokerList, config, defaultTopic)
}
// 繼承一個基礎的連接池,需要提供池化的對象類型
class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory)
extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable {
// 用於連接池創建對象
override def create(): KafkaProducerProxy = factory.newInstance()
// 用於連接池包裝對象
override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj)
// 用於連接池銷毀對象
override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = {
p.getObject.shutdown()
super.destroyObject(p)
}
}
KafkaStreaming main:
package com.atguigu.streaming
import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 單例對象(即保證了 kafka 連接池只有一個)
object createKafkaProducerPool {
// 用於返回真正的對象池 GenericObjectPool
def apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = {
val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic))
val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory)
// 指定 kafka 對象池的大小
val poolConfig = {
val c = new GenericObjectPoolConfig
val maxNumProducers = 10
c.setMaxTotal(maxNumProducers)
c.setMaxIdle(maxNumProducers)
c
}
// 返回一個對象池
new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig)
}
}
object KafkaStreaming {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// 創建 topic
val brobrokers = "192.168.25.102:9092,192.168.25.103:9092,192.168.25.104:9092" // kafka 集群的地址
val sourcetopic = "source"; // kafka 的隊列名稱
val targettopic = "target"; // kafka 的隊列名稱
// 創建消費者組
val group = "con-consumer-group"
// 消費者配置
val kafkaParam = Map(
"bootstrap.servers" -> brobrokers, // 用於初始化鏈接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
// 用於標識這個消費者屬於哪個消費團體
"group.id" -> group,
// 如果沒有初始化偏移量或者當前的偏移量不存在任何服務器上,可以使用這個配置屬性
// 可以使用這個配置,latest 自動重置偏移量為最新的偏移量
"auto.offset.reset" -> "latest",
// 如果是 true,則這個消費者的偏移量會在后台自動提交
"enable.auto.commit" -> (false: java.lang.Boolean)
);
// ssc.sparkContext.broadcast(pool)
// 創建 DStream,返回接收到的輸入數據
val stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(sourcetopic), kafkaParam))
// 每一個 stream 都是一個 ConsumerRecord
stream.map(s => ("id:" + s.key(), ">>>>:" + s.value())).foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
// Get a producer from the shared pool
val pool = createKafkaProducerPool(brobrokers, targettopic)
val p = pool.borrowObject()
partitionOfRecords.foreach { message => System.out.println(message._2); p.send(message._2, Option(targettopic)) }
// Returning the producer to the pool also shuts it down
pool.returnObject(p)
})
})
ssc.start()
ssc.awaitTermination()
}
}
程序部署:
1、啟動 zookeeper 集群和 kafka 集群。
[atguigu@hadoop102 zookeeper-3.4.10]$ pwd
/opt/module/zookeeper-3.4.10
[atguigu@hadoop102 zookeeper-3.4.10]$ /bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ /bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ /bin/zkServer.sh start
[atguigu@hadoop102 kafka]$ pwd
/opt/module/kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
2、創建兩個 topic,一個為 source,一個為 target
bin/kafka-topics.sh --create \
--zookeeper 192.168.25.102:2181,192.168.25.103:2181,192.168.25.104:2181 \
--replication-factor 2 \
--partitions 2 \
--topic source
bin/kafka-topics.sh --create \
--zookeeper 192.168.25.102:2181,192.168.25.103:2181,192.168.25.104:2181 \
--replication-factor 2 \
--partitions 2 \
--topic targe
3、啟動 kafka console producer 寫入 source topic
bin/kafka-console-producer.sh \
--broker-list 192.168.25.102:9092,192.168.25.103:9092,192.168.25.104:9092 \
--topic source
4、啟動 kafka console consumer 監聽 target topic
bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.25.102:9092,192.168.25.103:9092,192.168.25.104:9092 \
--topic target
5、啟動 kafka Streaming 程序
[atguigu@hadoop102 ~]$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class com.atguigu.streaming.KafkaStreaming \
/opt/software/sparkjars/kafkastreaming-jar-with-dependencies.jar
6、程序運行截圖
生產者
Spark Stream
消費者
kafka 知識補充:
kafka 集群圖解
分片圖解
新舊 kafka 版本對比
Flume-ng
Spark 提供兩個不同的接收器來使用 Apache Flume(http://flume.apache.org)。
兩個接收器簡介如下。
• 推式接收器:該接收器以 Avro 數據池的方式工作,由 Flume 向其中推數據。
• 拉式接收器:該接收器可以從自定義的中間數據池中拉數據,而其他進程可以使用 Flume 把數據推進該中間數據池。
兩種方式都需要重新配置 Flume,並在某個節點配置的端口上運行接收器(不是已有的 Spark 或者 Flume 使用的端口)。要使用其中任何一種方法,都需要在工程中引入 Maven 工件 spark-streaming-flume_2.10。![]()
推式接收器的方法設置起來很容易,但是它不使用事務來接收數據。在這種方式中,接收器以 Avro 數據池的方式工作,我們需要配置 Flume 來把數據發到 Avro 數據池。我們提供的 FlumeUtils 對象會把接收器配置在一個特定的工作節點的主機名及端口號 上。這些設置必須和 Flume 配置相匹配。
![]()
雖然這種方式很簡潔,但缺點是沒有事務支持。這會增加運行接收器的工作節點發生錯誤 時丟失少量數據的幾率。不僅如此,如果運行接收器的工作節點發生故障,系統會嘗試從 另一個位置啟動接收器,這時需要重新配置 Flume 才能將數據發給新的工作節點。這樣配 置會比較麻煩。
較新的方式是拉式接收器(在Spark 1.1中引入),它設置了一個專用的Flume數據池供 Spark Streaming 讀取,並讓接收器主動從數據池中拉取數據。這種方式的優點在於彈性較 好,Spark Streaming 通過事務從數據池中讀取並復制數據。在收到事務完成的通知前,這些數據還保留在數據池中。
我們需要先把自定義數據池配置為 Flume 的第三方插件。安裝插件的最新方法請參考 Flume 文檔的相關部分(https://flume.apache.org/FlumeUserGuide.html#installing-third-party- plugins)。由於插件是用 Scala 寫的,因此需要把插件本身以及 Scala 庫都添加到 Flume 插件 中。Spark 1.1 中對應的 Maven 索引如下所示。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.11</version>
</dependency>
當你把自定義 Flume 數據池添加到一個節點上之后,就需要配置 Flume 來把數據推送到這個數據池中。
a1.sinks = spark
a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.spark.hostname = receiver-hostname
a1.sinks.spark.port = port-used-for-sync-not-spark-port
a1.sinks.spark.channel = memoryChannel
等到數據已經在數據池中緩存起來,就可以調用 FlumeUtils 來讀取數據了。
4.4 DStream 的轉換
DStream 上的原語與 RDD 的類似,分為 Transformations(轉換)和 Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform() 以及各種 Window 相關的原語。
DStream 的轉化操作可以分為無狀態(stateless)和有狀態(stateful)兩種。
• 在無狀態轉化操作中,每個批次的處理不依賴於之前批次的數據。常見的 RDD 轉化操作,例如 map()、filter()、reduceByKey() 等,都是無狀態轉化操作。
• 相對地,有狀態轉化操作需要使用之前批次的數據或者是中間結果來計算當前批次的數據。有狀態轉化操作包括基於滑動窗口的轉化操作和追蹤狀態變化的轉化操作。
4.4.1 無狀態轉化操作
![]()
無狀態轉化操作就是把簡單的 RDD 轉化操作應用到每個批次上,也就是轉化 DStream 中的每一個 RDD。部分無狀態轉化操作列在了下表中。注意,針對鍵值對的 DStream 轉化操作(比如 reduceByKey()) 要添加 import StreamingContext._ 才能在 Scala 中使用。
![]()
需要記住的是,盡管這些函數看起來像作用在整個流上一樣,但
事實上每個 DStream 在內部是由許多 RDD(批次)組成,且無狀態轉化操作是分別應用到每個 RDD 上的。例如,reduceByKey() 會歸約每個時間區間中的數據,但不會歸約不同區間之間的數據。
舉個例子,在之前的 wordcount 程序中,我們只會統計1秒內接收到的數據的單詞個數,而不會累加。
無狀態轉化操作也能在多個 DStream 間整合數據,不過也是在各個時間區間內。例如,鍵值對 DStream 擁有和 RDD 一樣的與連接相關的轉化操作,也就是 cogroup()、join()、leftOuterJoin() 等。我們可以在 DStream 上使用這些操作,這樣就對每個批次分別執行了對應的 RDD 操作。
我們還可以像在常規的 Spark 中一樣使用 DStream 的 union() 操作將它和另一個 DStream 的內容合並起來,也可以使用 StreamingContext.union() 來合並多個流。
4.4.2 有狀態轉化操作
特殊的 Transformations。
追蹤狀態變化 UpdateStateByKey
updateStateByKey 原語是
用於記錄歷史記錄,有時,我們需要在 DStream 中跨批次維護狀態(例如流計算中累加 wordcount)。針對這種情況,updateStateByKey() 為我們提供了對一個狀態變量的訪問,用於鍵值對形式的 DStream。給定一個由(鍵,事件)對構成的 DStream,並傳遞一個指定如何根據新的事件更新每個鍵對應狀態的函數,它可以構建出一個新的 DStream,其內部數據為(鍵,狀態)對。
updateStateByKey() 的結果會是一個新的 DStream,其內部的 RDD 序列是由每個時間區間對應的 (鍵,狀態) 對組成的。
updateStateByKey() 操作使得我們可以在用新信息進行更新時保持任意的狀態。為使用這個功能,你需要做下面兩步:
• 1)定義狀態,狀態可以是一個任意的數據類型。
• 2)定義狀態更新函數,用此函數闡明如何使用之前的狀態和來自輸入流的新值對狀態進行更新。
使用 updateStateByKey 需要對檢查點目錄進行配置,會使用檢查點來保存狀態。
WordCount 第二版:
代碼如下:
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(3))
ssc.checkpoint("hdfs://192.168.25.102:9000/spark/checkpoints") // 設置一個檢查點的目錄
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("hadoop102", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" ")) // DStream[RDD[String]]
// import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1)) // 將每一個單詞映射成一個元組 (word,1)
// 定義更新狀態方法,參數 values 為當前批次單詞頻度,state 為以往批次單詞頻度(該參數是由框架提供的)
val updateFunc = (values: Seq[Int], state: Option[Int]) => { // 匿名函數
// 計算當前批次相同 key 的單詞總數
val currentCount = values.foldLeft(0)(_ + _)
// 獲取上一次保存的單詞總數
val previousCount = state.getOrElse(0)
// 返回新的單詞總數
Some(currentCount + previousCount)
}
// 使用 updateStateByKey 來更新狀態,統計從運行開始以來單詞總的次數
val stateDstream = pairs.updateStateByKey[Int](updateFunc)
stateDstream.print()
// 以 text 文件形式存儲這個 DStream 的內容。第一個參數是存儲路徑,第二個參數是文件的后綴名。
stateDstream.saveAsTextFiles("hdfs://192.168.25.102:9000/stateful", "abc")
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
// ssc.stop()
}
}
更新狀態方法 updateFunc 圖解如下:
測試:
先啟動 netcat,再啟動統計程序,再通過 netcat 發送測試數據
[atguigu@hadoop102 ~]$ nc -l -p 9999
hello hello china world #發送第一個 RRD
hello hello china china #發送第二個 RRD
啟動統計程序
bin/spark-submit \
--class com.atguigu.streaming.WorldCount \
/opt/software/sparkjars/statefulwordcount-jar-with-dependencies.jar
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ pwd
/opt/module/spark-2.1.1-bin-hadoop2.7
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ bin/spark-submit \
> --class com.atguigu.streaming.WorldCount \
> /opt/software/sparkjars/statefulwordcount-jar-with-dependencies.jar
19/04/29 11:26:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1556508402000 ms
-------------------------------------------
19/04/29 11:26:44 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/04/29 11:26:44 WARN BlockManager: Block input-0-1556508404000 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1556508405000 ms
-------------------------------------------
(hello,2)
(world,1)
(china,1)
19/04/29 11:26:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/04/29 11:26:47 WARN BlockManager: Block input-0-1556508407400 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1556508408000 ms
-------------------------------------------
(hello,4)
(world,1)
(china,3)
-------------------------------------------
Time: 1556508411000 ms
-------------------------------------------
(hello,4)
(world,1)
(china,3)
在 HDFS 上查看檢查點目錄
Window Operations
Window Operations 有點類似於 Storm中 的 State,可以設置窗口的大小和滑動窗口的間隔來動態的獲取當前 Steaming 的允許狀態。基於窗口的操作會在一個比 StreamingContext 的批次間隔更長的時間范圍內,通過整合多個批次的結果,計算出整個窗口的結果。
![]()
所有基於窗口的操作都需要兩個參數,分別為
窗口時長以及滑動步長,兩者都必須是 StreamContext 的批次間隔的整數倍。窗口時長控制每次計算最近的多少個批次的數據,其實就是最近的 windowDuration/batchInterval 個批次,如下圖所示。如果有一個以 10 秒為批次間隔的源 DStream,要創建一個最近 30 秒的時間窗口(即最近 3 個批次),就應當把 windowDuration 設為 30 秒。而滑動步長的默認值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。如果源 DStream 批次間隔為 10 秒,並且我們只希望每兩個批次計算一次窗口結果,就應該把滑動步長設置為 20 秒。假設,你想拓展前例從而每隔十秒對持續 30 秒的數據生成 wordcount。為做到這個,我們需要在持續 30 秒數據的 (word,1) 對 DStream上應用 reduceByKey。![]()
使用操作 reduceByKeyAndWindow:
# reduce last 30 seconds of data, every 10 second
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)
Window Operations 常用函數
![]()
reduceByWindow() 和 reduceByKeyAndWindow() 讓我們可以對每個窗口更高效地進行歸約操作。它們接收一個歸約函數,在整個窗口上執行,比如 +。除此以外,它們還有一種特殊形式,通過只考慮新進入窗口的數據和離開窗 口的數據,讓 Spark 增量計算歸約結果。這種特殊形式需要提供歸約函數的一個逆函數,比如 + 對應的逆函數為 -。對於較大的窗口,提供逆函數可以大大提高執行效率。如下圖所示:
![]()
示例代碼:
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
{(x, y) => x + y},
{(x, y) => x - y},
Seconds(30),
Seconds(10))
// 加上新進入窗口的批次中的元素 // 移除離開窗口的老批次中的元素 // 窗口時長
// 滑動步長
countByWindow() 和 countByValueAndWindow() 作為對數據進行計數操作的簡寫。countByWindow() 返回一個表示每個窗口中元素個數的 DStream,而 countByValueAndWindow() 返回的 DStream 則包含窗口中每個值的個數。
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
WordCount 第三版:3 秒一個批次,窗口 12 秒,滑步 6 秒。
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(3))
ssc.checkpoint("hdfs://192.168.25.102:9000/spark/checkpoints") // 設置一個檢查點的目錄
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("hadoop102", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
// 3 秒一個批次,窗口 12 秒,會有 12 / 3 = 4 個批次
// 滑動步長 6 秒,會有 6 / 3 = 2 個批次
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
// ssc.stop()
}
}
測試:
先啟動 netcat,再啟動統計程序,再通過 netcat 發送測試數據
[atguigu@hadoop102 ~]$ nc -l -p 9999
hello hello china world #發送第一個 RRD
hello hello china china #發送第二個 RRD
啟動統計程序
bin/spark-submit \
--class com.atguigu.streaming.WorldCount \
/opt/software/sparkjars/windowwordcount-jar-with-dependencies.jar
4.4.3 重要操作
Transform Operation
Transform 原語允許 DStream 上執行任意的 RDD-to-RDD 函數。即使這些函數並沒有在 DStream 的 API 中暴露出來,通過該函數可以方便的擴展 Spark API。
該函數每一批次調度一次。
比如下面的例子,在進行單詞統計的時候,想要過濾掉 spam 的信息。
其實也就是對 DStream 中的 RDD 應用轉換。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}
Join 操作
連接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin 也可以),可以連接 stream-stream,windows-stream to windows-stream、stream-dataset
1)stream-stream joins
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
2)stream-dataset joins
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
4.5 DStream 的輸出
輸出操作指定了對流數據經轉化操作得到的數據所要執行的操作(例如把結果推入外部數據庫或輸出到屏幕上)。與 RDD 中的惰性求值類似,如果一個 DStream 及其派生出的 DStream 都沒有被執行輸出操作,那么這些 DStream 就都不會被求值。如果 StreamingContext 中沒有設定輸出操作,那么整個 context 就都不會啟動。
![]()
通用的輸出操作 foreachRDD(),它用來對 DStream 中的 RDD 運行任意計算。這和transform() 有些類似,都可以讓我們訪問任意 RDD。在 foreachRDD() 中,可以重用我們在 Spark 中實現的所有行動操作。比如,常見的用例之一是把數據寫到諸如 MySQL 的外部數據庫中。
需要注意的:
• 1)連接不能寫在 driver 層面。
• 2)如果寫在 foreach 中則每個 RDD 都創建,得不償失。
• 3)增加 foreachPartition,在分區創建。
• 4)可以考慮使用連接池優化。
dstream.foreachRDD { rdd =>
// error val connection = createNewConnection() // executed at the driver 序列化錯誤
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record) // executed at the worker
)
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
4.6 累加器和廣播變量
累加器(Accumulators)和廣播變量(Broadcast variables)不能從 Spark Streaming 的檢查點中恢復。如果你啟用檢查並也使用了累加器和廣播變量,那么你必須創建累加器和廣播變量的延遲單實例從而在驅動因失效重啟后他們可以被重新實例化。如下例述:
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
4.7 DataFrame ans SQL Operations
你可以很容易地在流數據上使用 DataFrames 和 SQL。你必須使用 SparkContext 來創建 StreamingContext 要用的 SQLContext。此外,這一過程可以在驅動失效后重啟。我們通過創建一個實例化的 SQLContext 單實例來實現這個工作。如下例所示。我們對前例 wordcount 進行修改從而使用 DataFrames 和 SQL 來產生 wordcounts。每個 RDD 被轉換為 DataFrame,以臨時表格配置並用 SQL 進行查詢。
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")
// Create a temporary view
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
你也可以從不同的線程在定義流數據的表上運行 SQL 查詢(也就是說,異步運行 StreamingContext)。僅確定你設置 StreamingContext 記住了足夠數量的流數據以使得查詢操作可以運行。否則,StreamingContext 不會意識到任何異步的 SQL 查詢操作,那么其就會在查詢完成之后刪除舊的數據。例如,如果你要查詢最后一批次,但是你的查詢會運行 5 分鍾,那么你需要調用 streamingContext.remember(Minutes(5))
(in Scala 或者其他語言的等價操作)。
4.8 Caching / Persistence
和 RDDs 類似,DStream 同樣允許開發者將流數據保存在內存中。也就是說,在 DStream 上使用 persist() 方法將會自動把 DStream 中的每個 RDD 保存在內存中。當 DStream 中的數據要被多次計算時,這個非常有用(如在同樣數據上的多次操作)。
對於像 reduceByWindow 和 reduceByKeyAndWindow 以及基於狀態的 (updateStateByKey) 這種操作,保存在內存中是隱含默認的。因此,即使開發者沒有調用 persist(),由基於窗操作產生的 DStream 會自動保存在內存中。
4.9 不間斷運行 7x24 小時
4.9.1 檢查點機制
檢查點機制是我們在 Spark Streaming 中用來
保障容錯性的主要機制。與應用程序邏輯無關的錯誤(即系統錯位,JVM 崩潰等)有迅速恢復的能力。
它可以使 Spark Streaming 階段性地把應用數據存儲到諸如 HDFS 或 Amazon S3 這樣的可靠存儲系統中,以供恢復時使用。具體來說,檢查點機制主要為以下兩個目的服務:
• 1)控制發生失敗時需要重算的狀態數。SparkStreaming 可以通過轉化圖的譜系圖來重算狀態,檢查點機制則可以控制需要在轉化圖中回溯多遠。
• 2)提供驅動器程序容錯。如果流計算應用中的驅動器程序崩潰了,你可以重啟驅動器程序並讓驅動器程序從檢查點恢復,這樣 Spark Streaming 就可以讀取之前運行的程序處理數據的進度,並從那里繼續。為了實現這個,Spark Streaming 需要為容錯存儲系統 checkpoint 提供足夠的信息從而使得其可以從失敗中恢復過來。有兩種類型的數據設置檢查點:
Metadata checkpointing:將定義流計算的信息存入容錯的系統如 HDFS。元數據包括:
配置 – 用於創建流應用的配置。
DStreams 操作 – 定義流應用的 DStreams 操作集合。
不完整批次 – 批次的工作已進行排隊但是並未完成。Data checkpointing:將產生的 RDDs 存入可靠的存儲空間。對於在多批次間合並數據的狀態轉換,這個很有必要。在這樣的轉換中,RDDs 的產生基於之前批次的 RDDs,這樣依賴鏈長度隨着時間遞增。為了避免在恢復期這種無限的時間增長(和鏈長度成比例),狀態轉換中間的 RDDs 周期性寫入可靠地存儲空間(如 HDFS)從而切短依賴鏈。
總而言之,
元數據檢查點在由驅動失效中恢復是首要需要的。而數據或者 RDD 檢查點甚至在使用了狀態轉換的基礎函數中也是必要的。
出於這些原因,檢查點機制對於任何生產環境中的流計算應用都至關重要。你可以通過向 ssc.checkpoint() 方法傳遞一個路徑參數 (HDFS、S3 或者本地路徑均可) 來配置檢查點機制,同時你的應用應該能夠使用檢查點的數據。
• 1)當程序首次啟動,其將創建一個新的 StreamingContext,設置所有的流並調用 start()。
• 2)當程序在失效后重啟,其將依據檢查點目錄的檢查點數據重新創建一個 StreamingContext。通過使用 StraemingContext.getOrCreate 很容易獲得這個性能。
ssc.checkpoint("hdfs://...")
# 創建和設置一個新的 StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # new context
ssc = new StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
...
ssc.checkpoint(checkpointDirectory) # 設置檢查點目錄
return ssc
# 從檢查點數據中獲取 StreamingContext 或者重新創建一個
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# 在需要完成的 context 上做額外的配置
# 無論其有沒有啟動
context ...
# 啟動 context
context.start()
contaxt.awaitTermination()
如果檢查點目錄(checkpointDirectory)存在,那么 context 將會由檢查點數據重新創建。如果目錄不存在(首次運行),那么函數 functionToCreateContext 將會被調用來創建一個新的 context 並設置 DStreams。
注意:RDDs 的檢查點引起存入可靠內存的開銷。在 RDDs 需要檢查點的批次里,處理的時間會因此而延長。所以,檢查點的間隔需要很仔細地設置。在小尺寸批次(1秒鍾)。每一批次檢查點會顯著減少操作吞吐量。反之,檢查點設置的過於頻繁導致“血統”和任務尺寸增長,這會有很不好的影響。對於需要 RDD 檢查點設置的狀態轉換,默認間隔是批次間隔的乘數一般至少為 10 秒鍾。可以通過 dstream.checkpoint(checkpointInterval)。通常,檢查點設置間隔是 5-10 個 DStream 的滑動間隔。
4.9.2 驅動器程序容錯
驅動器程序的容錯要求我們以特殊的方式創建 StreamingContext。我們需要把檢查點目錄提供給 StreamingContext。與直接調用 new StreamingContext 不同,應該使用 StreamingContext.getOrCreate() 函數。
4.9.3 工作節點容錯
為了應對工作節點失敗的問題,Spark Streaming 使用與 Spark 的容錯機制相同的方法。所有從外部數據源中收到的數據都在多個工作節點上備份。所有從備份數據轉化操作的過程中創建出來的 RDD 都能容忍一個工作節點的失敗,因為
根據 RDD 譜系圖,系統可以把丟失的數據從幸存的輸入數據備份中重算出來。
4.9.4 接收器容錯
運行接收器的工作節點的容錯也是很重要的。如果這樣的節點發生錯誤,Spark Streaming 會在集群中別的節點上重啟失敗的接收器。然而,這種情況會不會導致數據的丟失
取決於數據源的行為(數據源是否會重發數據)以及接收器的實現(接收器是否會向數據源確認收到數據)。舉個例子,使用 Flume 作為數據源時,兩種接收器的主要區別在於數據丟失時的保障。在 “接收器從數據池中拉取數據” 的模型中,Spark 只會在數據已經在集群中備份時才會從數據池中移除元素。而在 “向接收器推數據” 的模型中,如果接收器在數據備份之前失敗,一些數據可能就會丟失。總的來說,對於任意一個接收器,必須同時考慮上游數據源的容錯性(是否支持事務)來確保零數據丟失。
總的來說,接收器提供以下保證:
• 所有從可靠文件系統中讀取的數據 (比如通過 StreamingContext.hadoopFiles 讀取的) 都是可靠的,因為底層的文件系統是有備份的。Spark Streaming 會記住哪些數據存放到了檢查點中,並在應用崩潰后從檢查點處繼續執行。
• 對於像 Kafka、推式 Flume、Twitter 這樣的不可靠數據源,Spark 會把輸入數據復制到其他節點上,但是如果接收器任務崩潰,Spark 還是會丟失數據。在 Spark 1.1 以及更早的版本中,收到的數據只被備份到執行器進程的內存中,所以一旦驅動器程序崩潰(此時所有的執行器進程都會丟失連接),數據也會丟失。在 Spark 1.2 中,收到的數據被記錄到諸如 HDFS 這樣的可靠的文件系統中,這樣即使驅動器程序重啟也不會導致數據丟失。
綜上所述,確保所有數據都被處理的最佳方式是使用可靠的數據源(例如 HDFS、拉式 Flume 等)。如果你還要在批處理作業中處理這些數據,使用可靠數據源是最佳方式,因為這種方式確保了你的批處理作業和流計算作業能讀取到相同的數據,因而可以得到相同的結果。
4.9.5 處理保證
由於 Spark Streaming 工作節點的容錯保障,Spark Streaming 可以為所有的轉化操作提供 “精確一次” 執行的語義,即使一個工作節點在處理部分數據時發生失敗,最終的轉化結果(即轉化操作得到的 RDD)仍然與數據只被處理一次得到的結果一樣。
然而,當把轉化操作得到的結果使用輸出操作推入外部系統中時,寫結果的任務可能因故障而執行多次,一些數據可能也就被寫了多次。由於這引入了外部系統,因此我們需要專門針對各系統的代碼來處理這樣的情況。我們可以使用事務操作來寫入外部系統(即原子化地將一個 RDD 分區一次寫入),或者設計冪等的更新操作(即多次運行同一個更新操作仍生成相同的結果)。比如 Spark Streaming 的 saveAs…File 操作會在一個文件寫完時自動將其原子化地移動到最終位置上,以此確保每個輸出文件只存在一份。
4.10 性能考量
最常見的問題是 Spark Streaming 可以使用的最小批次間隔是多少。總的來說,500 毫秒已經被證實為對許多應用而言是比較好的最小批次大小。尋找最小批次大小的最佳實踐是從一個比較大的批次大小(10 秒左右)開始,不斷使用更小的批次大小。如果 Streaming 用戶界面中顯示的處理時間保持不變,你就可以進一步減小批次大小。如果處理時間開始增加,你可能已經達到了應用的極限。
相似地,對於窗口操作,計算結果的間隔(也就是滑動步長)對於性能也有巨大的影響。當計算代價巨大並成為系統瓶頸時,就應該考慮提高滑動步長了。
減少批處理所消耗時間的常見方式還有提高並行度。有以下三種方式可以提高並行度:
•增加接收器數目。有時如果記錄太多會導致單台機器來不及讀入並分發的話,接收器會成為系統瓶頸。這時你就需要通過創建多個輸入 DStream(這樣會創建多個接收器) 來增加接收器數目,然后使用 union 來把數據合並為一個數據源。
•將收到的數據顯式地重新分區。如果接收器數目無法再增加,你可以通過使用 DStream.repartition 來顯式重新分區輸入流(或者合並多個流得到的數據流) 來重新分配收到的數據。
•提高聚合計算的並行度。對於像 reduceByKey() 這樣的操作,你可以在第二個參數中指定並行度,我們在介紹 RDD 時提到過類似的手段。
