上篇了解了一些基本的Structured Streaming的概念,知道了Structured Streaming其實是一個無下界的無限遞增的DataFrame。基於這個DataFrame,我們可以做一些基本的select、map、filter操作,也可以做一些復雜的join和統計。本篇就着重介紹下,Structured Streaming支持的輸入輸出,看看都提供了哪些方便的操作。
數據源
Structured Streaming 提供了幾種數據源的類型,可以方便的構造Steaming的DataFrame。默認提供下面幾種類型:
File:文件數據源
file數據源提供了很多種內置的格式,如csv、parquet、orc、json等等,就以csv為例:
package xingoo.sstreaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
object FileInputStructuredStreamingTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.master("local")
.appName("StructuredNetworkWordCount")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
val userSchema = new StructType().add("name", "string").add("age", "integer")
val lines = spark.readStream
.option("sep", ";")
.schema(userSchema)
.csv("file:///Users/xingoo/IdeaProjects/spark-in-action/data/*")
val query = lines.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}
這樣,在對應的目錄下新建文件時,就可以在控制台看到對應的數據了。
aaa;1
bbb;2
aaa;5
ddd;6
還有一些其他可以控制的參數:
- maxFilesPerTrigger 每個batch最多的文件數,默認是沒有限制。比如我設置了這個值為1,那么同時增加了5個文件,這5個文件會每個文件作為一波數據,更新streaming dataframe。
- latestFirst 是否優先處理最新的文件,默認是false。如果設置為true,那么最近被更新的會優先處理。這種場景一般是在監聽日志文件的時候使用。
- fileNameOnly 是否只監聽固定名稱的文件。
socket網絡數據源
在我們自己練習的時候,一般都是基於這個socket來做測試。首先開啟一個socket服務器,nc -lk 9999
,然后streaming這邊連接進行處理。
spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
kafka數據源
這個是生產環境或者項目應用最多的數據源,通常架構都是:
應用數據輸入-->kafka-->spark streaming -->其他的數據庫
由於kafka涉及的內容還比較多,因此下一篇專門介紹kafka的集成。
輸出
在配置完輸入,並針對DataFrame或者DataSet做了一些操作后,想要把結果保存起來。就可以使用DataSet.writeStream()方法,配置輸出需要配置下面的內容:
- format : 配置輸出的格式
- output mode:輸出的格式
- query name:查詢的名稱,類似tempview的名字
- trigger interval:觸發的間隔時間,如果前一個batch處理超時了,那么不會立即執行下一個batch,而是等下一個trigger時間在執行。
- checkpoint location:為保證數據的可靠性,可以設置檢查點保存輸出的結果。
output Mode
詳細的來看看這個輸出模式的配置,它與普通的Spark的輸出不同,只有三種類型:
- complete,把所有的DataFrame的內容輸出,這種模式只能在做agg聚合操作的時候使用,比如ds.group.count,之后可以使用它
- append,普通的dataframe在做完map或者filter之后可以使用。這種模式會把新的batch的數據輸出出來,
- update,把此次新增的數據輸出,並更新整個dataframe。有點類似之前的streaming的state處理。
輸出的類型
Structed Streaming提供了幾種輸出的類型:
- file,保存成csv或者parquet
noAggDF
.writeStream
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
- console,直接輸出到控制台。一般做測試的時候用這個比較方便。
noAggDF
.writeStream
.format("console")
.start()
- memory,可以保存在內容,供后面的代碼使用
aggDF
.writeStream
.queryName("aggregates")
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show()
- foreach,參數是一個foreach的方法,用戶可以實現這個方法實現一些自定義的功能。
writeStream
.foreach(...)
.start()
這個foreach的功能很強大,稍后也會詳細的說明。