一、案例引入
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.3</version>
</dependency>
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object NetworkWordCount {
def main(args: Array[String]) {
/*指定時間間隔為 5s*/
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/*創建文本輸入流,並進行詞頻統計*/
val lines = ssc.socketTextStream("hadoop001", 9999)
lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
/*啟動服務*/
ssc.start()
/*等待服務結束*/
ssc.awaitTermination()
}
}
使用本地模式啟動 Spark 程序,然后使用 nc -lk 9999
打開端口並輸入測試數據:
[root@hadoop001 ~]# nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban
此時控制台輸出如下,可以看到已經接收到數據並按行進行了詞頻統計。

下面針對示例代碼進行講解:
3.1 StreamingContext
Spark Streaming 編程的入口類是 StreamingContext,在創建時候需要指明 sparkConf
和 batchDuration
(批次時間),Spark 流處理本質是將流數據拆分為一個個批次,然后進行微批處理,batchDuration
就是批次拆分的時間間隔。這個時間可以根據業務需求和服務器性能進行指定,如果業務要求低延遲並且服務器性能也允許,則這個時間可以指定得很短。
這里需要注意的是:示例代碼使用的是本地模式,配置為 local[2]
,這里不能配置為 local[1]
。這是因為對於流數據的處理,Spark 必須有一個獨立的 Executor 來接收數據,然后再由其他的 Executors 來處理,所以為了保證數據能夠被處理,至少要有 2 個 Executors。這里我們的程序只有一個數據流,在並行讀取多個數據流的時候,也需要保證有足夠的 Executors 來接收和處理數據。
3.2 數據源
在示例代碼中使用的是 socketTextStream
來創建基於 Socket 的數據流,實際上 Spark 還支持多種數據源,分為以下兩類:
-
基本數據源:包括文件系統、Socket 連接等;
-
高級數據源:包括 Kafka,Flume,Kinesis 等。
在基本數據源中,Spark 支持監聽 HDFS 上指定目錄,當有新文件加入時,會獲取其文件內容作為輸入流。創建方式如下:
// 對於文本文件,指明監聽目錄即可
streamingContext.textFileStream(dataDirectory)
// 對於其他文件,需要指明目錄,以及鍵的類型、值的類型、和輸入格式
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
被監聽的目錄可以是具體目錄,如 hdfs://host:8040/logs/
;也可以使用通配符,如 hdfs://host:8040/logs/2017/*
。
關於高級數據源的整合單獨整理至:Spark Streaming 整合 Flume 和 Spark Streaming 整合 Kafka
3.3 服務的啟動與停止
在示例代碼中,使用 streamingContext.start()
代表啟動服務,此時還要使用 streamingContext.awaitTermination()
使服務處於等待和可用的狀態,直到發生異常或者手動使用 streamingContext.stop()
進行終止。
二、Transformation
2.1 DStream與RDDs
DStream 是 Spark Streaming 提供的基本抽象。它表示連續的數據流。在內部,DStream 由一系列連續的 RDD 表示。所以從本質上而言,應用於 DStream 的任何操作都會轉換為底層 RDD 上的操作。例如,在示例代碼中 flatMap 算子的操作實際上是作用在每個 RDDs 上 (如下圖)。因為這個原因,所以 DStream 能夠支持 RDD 大部分的transformation算子。

2.2 updateStateByKey
除了能夠支持 RDD 的算子外,DStream 還有部分獨有的transformation算子,這當中比較常用的是 updateStateByKey
。文章開頭的詞頻統計程序,只能統計每一次輸入文本中單詞出現的數量,想要統計所有歷史輸入中單詞出現的數量,可以使用 updateStateByKey
算子。代碼如下:
object NetworkWordCountV2 {
def main(args: Array[String]) {
/*
* 本地測試時最好指定 hadoop 用戶名,否則會默認使用本地電腦的用戶名,
* 此時在 HDFS 上創建目錄時可能會拋出權限不足的異常
*/
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/*必須要設置檢查點*/
ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming")
val lines = ssc.socketTextStream("hadoop001", 9999)
lines.flatMap(_.split(" ")).map(x => (x, 1))
.updateStateByKey[Int](updateFunction _) //updateStateByKey 算子
.print()
ssc.start()
ssc.awaitTermination()
}
/**
* 累計求和
*
* @param currentValues 當前的數據
* @param preValues 之前的數據
* @return 相加后的數據
*/
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
}
使用 updateStateByKey
算子,你必須使用 ssc.checkpoint()
設置檢查點,這樣當使用 updateStateByKey
算子時,它會去檢查點中取出上一次保存的信息,並使用自定義的 updateFunction
函數將上一次的數據和本次數據進行相加,然后返回。
2.3 啟動測試
在監聽端口輸入如下測試數據:
[root@hadoop001 ~]# nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban
hello world hello spark hive hive hadoop
storm storm flink azkaban
此時控制台輸出如下,所有輸入都被進行了詞頻累計:

同時在輸出日志中還可以看到檢查點操作的相關信息:
# 保存檢查點信息
19/05/27 16:21:05 INFO CheckpointWriter: Saving checkpoint for time 1558945265000 ms
to file 'hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000'
# 刪除已經無用的檢查點信息
19/05/27 16:21:30 INFO CheckpointWriter:
Deleting hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000
三、輸出操作
3.1 輸出API
Spark Streaming 支持以下輸出操作:
Output Operation | Meaning |
---|---|
print() | 在運行流應用程序的 driver 節點上打印 DStream 中每個批次的前十個元素。用於開發調試。 |
saveAsTextFiles(prefix, [suffix]) | 將 DStream 的內容保存為文本文件。每個批處理間隔的文件名基於前綴和后綴生成:“prefix-TIME_IN_MS [.suffix]”。 |
saveAsObjectFiles(prefix, [suffix]) | 將 DStream 的內容序列化為 Java 對象,並保存到 SequenceFiles。每個批處理間隔的文件名基於前綴和后綴生成:“prefix-TIME_IN_MS [.suffix]”。 |
saveAsHadoopFiles(prefix, [suffix]) | 將 DStream 的內容保存為 Hadoop 文件。每個批處理間隔的文件名基於前綴和后綴生成:“prefix-TIME_IN_MS [.suffix]”。 |
foreachRDD(func) | 最通用的輸出方式,它將函數 func 應用於從流生成的每個 RDD。此函數應將每個 RDD 中的數據推送到外部系統,例如將 RDD 保存到文件,或通過網絡將其寫入數據庫。 |
前面的四個 API 都是直接調用即可,下面主要講解通用的輸出方式 foreachRDD(func)
,通過該 API 你可以將數據保存到任何你需要的數據源。
3.1 foreachRDD
這里我們使用 Redis 作為客戶端,對文章開頭示例程序進行改變,把每一次詞頻統計的結果寫入到 Redis,並利用 Redis 的 HINCRBY
命令來進行詞頻統計。這里需要導入 Jedis 依賴:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
具體實現代碼如下:
import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} import redis.clients.jedis.Jedis object NetworkWordCountToRedis { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("NetworkWordCountToRedis").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(5)) /*創建文本輸入流,並進行詞頻統計*/ val lines = ssc.socketTextStream("hadoop001", 9999) val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _) /*保存數據到 Redis*/ pairs.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => var jedis: Jedis = null try { jedis = JedisPoolUtil.getConnection partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2)) } catch { case ex: Exception => ex.printStackTrace() } finally { if (jedis != null) jedis.close() } } } ssc.start() ssc.awaitTermination() } }
其中 JedisPoolUtil
的代碼如下:
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class JedisPoolUtil { /* 聲明為 volatile 防止指令重排序 */ private static volatile JedisPool jedisPool = null; private static final String HOST = "localhost"; private static final int PORT = 6379; /* 雙重檢查鎖實現懶漢式單例 */ public static Jedis getConnection() { if (jedisPool == null) { synchronized (JedisPoolUtil.class) { if (jedisPool == null) { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(30); config.setMaxIdle(10); jedisPool = new JedisPool(config, HOST, PORT); } } } return jedisPool.getResource(); } }
3.3 代碼說明
這里將上面保存到 Redis 的代碼單獨抽取出來,並去除異常判斷的部分。精簡后的代碼如下:
pairs.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val jedis = JedisPoolUtil.getConnection partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2)) jedis.close() } }
這里可以看到一共使用了三次循環,分別是循環 RDD,循環分區,循環每條記錄,上面我們的代碼是在循環分區的時候獲取連接,也就是為每一個分區獲取一個連接。但是這里大家可能會有疑問:為什么不在循環 RDD 的時候,為每一個 RDD 獲取一個連接,這樣所需要的連接數會更少。實際上這是不可行的,如果按照這種情況進行改寫,如下:
pairs.foreachRDD { rdd => val jedis = JedisPoolUtil.getConnection rdd.foreachPartition { partitionOfRecords => partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2)) } jedis.close() }
此時在執行時候就會拋出 Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
,這是因為在實際計算時,Spark 會將對 RDD 操作分解為多個 Task,Task 運行在具體的 Worker Node 上。在執行之前,Spark 會對任務進行閉包,之后閉包被序列化並發送給每個 Executor,而 Jedis
顯然是不能被序列化的,所以會拋出異常。
第二個需要注意的是 ConnectionPool 最好是一個靜態,惰性初始化連接池 。這是因為 Spark 的轉換操作本身就是惰性的,且沒有數據流時不會觸發寫出操作,所以出於性能考慮,連接池應該是惰性的,因此上面 JedisPool
在初始化時采用了懶漢式單例進行惰性初始化。
3.4 啟動測試
在監聽端口輸入如下測試數據:
[root@hadoop001 ~]# nc -lk 9999 hello world hello spark hive hive hadoop storm storm flink azkaban hello world hello spark hive hive hadoop storm storm flink azkaban
使用 Redis Manager 查看寫入結果 (如下圖),可以看到與使用 updateStateByKey
算子得到的計算結果相同。
