Spark Structured Streaming框架(1)之基本用法


   Spark Struntured Streaming是Spark 2.1.0版本后新增加的流計算引擎,本博將通過幾篇博文詳細介紹這個框架。這篇是介紹Spark Structured Streaming的基本開發方法。以Spark 自帶的example進行測試和介紹,其為"StructuredNetworkWordcount.scala"文件。

1. Quick Example

  由於我們是在單機上進行測試,所以需要修單機運行模型,修改后的程序如下:

package org.apache.spark.examples.sql.streaming

 

import org.apache.spark.sql.SparkSession

 

/**

* Counts words in UTF8 encoded, '\n' delimited text received from the network.

*

* Usage: StructuredNetworkWordCount <hostname> <port>

* <hostname> and <port> describe the TCP server that Structured Streaming

* would connect to receive data.

*

* To run this on your local machine, you need to first run a Netcat server

* `$ nc -lk 9999`

* and then run the example

* `$ bin/run-example sql.streaming.StructuredNetworkWordCount

* localhost 9999`

*/

object StructuredNetworkWordCount {

def main(args: Array[String]) {

if (args.length < 2) {

System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")

System.exit(1)

}

 

val host = args(0)

val port = args(1).toInt

 

val spark = SparkSession

.builder

.appName("StructuredNetworkWordCount")

.master("local[*]")

.getOrCreate()

 

import spark.implicits._

 

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.load()

 

// Split the lines into words

val words = lines.as[String].flatMap(_.split(" "))

 

// Generate running word count

val wordCounts = words.groupBy("value").count()

 

// Start running the query that prints the running counts to the console

val query = wordCounts.writeStream

.outputMode("complete")

.format("console")

.start()

 

query.awaitTermination()

}

}

 

2. 剖析

  對於上述所示的程序,進行如下的解讀和分析:

2.1 數據輸入

  在創建SparkSessiion對象之后,需要設置數據源的類型,及數據源的配置。然后就會數據源中源源不斷的接收數據,接收到的數據以DataFrame對象存在,該類型與Spark SQL中定義類型一樣,內部由Dataset數組組成。

如下程序所示,設置輸入源的類型為socket,並配置socket源的IP地址和端口號。接收到的數據流存儲到lines對象中,其類型為DataFarme。

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.load()

 

2.2 單詞統計

  如下程序所示,首先將接受到的數據流lines轉換為String類型的序列;接着每一批數據都以空格分隔為獨立的單詞;最后再對每個單詞進行分組並統計次數。

// Split the lines into words

val words = lines.as[String].flatMap(_.split(" "))

 

// Generate running word count

val wordCounts = words.groupBy("value").count()

 

2.3 數據輸出

通過DataFrame對象的writeStream方法獲取DataStreamWrite對象,DataStreamWrite類定義了一些數據輸出的方式。Quick example程序將數據輸出到控制終端。注意只有在調用start()方法后,才開始執行Streaming進程,start()方法會返回一個StreamingQuery對象,用戶可以使用該對象來管理Streaming進程。如上述程序調用awaitTermination()方法阻塞接收所有數據。

 

3. 異常

3.1 拒絕連接

  當直接提交編譯后的架包時,即沒有啟動"nc –lk 9999"時,會出現圖 11所示的錯誤。解決該異常,只需在提交(spark-submit)程序之前,先啟動"nc"命令即可解決,且不能使用"nc –lk localhost 9999".

圖 11

3.2 NoSuchMethodError

  當通過mvn打包程序后,在命令行通過spark-submit提交架包,能夠正常執行,而通過IDEA執行時會出現圖 12所示的錯誤。

圖 12

  出現這個異常,是由於項目中依賴的Scala版本與Spark編譯的版本不一致,從而導致出現這種錯誤。圖 13和圖 14所示,Spark 2.10是由Scala 2.10版本編譯而成的,而項目依賴的Scala版本是2.11.8,從而導致出現了錯誤。

圖 13

 

圖 14

  解決該問題,只需在項目的pom.xml文件中指定與spark編譯的版本一致,即可解決該問題。如圖 15所示的執行結果。

圖 15

4. 參考文獻

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM