Spark Structured Streaming (as of version 2.1.0) supports only three input sources: File, Kafka, and socket.
1. Socket
The socket source is the simplest input source; the program shown in the Quick Example uses it. The user only needs to specify the "socket" format and configure the host and port to listen on.
```scala
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
```
Note:
The socket source receives UTF-8 text data. It is intended for testing only and should not be used in end-to-end production projects.
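As a sketch of end-to-end usage, the socket source above can feed a complete streaming word count. The query below follows the Quick Example; the console sink and the object/app names are illustrative:

```scala
import org.apache.spark.sql.SparkSession

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("SocketWordCount").getOrCreate()
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split each line into words and count occurrences of each word
    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```

Run `nc -lk 9999` in another terminal and type lines to see counts appear on the console.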
2. Kafka
Structured Streaming provides a built-in interface for consuming Kafka, which is easy to use; just pay attention to the extra library the development environment must depend on, and to the Kafka version of the streaming runtime environment.
2.1 Development environment
To use Kafka as an input source, the development environment needs an additional dependency. For Spark 2.1.0, add the following to the Maven pom.xml:
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.1.0</version>
</dependency>
```
2.2 API
As with the socket source, simply specify "kafka" as the format, and pass the Kafka bootstrap servers and the set of topics to subscribe to. For example:
```scala
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
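The DataFrames above are unbounded sources; nothing runs until a sink is attached with writeStream. As a minimal sketch (console sink assumed for illustration), the following counts messages per topic, using the "topic" column that the Kafka source provides alongside key and value:

```scala
val counts = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
  .groupBy("topic")
  .count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()
```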
2.3 Runtime environment
Spark 2.1.0 uses Kafka client version 0.10, so the Kafka server must run the same version; that is, the Kafka cluster sending the data must also be 0.10.
Otherwise the following error occurs:

Figure 2-1
3. File
Structured Streaming can use the files in a specified directory as a data input source. The supported file formats are text, csv, json, and parquet.
For example:
```scala
object StructuredFile {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("StructuredNetWordCount")
      .getOrCreate()
    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val jsonDF = spark
      .readStream
      .schema(userSchema)
      .json("/root/jar/directory") // equivalent to format("json").load("/root/jar/directory")
    val query = jsonDF.writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```
There are five methods for reading files:
- format(source).load(path): the source argument specifies the file format, one of text, csv, json, or parquet;
- text(path): shorthand for format("text").load(path);
- json(path): shorthand for format("json").load(path);
- csv(path): shorthand for format("csv").load(path);
- parquet(path): shorthand for format("parquet").load(path).
The path argument is the directory to monitor; any file newly added under that path is picked up as part of the data stream. The directory must contain only files of the specified format; files of other formats must not be placed there.
Note:
When running on a Spark cluster, the path is an HDFS path; when running in local mode, it is a local filesystem path.
Four file formats can be read, but not every format requires calling the schema() method to describe the file's structure:
- csv, json, parquet: the user must supply the schema explicitly via the schema() method;
- text: no schema is needed; the result always has a single column named "value".
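To illustrate the distinction, a sketch of reading a CSV directory (explicit schema required) next to a text directory (no schema); the directory paths are illustrative:

```scala
import org.apache.spark.sql.types._

// csv/json/parquet: the schema must be supplied up front
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream
  .schema(userSchema)
  .csv("/data/csv-dir") // equivalent to format("csv").load("/data/csv-dir")

// text: no schema needed; the result has a single string column named "value"
val textDF = spark.readStream.text("/data/text-dir")
```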
4. Custom
If none of the input sources provided by the Structured Streaming API meets your requirements, there is one more option: modifying the source code.
The following walks through the approach, using the content of the classes behind the "socket" source as the example:
4.1 Implementing a Provider
First, implement a provider class, which returns the actual data source object. The provider implementation must provide three methods:
1. sourceSchema: returns the source's name and schema as a (String, StructType) pair;
2. createSource: returns the object that actually receives the data, a subclass of the Source interface;
3. shortName: returns the source's identifier, i.e., the string passed to the format() method, such as "socket", "json", or "kafka".
The following implements a TextRabbitMQSourceProvider class:
```scala
class TextRabbitMQSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {

  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
      case Success(bool) => bool
      case Failure(_) =>
        throw new AnalysisException(
          "includeTimestamp must be set to either \"true\" or \"false\"")
    }
  }

  /** Returns the name and schema of the source that can be used to continually read data. */
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) = {
    logWarning("The socket source should not be used for production applications! " +
      "It does not support recovery.")
    if (!parameters.contains("host")) {
      throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
    }
    if (!parameters.contains("port")) {
      throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
    }
    val schema = if (parseIncludeTimestamp(parameters)) {
      TextSocketSource.SCHEMA_TIMESTAMP
    } else {
      TextSocketSource.SCHEMA_REGULAR
    }
    ("textSocket", schema)
  }

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    val host = parameters("host")
    val port = parameters("port").toInt
    new TextRabbitMQSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
  }

  /** String that represents the format that this data source provider uses. */
  override def shortName(): String = "RabbitMQ"
}
```
4.2 Implementing a Source
Next, implement the class that actually receives the data. It is instantiated by the provider implementation, as in the createSource() method above. It must implement the following methods of the Source abstract class so that the Structured Streaming engine can drive it:
1. getOffset: returns the latest available data offset, indicating whether any data is available;
2. getBatch: returns the available data, as a DataFrame;
3. commit: notifies the source of the offsets that have already been processed;
4. stop: stops the source.
```scala
class TextRabbitMQSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
  extends Source with Logging {

  @GuardedBy("this")
  private var socket: Socket = null

  @GuardedBy("this")
  private var readThread: Thread = null

  /**
   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
   * Stored in a ListBuffer to facilitate removing committed batches.
   */
  @GuardedBy("this")
  protected val batches = new ListBuffer[(String, Timestamp)]

  @GuardedBy("this")
  protected var currentOffset: LongOffset = new LongOffset(-1)

  @GuardedBy("this")
  protected var lastOffsetCommitted: LongOffset = new LongOffset(-1)

  initialize()

  private def initialize(): Unit = synchronized {
    socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    readThread = new Thread(s"TextRabbitMQSource($host, $port)") {
      setDaemon(true)

      override def run(): Unit = {
        try {
          while (true) {
            val line = reader.readLine()
            if (line == null) {
              // End of file reached
              logWarning(s"Stream closed by $host:$port")
              return
            }
            TextRabbitMQSource.this.synchronized {
              val newData = (line,
                Timestamp.valueOf(
                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())))
              currentOffset = currentOffset + 1
              batches.append(newData)
            }
          }
        } catch {
          case e: IOException => // socket closed by stop(); exit the read thread
        }
      }
    }
    readThread.start()
  }

  /** Returns the schema of the data from this source */
  override def schema: StructType =
    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR

  override def getOffset: Option[Offset] = synchronized {
    if (currentOffset.offset == -1) None else Some(currentOffset)
  }

  /** Returns the data that is between the offsets (`start`, `end`]. */
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    val startOrdinal = start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1

    // Internal buffer only holds the batches after lastOffsetCommitted
    val rawList = synchronized {
      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
      batches.slice(sliceStart, sliceEnd)
    }

    import sqlContext.implicits._
    val rawBatch = sqlContext.createDataset(rawList)

    // The buffer holds (String, Timestamp) pairs; strip out the timestamp if not requested
    if (includeTimestamp) {
      rawBatch.toDF("value", "timestamp")
    } else {
      rawBatch.select("_1").toDF("value")
    }
  }

  override def commit(end: Offset): Unit = synchronized {
    val newOffset = LongOffset.convert(end).getOrElse(
      sys.error(s"TextRabbitMQSource.commit() received an offset ($end) that did not " +
        s"originate with an instance of this class"))

    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

    if (offsetDiff < 0) {
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    }

    batches.trimStart(offsetDiff)
    lastOffsetCommitted = newOffset
  }

  /** Stop this source. */
  override def stop(): Unit = synchronized {
    if (socket != null) {
      try {
        // BufferedReader.readLine() cannot be interrupted, so the only way to
        // stop the readThread is to close the socket.
        socket.close()
      } catch {
        case e: IOException =>
      }
      socket = null
    }
  }

  override def toString: String = s"TextRabbitMQSource[host: $host, port: $port]"
}
```
4.3 Registering the Provider
The Structured Streaming engine looks up the concrete provider for the source type passed to format(), in the DataSource.lookupDataSource() method, so the provider class implemented above must be registered with the engine. This is done by adding its fully qualified class name to the service registry file in the Spark SQL project: spark-2.2.0/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister. Adding the provider's class name to this file completes the registration.
As shown below, append the fully qualified name of our custom implementation class as the last line of the file:
```
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
org.apache.spark.sql.execution.streaming.TextRabbitMQSourceProvider
```
4.4 Using the API
After recompiling the Spark SQL source, drop the resulting jar into Spark's jars directory. The custom "RabbitMQ" input source can then be used just like the built-in Structured Streaming sources: simply pass the string "RabbitMQ" to the format() method. Its usage is identical to the "socket" source, since the implementation above is in fact the socket source's logic.
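Assuming the rebuilt jar is on Spark's classpath, a sketch of using the custom source (host and port values are illustrative; the format string must match the provider's shortName()):

```scala
val rabbitDF = spark.readStream
  .format("RabbitMQ")            // matches TextRabbitMQSourceProvider.shortName()
  .option("host", "localhost")
  .option("port", 5672)
  .load()

val query = rabbitDF.writeStream
  .format("console")
  .start()
query.awaitTermination()
```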
5. References
[1] Structured Streaming Programming Guide.
