spark 的 structured streaming 狀態保存


 

狀態保存:

        structured  streaming 提供了兩個自定義分組聚合函數:mapGroupsWithState,flatMapGroupsWithState,允許開發者基於事件時間或者處理時間進行有狀態的流計算。
 
        簡單來說,可以將每一次的流計算的group結果,保存下來,用於下一次繼續聚合。
 
 

實現方式:

        mapGroupsWithState與flatMapGroupsWithState方法是KeyValueGroupedDataset類的方法,要想調用該方法,必須先創建KeyValueGroupedDataset,由於該類不能由用戶直接生成,須由已經存在的Dataset類型轉換成KeyValueGroupedDataset。
        Dataset    =》   KeyValueGroupedDataset實現:
                A Dataset has been logically grouped by a user specified grouping key,Users should not construct a KeyValueGroupedDataset directly, but should instead call groupByKey on an existing Dataset.
                翻譯:用戶不能自己創建KeyValueGroupedDataset,必須對已經存在的Dataset調用group by  生成。
        常規實現有:
                xxx.groupByKey[String](......)
        
 

超時時間設置:

        超時時間的設置模式:

                    處理時間   :   processtime
                            數據到達spark后,spark給它添加的接收到該數據時的時間
                    事件時間   : eventtime
                            事件事件,是數據上報時,自己記錄的發生時間。
                    
 

        處理時間模式:

                如果選擇了基於”處理時間“,則超時的duration是通過GroupState.setTimeoutDuration來設定的,但是超時並非是嚴格地發生在設定的duration的時間點上,而是在當剛剛超過duration的那次trigger發生時,所以說timeout並沒有一個嚴格的“上限”(也就是最晚發生的時間,相對應的“下限“是指最早的發生時間,這個時間是剛好在duration的時間點上)。
                舉個例子:如果流上在最近一段時間沒有任何數據,則整個負責維護狀態的函數根本不會被觸發,也就不會執行任何超時檢測,直到有新的數據到達,這樣有可能會導致在很長時間之后才會觸發超時處理,而並不是在規定的那個超時時間點上觸發的,這就是所謂的timeout並沒有嚴格的“上限”的意思。
               超時后,並不會立刻處理數據,只有當數據來了,觸發了trigger才會開始執行清理操作,也就是沒有數據時,就算超時了之前的狀態也會保存。
 
                處理時間超時依賴本機時間,時區等。
                

        事件時間模式

                如果選擇的是基於“事件時間”,首先要開啟基於事件時間框架,即必須在查詢中使用Dataset.withWatermark(),然后需要在GroupState中使用GroupState.setTimeoutTimestamp()設定超時,setTimeoutTimestamp有兩類版本,一類是設定一個固定時間戳,另一類是在一個指定的時間戳上再指定一個duration, 前者適用於那些有明確超時時間點的場景,后者適用於那些在某個最新的事件時間上再追加一個duration的場景。
                超時后,並不會立刻處理數據,只有當數據來了,watermark時間更新后與設置做對比,才能判斷做什么操作,也就是沒有數據時,就算超時了之前的狀態也會保存。
                但是有一點是非常重要和清楚的,就是這個每次設定的超時時間戳是不能晚於watermark的!
                
 

        共同點:

                超時后,並不會立刻處理數據,只有當數據來了才會做對應的group state更新,也就是沒有數據時,就算超時了之前的狀態也會保存。
 
 

        時間設置:

                時間包括兩個參數,超時時間點,和可以忍受的時間范圍
                超時時間點:可以是未來的一個准確時間(時間戳)-當前時間(時間戳)得到的一個毫秒數
                                        還可以是一個准確的日期。
                                        具體的處理,看選擇哪個類型的函數(此處函數重載較多)。
 

