Structured Streaming教程(2) —— 常用輸入與輸出


上篇了解了一些基本的Structured Streaming的概念,知道了Structured Streaming其實是一個無下界的無限遞增的DataFrame。基於這個DataFrame,我們可以做一些基本的select、map、filter操作,也可以做一些復雜的join和統計。本篇就着重介紹下,Structured Streaming支持的輸入輸出,看看都提供了哪些方便的操作。

數據源

Structured Streaming 提供了幾種數據源的類型,可以方便的構造Steaming的DataFrame。默認提供下面幾種類型:

File:文件數據源

file數據源提供了很多種內置的格式,如csv、parquet、orc、json等等,就以csv為例:

package xingoo.sstreaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object FileInputStructuredStreamingTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val lines = spark.readStream
      .option("sep", ";")
      .schema(userSchema)
      .csv("file:///Users/xingoo/IdeaProjects/spark-in-action/data/*")

    val query = lines.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

這樣,在對應的目錄下新建文件時,就可以在控制台看到對應的數據了。

aaa;1
bbb;2
aaa;5
ddd;6

還有一些其他可以控制的參數:

  • maxFilesPerTrigger 每個batch最多的文件數,默認是沒有限制。比如我設置了這個值為1,那么同時增加了5個文件,這5個文件會每個文件作為一波數據,更新streaming dataframe。
  • latestFirst 是否優先處理最新的文件,默認是false。如果設置為true,那么最近被更新的會優先處理。這種場景一般是在監聽日志文件的時候使用。
  • fileNameOnly 是否只監聽固定名稱的文件。

socket網絡數據源

在我們自己練習的時候,一般都是基於這個socket來做測試。首先開啟一個socket服務器,nc -lk 9999,然后streaming這邊連接進行處理。

  spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

kafka數據源

這個是生產環境或者項目應用最多的數據源,通常架構都是:

應用數據輸入-->kafka-->spark streaming -->其他的數據庫

由於kafka涉及的內容還比較多,因此下一篇專門介紹kafka的集成。

輸出

在配置完輸入,並針對DataFrame或者DataSet做了一些操作后,想要把結果保存起來。就可以使用DataSet.writeStream()方法,配置輸出需要配置下面的內容:

  • format : 配置輸出的格式
  • output mode:輸出的格式
  • query name:查詢的名稱,類似tempview的名字
  • trigger interval:觸發的間隔時間,如果前一個batch處理超時了,那么不會立即執行下一個batch,而是等下一個trigger時間在執行。
  • checkpoint location:為保證數據的可靠性,可以設置檢查點保存輸出的結果。

output Mode

詳細的來看看這個輸出模式的配置,它與普通的Spark的輸出不同,只有三種類型:

  • complete,把所有的DataFrame的內容輸出,這種模式只能在做agg聚合操作的時候使用,比如ds.group.count,之后可以使用它
  • append,普通的dataframe在做完map或者filter之后可以使用。這種模式會把新的batch的數據輸出出來,
  • update,把此次新增的數據輸出,並更新整個dataframe。有點類似之前的streaming的state處理。

輸出的類型

Structed Streaming提供了幾種輸出的類型:

  • file,保存成csv或者parquet
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()
  • console,直接輸出到控制台。一般做測試的時候用這個比較方便。
noAggDF
  .writeStream
  .format("console")
  .start()
  • memory,可以保存在內容,供后面的代碼使用
aggDF
  .writeStream
  .queryName("aggregates")
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()  
  • foreach,參數是一個foreach的方法,用戶可以實現這個方法實現一些自定義的功能。
writeStream
    .foreach(...)
    .start()

這個foreach的功能很強大,稍后也會詳細的說明。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM