[Spark]-結構化流之輸出篇(待重修)


5.結構化流的輸出

  一旦定義好了streaming DataFrame/Dataset的最終結果,剩下的就是一些計算輸出了.為此,必須使用 DataStreamWriter通過 Dataset.writeStream() 返回.此時必須以下一個或多個

   輸出落地 的詳細信息:  Data format, location 等等

   輸出模式(Output mode)

   查詢名稱(Query name)  可選,指定用於標識的查詢的唯一名稱

   觸發器間隔(Trigger interval):可選地,指定觸發間隔。如果沒有指定,系統將檢查盡快獲得新數據前處理完成。如果一個觸發時間錯過了,因為前面的處理還沒有完成,則系統將觸發立即處理

   檢查點位置(Checkpoint location) 對於可以保證 end-to-end fault-tolerance (端對端容錯)能力的某些 output sinks ,請指定系統將寫入所有 checkpoint (檢查點)信息的位置。

                   這應該是與 HDFS 兼容的容錯文件系統中的目錄.

  5.1 輸出模式

    追加(append)(默認) 這是默認的模式.在上次觸發后,只有新行添加到結果表(result-table)才會觸發。

             這是只支持那些查詢,行添加到結果表永遠不會改變,因為這種模式保證每一行將只輸出一次。例如,只有選擇查詢,地圖,flatMap,過濾,加入等將支持附加模式

    完全(complete) 每次觸發時都會將整個結果表(result-table)輸出.例如聚合結果等

    更新(update) 只有結果表在上次被觸發后被更新才會觸發

    不同的查詢支持不同的輸出模式,具體見下:

查詢類型   支持模式 描述
帶聚合的查詢 聚合帶事件時間的水印 Append, Update, Complete

Append 使用水印來刪除舊的聚集狀態 窗口將在水印最終閾值的時候聚合.所以結果將在水印過期,最終結果確定的情況下添加進結果表

Update 使用水印來刪除舊的聚集狀態

Complete 不會使用水印來刪除舊的聚集狀態

不帶水印的聚合 Complete, Update 由於沒有使用水印,舊的聚集狀態不會被刪除.不支持append,因為對整數據的聚合結果會不斷更新,不符合append的語義
mapGroupsWithState Update  
flatMapGroupsWithState 追加操作模式 Append flatMapGroupsWithState 之后允許 Aggregations (聚合)
  更新操作模式 Update flatMapGroupsWithState 之后不允許 Aggregations (聚合)。
帶join的查詢 Append join不支持Update,Complete模式
其它查詢 Append, Update 不支持Complete.因為非聚合數據的結果表,全部保存進結果表時不允許的

  5.2 輸出落地

      5.2.1 落地文件

        落地文件,是適合長期保存.這里的文件是指HDFS等任何Hadoop支持的文件系統.

        落地文件,支持自動分區.並且支持有且僅有一次的容錯        

          val query =
            streamingSelectDF
              .writeStream
              .format("parquet")
              .option("path", "文件保存路徑")
              .partitionBy("zip", "day") //分區字段
              .trigger(ProcessingTime("25 seconds"))
              .start()

 

    5.2.2 落地Kafka

      kafka專章描述,具體見后

    5.2.3 落地控制台(debug使用)

       輸出被直接打印到控台或者stdout日志

    5.2.4 落地Memory接收器(debug使用)

    5.2.5 foreach

       foreach落地,是用戶自定義輸出的一種方式.foreach需要用戶實現 ForeachWriter 類.實際處理取決於用戶如何實現.(foreach只能使用在Scala/Java中)

      它將在觸發器(trigger)之后生成結果行時調用一個用戶實現.

      注意: foreach的實現載體是多個executor中的某一個

      5.2.5.1 實現

        open: writer 初始化時執行(例如打開連接,開啟事務等).它具有兩個入參: version=>每個觸發器單調遞增的ID  partitionId =>分區ID.

           open將返回一個布爾值.當為false時,后續將不會產生任何調用.用戶可以根據自己的邏輯,用這個返回值指出本次輸出后續是否還有必要執行

        proccess:根據open的返回值決定是否需要執行.

        close:無論open返回什么值,close必然會執行.這里適合用戶做一些資源回收操作       

            class WriterToMysql extends ForeachWriter[Row] {
            private var connection: Connection = null;
            
            override def open(partitionId: Long, version: Long): Boolean = {
                Class.forName("com.mysql.jdbc.Driver")
                connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test?serverTimezone=UTC&characterEncoding=UTF-8&useSSL=false", "root", "12abAB")
                return true
            }
            
            override def process(value: Row): Unit = {
                val stmt = connection.createStatement()
                try {
                stmt.execute(
                    s"""
                    |replace into `test`.`structured_streaming_window_watermark_app_test`
                    |(`window`,`word`,`count`)
                    |VALUES
                    |('${value.get(0).toString}','${value.get(1)}',${value.getLong(2)});
                    """.stripMargin)
                }
                finally {
                stmt.close()
                }
            }
            
            override def close(errorOrNull: Throwable): Unit = {
                try {
                if (connection != null) {
                    connection.close()
                }
                }
                finally {}
            }
            }
            //使用
            val query = windowWordCounts.writeStream.outputMode("update")
                .foreach(new WriterToMysql())
                .option("truncate", "false")
                .start();

    5.2.6 輸出落地可以承載的輸出模式

 

接收器 支持的輸出模式 可選項 容錯機制 備注
文件 Append path 支持容錯(有且僅有一次) 支持分區
kafka Append, Update, Complete 詳見kafka專章 支持容錯(最少一次)  
foreach Append, Update, Complete   取決於foreach的實現  
控制台 Append, Update, Complete numRows:每次打印的行數(默認20),truncate:輸出太長時截斷(默認true) 不支持容錯  
內存 Append, Complete   不支持容錯,但重啟時重建整張結果表  

  5.3 觸發器

    Spark2.3 已經具有以下四種觸發器

    默認觸發器

      如果不設置任何觸發器,將默認使用此觸發器.觸發規則是:上一個微批處理結束,啟動下一個微批處理

    時間間隔觸發(Fixed interval micro-batches)

      設置了觸發間隔的觸發器. 

        如果上一個微批處理在觸發間隔前結束,會等待間隔時間結束后再觸發.

        如果在觸發間隔前還沒有處理結束.后續暫時不會觸發,會等待上個微批處理結束后再立即啟動

      .trigger(Trigger.ProcessingTime("2 seconds"))

    只觸發一次(One-time micro-batch)

      觸發一次處理全部數據,然后自然停止.適合用在關閉集群時數據的最后處理和清理

      .trigger(Trigger.Once())

 

    連續觸發器(Continuous with fixed checkpoint interval)

      連續處理是Spark2.3才提供的觸發器.它是以一個低延遲,連續處理模式(沒有間隔概念)的模式進行處理.

      .trigger(Trigger.Continuous("1 second"))

      不過很可惜,Spark2.3中連續處理依然只是一個實驗版本,在此只做一個簡單介紹.

        連續處理會啟動多個長時間的運行的任務.這些任務不斷地從源中讀取數據、處理數據並不斷地向接收器寫入數據。查詢所需的任務數量取決於查詢可以並行地從源中讀取多少個分區

          因此,在開始一個連續的處理查詢之前,必須確保集群中有足夠多的內核並行執行所有任務.(10分區的Kafka,必須至少有10個內核才能進行查詢)

        現階段連續處理只能接收Map系操作,還不支持聚合系操作.

        支持的數據源只Kafka,Rate兩種

        不支持失敗任務的重試.任何失敗都會導致查詢終止,必須手動的從檢查點恢復.

      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM