StructuredStreaming基礎操作和窗口操作


一、流式DataFrames/Datasets的結構類型推斷與划分

  ◆ 默認情況下,基於文件源的結構化流要求必須指定schema,這種限制確保即
  使在失敗的情況下也會使用一致的模式來進行流查詢。
  ◆ 對於特殊用例,可以通過設置spark.sql.streaming.schemaInference = true。
  此時將會開啟Spark自動類型推斷功能。
  ◆ 注意:默認Spark sql中自動類型推斷為啟動狀態。
  ◆ 當讀取數據的目錄中出現/key=value/ 的子目錄時,Spark將自動遞歸這些子目
  錄,產生分區發現。
  ◆ 如果用戶提供的 schema 中出現了這些列, Spark將會根據正在讀取的文件路
  徑進行填充。
  ◆ 構成分區結構的目錄必須在查詢開始時是存在的,並且必須保持static 。
  ➢ 例如,當 /data/year=2015/ 存在時,可以添加 /data/year=2016/,但是更改
  分區列將無效的(即通過創建目錄 /data/date=2016-04-17/ )。
  ◆ 注意:如果希望得到的數據可以按照/key=value/這種目錄生成時,可以在輸出
  數據時借助於partitionBy(“columnName”)

 

二、流式DataFrames/Datasets的操作

  ◆ 基礎操作-Selection, Projection, Aggregation
  ◆ 基於Event Time的窗口操作
  ◆ 連接操作
  ◆ 流式去重操作
  ◆ 任意狀態運算
  ◆ 不支持操作

2.1 基礎操作-Selection, Projection, Aggregation

  ◆ DataFrames/Datasets上的大多數常用操作都支持流式運算。(后面在討論不
  支持的操作)
  ◆ 例如:
  ➢ case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime) ➢ val df: DataFrame = …
  ➢ val ds: Dataset[DeviceData] = df.as[DeviceData]
  ➢ df.select("device").where("signal > 10") ➢ ds.filter(_.signal > 10).map(_.device) ➢ df.groupBy("deviceType").count() ➢ import org.apache.spark.sql.expressions.scalalang.typed
  ➢ ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))

 

  ◆ 可以注冊一個流式DataFrames/DataSets作為臨時視圖,使用SQL命令做查
  詢操作。
  ➢ df.createOrReplaceTempView(“ updates”) ➢ spark.sql(“ select count(*) from updates”) ➢ df.isStreaming

2.2基於Event Time的窗口操作

  ◆ 基於結構化流的滑動事件時間窗口的聚合操作比較簡單,與分組聚合非常相似。
  在分組聚合中,按照用戶指定的列進行分組聚合。在基於窗口的聚合中,按照
  每個窗口進行聚合操作

  ◆ 案例模型:實時處理流單詞統計的窗口操作示意圖

   

  ◆ 窗口操作類似於分組操作
  ◆ 例子:可以使用groupBy()和window()操作來表示窗口聚合。 ➢ import spark.implicits._ ➢ val words: DataFrame = ... // schema { timestamp: Timestamp, word: String }
  ➢ val windowedCounts = words.groupBy( window($"timestamp"
  , "10 minutes"
  , "5 minutes"), $"word" ).count()

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM