Spark Structured Streaming Framework (2): Data Input Sources in Detail


  As of the current 2.1.0 release, Spark Structured Streaming supports only three input sources: File, Kafka, and Socket.

1. Socket

  The Socket source is the simplest input source; the program shown in the Quick Example uses it. You only need to specify the "socket" format and set the host and port to listen on.

val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

 

注意:

    The socket source receives UTF-8 text data. It is intended for testing only; do not use it in end-to-end production projects.
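  For reference, here is a minimal sketch that completes the socket pipeline with the word count and console sink from the Quick Example (the object name is illustrative):

import org.apache.spark.sql.SparkSession

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("SocketWordCount")
      .getOrCreate()
    import spark.implicits._

    // Read UTF-8 text lines from the socket; each row has a single "value" column
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split each line into words and count occurrences, as in the Quick Example
    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // "complete" mode reprints the full counts table for every micro-batch
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}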

2. Kafka

  Structured Streaming provides an interface for reading data from Kafka, and it is straightforward to use. You only need to pay attention to the extra library the development environment depends on, and to the Kafka version in the streaming runtime environment.

2.1 Development Environment

  To use Kafka as an input source, the development environment must pull in an additional dependency. For example, with Spark 2.1.0, add the following dependency to Maven's pom.xml:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.1.0</version>
</dependency>
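  If the project is built with sbt rather than Maven, the equivalent dependency would look like this (assuming Scala 2.11 and Spark 2.1.0, matching the Maven coordinates above):

// build.sbt (assumed setup: scalaVersion := "2.11.8")
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"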

2.2 API

  As with the socket source, you only need to specify "kafka" as the format and pass the Kafka bootstrap servers and the topics to subscribe to, as shown below:

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
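  As a minimal sketch of what typically follows (the servers, topic, and console sink are illustrative placeholders), the casted key/value pairs can be processed and written out like any other streaming Dataset:

import org.apache.spark.sql.SparkSession

object KafkaConsoleExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("KafkaConsoleExample")
      .getOrCreate()
    import spark.implicits._

    // Subscribe to one topic and cast the binary key/value columns to strings
    val messages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") // placeholder servers
      .option("subscribe", "topic1")                                // placeholder topic
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Print each micro-batch of (key, value) pairs to the console
    val query = messages.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}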

 

2.3 Runtime Environment

  Because Spark 2.1.0 is built against Kafka 0.10, the Kafka broker must run the same version; that is, the Kafka cluster producing the data also needs to be on 0.10.

Otherwise an error like the following appears:

[Figure 2-1: error caused by the Kafka version mismatch]

3. File

  Structured Streaming can take the files in a specified directory as a data input source. The supported file formats are text, csv, json, and parquet.

For example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object StructuredFile {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("StructuredNetWordCount")
      .getOrCreate()

    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val jsonDF = spark
      .readStream
      .schema(userSchema)
      .json("/root/jar/directory") // Equivalent to format("json").load("/root/jar/directory")

    val query = jsonDF.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}

 

 1) The DataStreamReader interface

  There are five methods for reading files:

  • format(source).load(path): the source argument specifies the file format, one of text, csv, json, or parquet;
  • text(path): shorthand for format("text").load(path);
  • json(path): shorthand for format("json").load(path);
  • csv(path): shorthand for format("csv").load(path);
  • parquet(path): shorthand for format("parquet").load(path).

  The path parameter is the directory to read from; whenever new files appear in that directory, they are picked up as part of the data stream. The directory must contain only files in the specified format; files in other formats must not be placed there.

注意:

    When running on a Spark cluster, the path is a path in HDFS; when running in local mode, it is a local filesystem path.

 2) The schema() method

  Although four file formats can be read, not every format requires calling the schema() method to describe the file's structure (see the sketch after this list):

  • csv, json, parquet: the user must supply the schema explicitly via the schema() method;
  • text: no schema needs to be specified; the returned DataFrame has a single column named "value".
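  A minimal sketch contrasting the two cases (the paths, master setting, and column names are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder
  .appName("FileSchemaExample")
  .master("local[*]")          // illustrative local setup
  .getOrCreate()

// text: no schema required; each row is a single "value" column
val textDF = spark.readStream.text("/data/text-input")   // placeholder path

// csv (and likewise json/parquet): the schema must be supplied via schema()
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream
  .schema(userSchema)
  .csv("/data/csv-input")                                 // placeholder path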

4. Custom Data Sources

  If the input sources provided by the Spark Structured Streaming API above do not meet your requirements, there is one more option: modifying the source code.

The following uses the classes behind the "socket" source as a template to show how this is done:

4.1 Implement the Provider

  First, implement a provider class, which returns the data source object. The provider implementation must provide three methods:

  1. sourceSchema: returns the name and schema of the source as a (String, StructType) pair;
  2. createSource: returns the object that actually receives the data, an implementation of the Source interface;
  3. shortName: returns the identifier of the data source; the string it returns is what gets passed to the format() method, such as "socket", "json", or "kafka".

 

  The TextRabbitMQSourceProvider class below implements this:

// Placed inside the Spark SQL project so that package-private classes such as
// TextSocketSource and Source are visible (see section 4.3 on registration).
package org.apache.spark.sql.execution.streaming

import scala.util.{Failure, Success, Try}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.StructType

class TextRabbitMQSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {

  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
      case Success(bool) => bool
      case Failure(_) =>
        throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")
    }
  }

  /** Returns the name and schema of the source that can be used to continually read data. */
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) = {
    logWarning("The socket source should not be used for production applications! " +
      "It does not support recovery.")
    if (!parameters.contains("host")) {
      throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
    }
    if (!parameters.contains("port")) {
      throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
    }
    val schema =
      if (parseIncludeTimestamp(parameters)) {
        TextSocketSource.SCHEMA_TIMESTAMP
      } else {
        TextSocketSource.SCHEMA_REGULAR
      }
    ("textSocket", schema)
  }

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    val host = parameters("host")
    val port = parameters("port").toInt
    new TextRabbitMQSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
  }

  /** String that represents the format that this data source provider uses. */
  override def shortName(): String = "RabbitMQ"
}

 

4.2 Implement the Source

  Next, implement the class that actually receives the data. It is instantiated by the provider class, as in the createSource() method above. It must implement the following methods of the Source abstract class so that the Structured Streaming engine can drive it:

  1. getOffset: returns the offset of the latest available data, indicating whether there is data to process;
  2. getBatch: returns the available data between two offsets as a DataFrame;
  3. commit: passes back the offset of data that has already been processed so the source can discard it;
  4. stop: stops the source.

 

// Same package as the provider above, so Source, Offset, LongOffset and
// TextSocketSource resolve without extra imports.
package org.apache.spark.sql.execution.streaming

import java.io.{BufferedReader, IOException, InputStreamReader}
import java.net.Socket
import java.sql.Timestamp
import java.util.Calendar
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.ListBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.StructType

class TextRabbitMQSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
  extends Source with Logging {

  @GuardedBy("this")
  private var socket: Socket = null

  @GuardedBy("this")
  private var readThread: Thread = null

  /**
   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
   * Stored in a ListBuffer to facilitate removing committed batches.
   */
  @GuardedBy("this")
  protected val batches = new ListBuffer[(String, Timestamp)]

  @GuardedBy("this")
  protected var currentOffset: LongOffset = new LongOffset(-1)

  @GuardedBy("this")
  protected var lastOffsetCommitted: LongOffset = new LongOffset(-1)

  initialize()

  private def initialize(): Unit = synchronized {
    socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    readThread = new Thread(s"TextRabbitMQSource($host, $port)") {
      setDaemon(true)

      override def run(): Unit = {
        try {
          while (true) {
            val line = reader.readLine()
            if (line == null) {
              // End of file reached
              logWarning(s"Stream closed by $host:$port")
              return
            }
            TextRabbitMQSource.this.synchronized {
              val newData = (line,
                Timestamp.valueOf(
                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
              )
              currentOffset = currentOffset + 1
              batches.append(newData)
            }
          }
        } catch {
          case e: IOException =>
        }
      }
    }
    readThread.start()
  }

  /** Returns the schema of the data from this source */
  override def schema: StructType =
    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    else TextSocketSource.SCHEMA_REGULAR

  override def getOffset: Option[Offset] = synchronized {
    if (currentOffset.offset == -1) {
      None
    } else {
      Some(currentOffset)
    }
  }

  /** Returns the data that is between the offsets (`start`, `end`]. */
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    val startOrdinal =
      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1

    // Internal buffer only holds the batches after lastOffsetCommitted
    val rawList = synchronized {
      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
      batches.slice(sliceStart, sliceEnd)
    }

    import sqlContext.implicits._
    val rawBatch = sqlContext.createDataset(rawList)

    // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
    // if requested.
    if (includeTimestamp) {
      rawBatch.toDF("value", "timestamp")
    } else {
      // Strip out timestamp
      rawBatch.select("_1").toDF("value")
    }
  }

  override def commit(end: Offset): Unit = synchronized {
    val newOffset = LongOffset.convert(end).getOrElse(
      sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
        s"originate with an instance of this class")
    )

    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

    if (offsetDiff < 0) {
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    }

    batches.trimStart(offsetDiff)
    lastOffsetCommitted = newOffset
  }

  /** Stop this source. */
  override def stop(): Unit = synchronized {
    if (socket != null) {
      try {
        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
        // stop the readThread is to close the socket.
        socket.close()
      } catch {
        case e: IOException =>
      }
      socket = null
    }
  }

  override def toString: String = s"TextRabbitMQSource[host: $host, port: $port]"
}

 

4.3 Register the Provider

  The Structured Streaming engine looks up the provider for the source type passed to format() in DataSource.lookupDataSource(), so the provider class implemented above must be registered with the engine. This is done by adding the provider's fully qualified class name to the service registration file in the Spark SQL project: \spark-2.2.0\sql\core\src\main\resources\META-INF\services\org.apache.spark.sql.sources.DataSourceRegister. Adding the class name to this file completes the registration.

As shown below, append the fully qualified name of our custom implementation class as the last line of the file:

org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
org.apache.spark.sql.execution.streaming.TextRabbitMQSourceProvider

 

4.4 Using the API

  After recompiling the Spark SQL source, drop the resulting jar into Spark's jars directory. You can then use the custom "RabbitMQ" input source just like the built-in ones: simply pass the string "RabbitMQ" to the format() method. Its usage is the same as the "socket" source, because the source implemented above is in fact the socket source's logic.
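  A minimal usage sketch (host and port values are placeholders; the options mirror the socket source because the implementation above reuses its logic):

// Assumes the recompiled Spark with TextRabbitMQSourceProvider is on the classpath.
val rabbitDF = spark.readStream
  .format("RabbitMQ")          // matches shortName() in TextRabbitMQSourceProvider
  .option("host", "localhost") // placeholder host
  .option("port", 5672)        // placeholder port
  .load()

val query = rabbitDF.writeStream
  .format("console")
  .start()

query.awaitTermination()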


 

