SparkStreaming入門到實戰之(2)--Spark Streaming 基本操作


Spark Streaming 基本操作

一、案例引入
         3.1 StreamingContext
         3.2 數據源
         3.3 服務的啟動與停止
二、Transformation
         2.1 DStream與RDDs
         2.2 updateStateByKey
         2.3 啟動測試
三、輸出操作
         3.1 輸出API
         3.1 foreachRDD
         3.3 代碼說明
         3.4 啟動測試

一、案例引入

這里先引入一個基本的案例來演示流的創建:獲取指定端口上的數據並進行詞頻統計。項目依賴和代碼實現如下:

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.12</artifactId>
   <version>2.4.3</version>
</dependency>
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {

 def main(args: Array[String]) {

   /*指定時間間隔為 5s*/
   val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
   val ssc = new StreamingContext(sparkConf, Seconds(5))

   /*創建文本輸入流,並進行詞頻統計*/
   val lines = ssc.socketTextStream("hadoop001", 9999)
   lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()

   /*啟動服務*/
   ssc.start()
   /*等待服務結束*/
   ssc.awaitTermination()
}
}

使用本地模式啟動 Spark 程序,然后使用 nc -lk 9999 打開端口並輸入測試數據:

[root@hadoop001 ~]# nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban

此時控制台輸出如下,可以看到已經接收到數據並按行進行了詞頻統計。

 

 

下面針對示例代碼進行講解:

3.1 StreamingContext

Spark Streaming 編程的入口類是 StreamingContext,在創建時候需要指明 sparkConfbatchDuration(批次時間),Spark 流處理本質是將流數據拆分為一個個批次,然后進行微批處理,batchDuration 就是批次拆分的時間間隔。這個時間可以根據業務需求和服務器性能進行指定,如果業務要求低延遲並且服務器性能也允許,則這個時間可以指定得很短。

這里需要注意的是:示例代碼使用的是本地模式,配置為 local[2],這里不能配置為 local[1]。這是因為對於流數據的處理,Spark 必須有一個獨立的 Executor 來接收數據,然后再由其他的 Executors 來處理,所以為了保證數據能夠被處理,至少要有 2 個 Executors。這里我們的程序只有一個數據流,在並行讀取多個數據流的時候,也需要保證有足夠的 Executors 來接收和處理數據。

3.2 數據源

在示例代碼中使用的是 socketTextStream 來創建基於 Socket 的數據流,實際上 Spark 還支持多種數據源,分為以下兩類:

  • 基本數據源:包括文件系統、Socket 連接等;

  • 高級數據源:包括 Kafka,Flume,Kinesis 等。

在基本數據源中,Spark 支持監聽 HDFS 上指定目錄,當有新文件加入時,會獲取其文件內容作為輸入流。創建方式如下:

// 對於文本文件,指明監聽目錄即可
streamingContext.textFileStream(dataDirectory)
// 對於其他文件,需要指明目錄,以及鍵的類型、值的類型、和輸入格式
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

被監聽的目錄可以是具體目錄,如 hdfs://host:8040/logs/;也可以使用通配符,如 hdfs://host:8040/logs/2017/*

關於高級數據源的整合單獨整理至:Spark Streaming 整合 FlumeSpark Streaming 整合 Kafka

3.3 服務的啟動與停止

在示例代碼中,使用 streamingContext.start() 代表啟動服務,此時還要使用 streamingContext.awaitTermination() 使服務處於等待和可用的狀態,直到發生異常或者手動使用 streamingContext.stop() 進行終止。

 

二、Transformation

2.1 DStream與RDDs

DStream 是 Spark Streaming 提供的基本抽象。它表示連續的數據流。在內部,DStream 由一系列連續的 RDD 表示。所以從本質上而言,應用於 DStream 的任何操作都會轉換為底層 RDD 上的操作。例如,在示例代碼中 flatMap 算子的操作實際上是作用在每個 RDDs 上 (如下圖)。因為這個原因,所以 DStream 能夠支持 RDD 大部分的transformation算子。

 

2.2 updateStateByKey

除了能夠支持 RDD 的算子外,DStream 還有部分獨有的transformation算子,這當中比較常用的是 updateStateByKey。文章開頭的詞頻統計程序,只能統計每一次輸入文本中單詞出現的數量,想要統計所有歷史輸入中單詞出現的數量,可以使用 updateStateByKey 算子。代碼如下:

object NetworkWordCountV2 {


 def main(args: Array[String]) {

   /*
    * 本地測試時最好指定 hadoop 用戶名,否則會默認使用本地電腦的用戶名,
    * 此時在 HDFS 上創建目錄時可能會拋出權限不足的異常
    */
   System.setProperty("HADOOP_USER_NAME", "root")
     
   val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]")
   val ssc = new StreamingContext(sparkConf, Seconds(5))
   /*必須要設置檢查點*/
   ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming")
   val lines = ssc.socketTextStream("hadoop001", 9999)
   lines.flatMap(_.split(" ")).map(x => (x, 1))
    .updateStateByKey[Int](updateFunction _)   //updateStateByKey 算子
    .print()

   ssc.start()
   ssc.awaitTermination()
}

 /**
   * 累計求和
   *
   * @param currentValues 當前的數據
   * @param preValues     之前的數據
   * @return 相加后的數據
   */
 def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
   val current = currentValues.sum
   val pre = preValues.getOrElse(0)
   Some(current + pre)
}
}

使用 updateStateByKey 算子,你必須使用 ssc.checkpoint() 設置檢查點,這樣當使用 updateStateByKey 算子時,它會去檢查點中取出上一次保存的信息,並使用自定義的 updateFunction 函數將上一次的數據和本次數據進行相加,然后返回。

2.3 啟動測試

在監聽端口輸入如下測試數據:

[root@hadoop001 ~]# nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban
hello world hello spark hive hive hadoop
storm storm flink azkaban

此時控制台輸出如下,所有輸入都被進行了詞頻累計:

 

同時在輸出日志中還可以看到檢查點操作的相關信息:

# 保存檢查點信息
19/05/27 16:21:05 INFO CheckpointWriter: Saving checkpoint for time 1558945265000 ms
to file 'hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000'

# 刪除已經無用的檢查點信息
19/05/27 16:21:30 INFO CheckpointWriter:
Deleting hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000

三、輸出操作

3.1 輸出API

Spark Streaming 支持以下輸出操作:

Output Operation Meaning
print() 在運行流應用程序的 driver 節點上打印 DStream 中每個批次的前十個元素。用於開發調試。
saveAsTextFiles(prefix, [suffix]) 將 DStream 的內容保存為文本文件。每個批處理間隔的文件名基於前綴和后綴生成:“prefix-TIME_IN_MS [.suffix]”。
saveAsObjectFiles(prefix, [suffix]) 將 DStream 的內容序列化為 Java 對象,並保存到 SequenceFiles。每個批處理間隔的文件名基於前綴和后綴生成:“prefix-TIME_IN_MS [.suffix]”。
saveAsHadoopFiles(prefix, [suffix]) 將 DStream 的內容保存為 Hadoop 文件。每個批處理間隔的文件名基於前綴和后綴生成:“prefix-TIME_IN_MS [.suffix]”。
foreachRDD(func) 最通用的輸出方式,它將函數 func 應用於從流生成的每個 RDD。此函數應將每個 RDD 中的數據推送到外部系統,例如將 RDD 保存到文件,或通過網絡將其寫入數據庫。

前面的四個 API 都是直接調用即可,下面主要講解通用的輸出方式 foreachRDD(func),通過該 API 你可以將數據保存到任何你需要的數據源。

3.1 foreachRDD

這里我們使用 Redis 作為客戶端,對文章開頭示例程序進行改變,把每一次詞頻統計的結果寫入到 Redis,並利用 Redis 的 HINCRBY 命令來進行詞頻統計。這里需要導入 Jedis 依賴:

<dependency>
   <groupId>redis.clients</groupId>
   <artifactId>jedis</artifactId>
   <version>2.9.0</version>
</dependency>

具體實現代碼如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

object NetworkWordCountToRedis {
  
    def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("NetworkWordCountToRedis").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /*創建文本輸入流,並進行詞頻統計*/
    val lines = ssc.socketTextStream("hadoop001", 9999)
    val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
     /*保存數據到 Redis*/
    pairs.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        var jedis: Jedis = null
        try {
          jedis = JedisPoolUtil.getConnection
          partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
        } finally {
          if (jedis != null) jedis.close()
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

其中 JedisPoolUtil 的代碼如下:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisPoolUtil {

    /* 聲明為 volatile 防止指令重排序 */
    private static volatile JedisPool jedisPool = null;
    private static final String HOST = "localhost";
    private static final int PORT = 6379;

    /* 雙重檢查鎖實現懶漢式單例 */
    public static Jedis getConnection() {
        if (jedisPool == null) {
            synchronized (JedisPoolUtil.class) {
                if (jedisPool == null) {
                    JedisPoolConfig config = new JedisPoolConfig();
                    config.setMaxTotal(30);
                    config.setMaxIdle(10);
                    jedisPool = new JedisPool(config, HOST, PORT);
                }
            }
        }
        return jedisPool.getResource();
    }
}

3.3 代碼說明

這里將上面保存到 Redis 的代碼單獨抽取出來,並去除異常判斷的部分。精簡后的代碼如下:

pairs.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val jedis = JedisPoolUtil.getConnection
    partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
    jedis.close()
  }
}

這里可以看到一共使用了三次循環,分別是循環 RDD,循環分區,循環每條記錄,上面我們的代碼是在循環分區的時候獲取連接,也就是為每一個分區獲取一個連接。但是這里大家可能會有疑問:為什么不在循環 RDD 的時候,為每一個 RDD 獲取一個連接,這樣所需要的連接數會更少。實際上這是不可行的,如果按照這種情況進行改寫,如下:

pairs.foreachRDD { rdd =>
    val jedis = JedisPoolUtil.getConnection
    rdd.foreachPartition { partitionOfRecords =>
        partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
    }
    jedis.close()
}

此時在執行時候就會拋出 Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis,這是因為在實際計算時,Spark 會將對 RDD 操作分解為多個 Task,Task 運行在具體的 Worker Node 上。在執行之前,Spark 會對任務進行閉包,之后閉包被序列化並發送給每個 Executor,而 Jedis 顯然是不能被序列化的,所以會拋出異常。

第二個需要注意的是 ConnectionPool 最好是一個靜態,惰性初始化連接池 。這是因為 Spark 的轉換操作本身就是惰性的,且沒有數據流時不會觸發寫出操作,所以出於性能考慮,連接池應該是惰性的,因此上面 JedisPool 在初始化時采用了懶漢式單例進行惰性初始化。

3.4 啟動測試

在監聽端口輸入如下測試數據:

[root@hadoop001 ~]#  nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban
hello world hello spark hive hive hadoop
storm storm flink azkaban

使用 Redis Manager 查看寫入結果 (如下圖),可以看到與使用 updateStateByKey 算子得到的計算結果相同。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM