一、流式DataFrames/Datasets的結構類型推斷與划分
◆ 默認情況下,基於文件源的結構化流要求必須指定schema,這種限制確保即
使在失敗的情況下也會使用一致的模式來進行流查詢。
◆ 對於特殊用例,可以通過設置spark.sql.streaming.schemaInference = true。
此時將會開啟Spark自動類型推斷功能。
◆ 注意:默認Spark sql中自動類型推斷為啟動狀態。
◆ 當讀取數據的目錄中出現/key=value/ 的子目錄時,Spark將自動遞歸這些子目
錄,產生分區發現。
◆ 如果用戶提供的 schema 中出現了這些列, Spark將會根據正在讀取的文件路
徑進行填充。
◆ 構成分區結構的目錄必須在查詢開始時是存在的,並且必須保持static 。
➢ 例如,當 /data/year=2015/ 存在時,可以添加 /data/year=2016/,但是更改
分區列將無效的(即通過創建目錄 /data/date=2016-04-17/ )。
◆ 注意:如果希望得到的數據可以按照/key=value/這種目錄生成時,可以在輸出
數據時借助於partitionBy(“columnName”)
二、流式DataFrames/Datasets的操作
◆ 基礎操作-Selection, Projection, Aggregation
◆ 基於Event Time的窗口操作
◆ 連接操作
◆ 流式去重操作
◆ 任意狀態運算
◆ 不支持操作
2.1 基礎操作-Selection, Projection, Aggregation
◆ DataFrames/Datasets上的大多數常用操作都支持流式運算。(后面在討論不
支持的操作)
◆ 例如:
➢ case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime) ➢ val df: DataFrame = …
➢ val ds: Dataset[DeviceData] = df.as[DeviceData]
➢ df.select("device").where("signal > 10") ➢ ds.filter(_.signal > 10).map(_.device) ➢ df.groupBy("deviceType").count() ➢ import org.apache.spark.sql.expressions.scalalang.typed
➢ ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))
◆ 可以注冊一個流式DataFrames/DataSets作為臨時視圖,使用SQL命令做查
詢操作。
➢ df.createOrReplaceTempView(“ updates”) ➢ spark.sql(“ select count(*) from updates”) ➢ df.isStreaming
2.2基於Event Time的窗口操作
◆ 基於結構化流的滑動事件時間窗口的聚合操作比較簡單,與分組聚合非常相似。
在分組聚合中,按照用戶指定的列進行分組聚合。在基於窗口的聚合中,按照
每個窗口進行聚合操作
◆ 案例模型:實時處理流單詞統計的窗口操作示意圖
◆ 窗口操作類似於分組操作
◆ 例子:可以使用groupBy()和window()操作來表示窗口聚合。 ➢ import spark.implicits._ ➢ val words: DataFrame = ... // schema { timestamp: Timestamp, word: String }
➢ val windowedCounts = words.groupBy( window($"timestamp"
, "10 minutes"
, "5 minutes"), $"word" ).count()