GroupState特質:

        原文鏈接:

        使用GroupState 特質才可以操作狀態數據
 

        GroupState特質參數

                1、groupby 后的key
                2、一個迭代器,存儲的是每個group by 后的key內容
                3、用戶自定義的狀態(state object)對象
 

        GroupState特質使用注意:

                1、state的內容不能為null,否則update(用於設置和更新state內容)報錯
                2、為了讓線程之間數據共享,state的內容非線程安全
                3、使用remove后,exists返回false,get會報錯,getOption會返回none
                4、update調用后,exists返回true。get和getOption返回更新的值
 

        GroupStateTimeout

                1、groupstate的超時,可以是基於時間(spark本身的時間),也可以是基於事件時間(數據中記錄的時間)。基於時間的超時,一般設置為指定時間內,沒有接收數據則刪除內存記錄,如:state.setTimeoutDuration("1 hour")。基於事件時間的任務超時,依賴watermark時間,如:groupState.setTimeoutTimestamp(timestamp)
                2、GroupStateTimeout的設置項是全局共享的
 

        ProcessingTimeTimeout:

                1、當且僅當在預設置的時間點上,超出了dms,才會觸發超時處理。(這里可知道,存在兩個參數,超時時間點,以及超時后允許的范圍窗口時間)
                2、超時處理是沒有一個嚴格的處理時間,雖然我們設置了超時參數 + 超時窗口參數,定制了處理時間。但是內部的超時觸發器,依賴外部的數據輸入,只有有數據輸入時,超時觸發器才會去處理超時數據,但是假如系統最后一個數據輸入完后,沒有后續的輸入數據,則關於這個數據的超時數據會一直存在系統的內存上。
                3、超時處理依賴系統時間,系統時間錯了,自然會有錯誤處理
 

        EventTime Timeout:

                如果選擇的是基於“事件時間”,首先要開啟基於事件時間框架,即必須在查詢中使用Dataset.withWatermark(),然后需要在GroupState中使用GroupState.setTimeoutTimestamp()設定超時,setTimeoutTimestamp有兩類版本,一類是設定一個固定時間戳,另一類是在一個指定的時間戳上再指定一個duration, 前者適用於那些有明確超時時間點的場景,后者適用於那些在某個最新的事件時間上再追加一個duration的場景。但是有一點是非常重要和清楚的,就是這個每次設定的超時時間戳是不能晚於watermark的!因為Spark在基於”事件時間“ 判定超時的原則就是:當且僅當watermark超過(晚於)了設定的timeout!某種意義上,我們可以認為watermark是當前“狀態實例”(不是GroupState,而是它包裹的那個State對象)認定”存活“的一個”開始“的時間界限,timeout是當前”狀態實例“認定”存活“的一個”截止“的時間界限,如果開始的時間比截止的時間還要晚,說明這個狀態實例超時了!最后,在基於”事件時間“時,對於實際的超時時間也是沒有一個准確的”上限“的,這和基於“事件時間”的超時判定的原因是一樣的,因為watermark也是在流中有新數據時才會被觸發更新,進而計算超時的時間。
 
 
 
 

狀態保存的使用方法:

        在官方文檔找:KeyValueGroupedDataset類即可找到
 
        mapGroupsWithState函數:
                def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], outputEncoder: Encoder[U], timeoutConf: GroupStateTimeout): Dataset[U]
                def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], outputEncoder: Encoder[U]): Dataset[U]
                def mapGroupsWithState[S, U](timeoutConf: GroupStateTimeout)(func: (K, Iterator[V], GroupState[S]) ⇒ U)(implicit arg0: Encoder[S], arg1: Encoder[U]): Dataset[U]
                def mapGroupsWithState[S, U](func: (K, Iterator[V], GroupState[S]) ⇒ U)(implicit arg0: Encoder[S], arg1: Encoder[U]): Dataset[U]
                        下面的示例采用該方式執行,將該方法柯西化后執行的
 
        flatMapGroupsWithState函數:
                
 
 
        兩種狀態保存的區別:
                mapGroupsWithState  要求返回,且必須返回一條記錄
                flatMapGroupsWithState:可以返回多條,或者不返回記錄
 
 

測試框架:

            寫數據庫   ---  maxwell --->  kafka   ------>   spark 匯總
 

注意watermark的用法:

            數據在傳給spark后,最終按多久匯總一次數據由Trigger.ProcessingTime("1 minutes")決定的。該方法是按照一定時間匯總數據,與另一種持序及時處理的模式稍有不同。
 
另外注意處理時間問題,否則會出現各種各樣的問題。
        1、是在創建sparkSession時,先將時區設置好
        2、是在spark中的提示信息中,展示時間是UTC的時間
 

一個代碼實例:

 
package com.test
 
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Row, SparkSession}
import java.sql.Timestamp
import java.util.Date
import java.text.SimpleDateFormat
 
import org.apache.log4j.Logger
import org.apache.spark.sql.streaming.{GroupStateTimeout, Trigger}
 
case class NodeFIleVirusesDF(
                              id:Int,
                              nid:Int,
                              virus_name:String,
                              virus_op:Int,
                              virus_findby:Int,
                              virus_type:Int,
                              find_time:Timestamp,
                              create_time:String
                          )
case class NodeFIleViruses(
                              id:Int,
                              nid:Int,
                              virus_name:String,
                              virus_op:Int,
                              virus_findby:Int,
                              virus_type:Int,
                              find_time:Long,
                              create_time:String
                          )
case class DBMessage(
                    database:String,
                    table:String,
                    type1:String,
                    ts:Long,
                    xid:Int,
                    xoffset:Int,
                    data: NodeFIleViruses
                    )
 
object proj2 {
    def handleAllInfo(value: String): NodeFIleVirusesDF ={
        val gson = new Gson()
        val dbinfo = gson.fromJson(value, classOf[DBMessage])
        val t = new Timestamp(  dbinfo.data.find_time  * 1000)
        NodeFIleVirusesDF(
            dbinfo.data.id,
            dbinfo.data.nid,
            dbinfo.data.virus_name,
            dbinfo.data.virus_op,
            dbinfo.data.virus_findby,
            dbinfo.data.virus_type,
            t,
            dbinfo.data.create_time
        )
    }
 
    def main(args: Array[String]): Unit = {
        val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm")
 
        val spark = SparkSession
            .builder
            .config("spark.sql.session.timeZone", "Asia/Shanghai")                                            //寫時區,不寫時區的話,所有的本地時間 ,必須減8小時才能有效
            .appName("StructuredNetworkWordCount")
            .master(" spark://192.168.10.40:7077")
            .getOrCreate()
 
        import spark.implicits._
 
        val kafkadf = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "192.168.10.40:9092")
            .option("subscribe", "jmvirus")
            .option("startingOffsets", "latest")
            .load()
 
        val infods = kafkadf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
            .as[(String, String)]
        //spark.sparkContext.setLogLevel()  :DataSet[(String, String)]
        log.info("start-----------------------------")
 
        val infodf = infods.map(value => value._2)
            .map(value => handleAllInfo(value))
            .toDF()
 
        val result = infodf
            .withWatermark("find_time", "3 minutes")
            .groupByKey[String]((row: Row) =>{
                val currentet = sdf2.format(new Date(row.getTimestamp(6).getTime))
                currentet + "," + row.getAs[String](1) + "," + row.getAs[String](2)   //nid , virusname
            })
            .mapGroupsWithState[(String, Long), (String, Long)](GroupStateTimeout.EventTimeTimeout())((timeAndWord, iterator, groupState)=>{
                var count = 0L
                var key = "null"
                val tarr = timeAndWord.split(',')
                if(groupState.hasTimedOut){                                         //根據timeout刪除內存上保存的數據
                    groupState.remove()
                } else if( groupState.exists){                                            //由於每個數據在group后都會執行這里的邏輯,我們也可以避開timeout設置,自己配置一個刪除內存數據的判斷條件
                                                                                                       //當然,自己設置的條件和spark structured本身的缺陷一樣,需要后一條數據來觸發判斷
                    key = tarr(1) + "," + tarr(2)
                    count = groupState.getOption.getOrElse((key, 0L))._2 + iterator.size
                    groupState.update(key, count)
                }else{
                    key = tarr(1) + "," + tarr(2)
                    count = iterator.size
                    groupState.update(key, count)
                    val arr = timeAndWord.split((","))
                    val timestamp = sdf2.parse(arr(0)).getTime
                    groupState.setTimeoutTimestamp(timestamp)            //無論如何,用戶這里設置的超時設置 + duration時間,必須小於此時的時間+watermark的允許漂移時間,才有意義
                }
 
                if(count != 0){
                    if (key != "null"){
                        (key, count)
                    }
                    else{
                        (timeAndWord, count)
                    }
                }else{
                    null
                }
            }).filter( _ != null).toDF("key", "count")
 
        //直接輸出到屏幕,但是顯示不全,使用文件保存查閱數據
        val query = result.writeStream
            //.outputMode("append")   //select
            .outputMode("update")   // group by
            //.trigger(Trigger.ProcessingTime(0))
            .trigger(Trigger.ProcessingTime("1 minutes"))
            .format("console")
            .start()
 
        query.explain()
 
        query.awaitTermination()
    }
}
 
 
數據上報測試腳本:
# _*_coding:utf-8_*_
 
import time
import random
import datetime
 
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Date, BigInteger, TIMESTAMP,DateTime
from sqlalchemy.orm import sessionmaker
 
# 創建連接
engine = create_engine('mysql+ pymysql://root:lsl@192.168.10.40/testdata',
                       encoding='utf-8')
Base = declarative_base()  # 生成orm基類
 
class NodeFileViruses(Base):
    __tablename__ = 'node_file_viruses'   # table的名字
    id = Column(BigInteger, primary_key=True)  # 創建id屬性
    nid = Column(Integer)
    virus_name = Column(String(36))  # 創建name屬性
    virus_op = Column(Integer)
    virus_findby = Column(Integer)
    virus_type = Column(Integer)
    find_time = Column(BigInteger)
    create_time = Column(DateTime, default=datetime.datetime.now) #
 
    def __repr__(self):  # 用於進行查找時的數據返回
        return '<%s name :%s>' %(self.id, self.nid)
Base.metadata.create_all(engine) # 進行指令的調用,即生成table
Session_class = sessionmaker(bind=engine)  # 進行數據庫的連接
Session = Session_class() # 生成session 實例
 
for i in range(5):
    vnid = i
    vtype = i
    vop = i
    vfind = i
    vname = 'vname' + str(i)
 
    nfv = NodeFileViruses(
        nid=vnid,
        virus_name=vname,
        virus_op=vop,
        virus_findby=vfind,
        virus_type=vtype,
        find_time=int(time.time())
 
    )
    Session.add(nfv)
 
 
 
測試結果:
        以下僅僅時記錄了部分結果
        其中count=2,是因為在一分鍾內,執行了上面的操作兩次
Batch: 3
-------------------------------------------
+--------+-----+
|     key|count|
+--------+-----+
|3,vname3|    2|
|2,vname2|    2|
|1,vname1|    2|
|0,vname0|    2|
|4,vname4|    2|
+--------+-----+
 
20/10/06 17:52:03 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@577bf8f6 committed.
20/10/06 17:52:03 INFO SparkContext: Starting job: start at proj2.scala:152
20/10/06 17:52:03 INFO DAGScheduler: Job 7 finished: start at proj2.scala:152, took 0.000042 s
20/10/06 17:52:03 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "741f6382-3195-4350-8bdd-84f553dd8d74",
  "runId" : "dffe982c-e7b5-43ae-b74d-70e6d4ccebec",
  "name" : null,
  "timestamp" : "2020-10-06T09:52:00.001Z",                     //UTC展示的時間,與spark本身有關   代碼中使用的是北京時間,通過spark配置文件可以解決
  "batchId" : 3,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.1666638889351844,
  "processedRowsPerSecond" : 3.2948929159802307,
  "durationMs" : {
    "addBatch" : 2945,
    "getBatch" : 8,
    "getOffset" : 1,
    "queryPlanning" : 49,
    "triggerExecution" : 3035,
    "walCommit" : 30
  },
  "eventTime" : {
    "avg" : "2020-10-06T09:51:41.000Z",
    "max" : "2020-10-06T09:51:43.000Z",
    "min" : "2020-10-06T09:51:39.000Z",
    "watermark" : "2020-10-06T09:40:21.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 15,
    "numRowsUpdated" : 5,
    "memoryUsedBytes" : 24087
  } ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[jmvirus]]",
    "startOffset" : {
      "jmvirus" : {
        "0" : 20575
      }
    },
    "endOffset" : {
      "jmvirus" : {
        "0" : 20585
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 0.1666638889351844,
    "processedRowsPerSecond" : 3.2948929159802307
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2db704e"
  }
}
 
 
maxwell轉發的部分數據:
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":0,"data":{"id":21581,"nid":0,"virus_name":"vname0","virus_op":0,"virus_findby":0,"virus_type":0,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":1,"data":{"id":21582,"nid":1,"virus_name":"vname1","virus_op":1,"virus_findby":1,"virus_type":1,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":2,"data":{"id":21583,"nid":2,"virus_name":"vname2","virus_op":2,"virus_findby":2,"virus_type":2,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":3,"data":{"id":21584,"nid":3,"virus_name":"vname3","virus_op":3,"virus_findby":3,"virus_type":3,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"commit":true,"data":{"id":21585,"nid":4,"virus_name":"vname4","virus_op":4,"virus_findby":4,"virus_type":4,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":0,"data":{"id":21586,"nid":0,"virus_name":"vname0","virus_op":0,"virus_findby":0,"virus_type":0,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":1,"data":{"id":21587,"nid":1,"virus_name":"vname1","virus_op":1,"virus_findby":1,"virus_type":1,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":2,"data":{"id":21588,"nid":2,"virus_name":"vname2","virus_op":2,"virus_findby":2,"virus_type":2,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":3,"data":{"id":21589,"nid":3,"virus_name":"vname3","virus_op":3,"virus_findby":3,"virus_type":3,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"commit":true,"data":{"id":21590,"nid":4,"virus_name":"vname4","virus_op":4,"virus_findby":4,"virus_type":4,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
 
 
 
 
 
一個特別注意點:
            以上實例中,將id和nid定義成Int類型,因此可以轉換成String,但是假如把id和nid定義成long類型,以上程序就無法運行。
            報錯為:
                    java.lang.Long  cannot be cast to java.lang.String.
                    在java中long和String是可以互相轉換的。但是這里不能轉換。出現了強制類型轉換錯誤的問題。
                    感覺程序內部做了什么特殊的處理,導致long變成了子類,String變成了父類。子類的指針沒法引用父類的對象。
 
            解決方案也很簡單,將long轉換成String即可。在以上代碼中String類型轉換成long是沒有問題的。
 
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM