狀態保存:
structured streaming 提供了兩個自定義分組聚合函數:mapGroupsWithState,flatMapGroupsWithState,允許開發者基於事件時間或者處理時間進行有狀態的流計算。
簡單來說,可以將每一次的流計算的group結果,保存下來,用於下一次繼續聚合。
實現方式:
mapGroupsWithState與flatMapGroupsWithState方法是KeyValueGroupedDataset類的方法,要想調用該方法,必須先創建KeyValueGroupedDataset,由於該類不能由用戶直接生成,須由已經存在的Dataset類型轉換成KeyValueGroupedDataset。
Dataset =》 KeyValueGroupedDataset實現:
A Dataset has been logically grouped by a user specified grouping key,Users should not construct a KeyValueGroupedDataset directly, but should instead call groupByKey on an existing Dataset.
翻譯:用戶不能自己創建KeyValueGroupedDataset,必須對已經存在的Dataset調用group by 生成。
常規實現有:
xxx.groupByKey[String](......)
超時時間設置:
超時時間的設置模式:
處理時間 : processtime
數據到達spark后,spark給它添加的接收到該數據時的時間
事件時間 : eventtime
事件事件,是數據上報時,自己記錄的發生時間。
處理時間模式:
如果選擇了基於”處理時間“,則超時的duration是通過GroupState.setTimeoutDuration來設定的,但是超時並非是嚴格地發生在設定的duration的時間點上,而是在當剛剛超過duration的那次trigger發生時,所以說timeout並沒有一個嚴格的“上限”(也就是最晚發生的時間,相對應的“下限“是指最早的發生時間,這個時間是剛好在duration的時間點上)。
舉個例子:如果流上在最近一段時間沒有任何數據,則整個負責維護狀態的函數根本不會被觸發,也就不會執行任何超時檢測,直到有新的數據到達,這樣有可能會導致在很長時間之后才會觸發超時處理,而並不是在規定的那個超時時間點上觸發的,這就是所謂的timeout並沒有嚴格的“上限”的意思。
超時后,並不會立刻處理數據,只有當數據來了,觸發了trigger才會開始執行清理操作,也就是沒有數據時,就算超時了之前的狀態也會保存。
處理時間超時依賴本機時間,時區等。
事件時間模式
如果選擇的是基於“事件時間”,首先要開啟基於事件時間框架,即必須在查詢中使用Dataset.withWatermark(),然后需要在GroupState中使用GroupState.setTimeoutTimestamp()設定超時,setTimeoutTimestamp有兩類版本,一類是設定一個固定時間戳,另一類是在一個指定的時間戳上再指定一個duration, 前者適用於那些有明確超時時間點的場景,后者適用於那些在某個最新的事件時間上再追加一個duration的場景。
超時后,並不會立刻處理數據,只有當數據來了,watermark時間更新后與設置做對比,才能判斷做什么操作,也就是沒有數據時,就算超時了之前的狀態也會保存。
但是有一點是非常重要和清楚的,就是這個每次設定的超時時間戳是不能晚於watermark的!
共同點:
超時后,並不會立刻處理數據,只有當數據來了才會做對應的group state更新,也就是沒有數據時,就算超時了之前的狀態也會保存。
時間設置:
時間包括兩個參數,超時時間點,和可以忍受的時間范圍
超時時間點:可以是未來的一個准確時間(時間戳)-當前時間(時間戳)得到的一個毫秒數
還可以是一個准確的日期。
具體的處理,看選擇哪個類型的函數(此處函數重載較多)。
GroupState特質:
原文鏈接:
使用GroupState 特質才可以操作狀態數據
GroupState特質參數
1、groupby 后的key
2、一個迭代器,存儲的是每個group by 后的key內容
3、用戶自定義的狀態(state object)對象
GroupState特質使用注意:
1、state的內容不能為null,否則update(用於設置和更新state內容)報錯
2、為了讓線程之間數據共享,state的內容非線程安全
3、使用remove后,exists返回false,get會報錯,getOption會返回none
4、update調用后,exists返回true。get和getOption返回更新的值
GroupStateTimeout
1、groupstate的超時,可以是基於時間(spark本身的時間),也可以是基於事件時間(數據中記錄的時間)。基於時間的超時,一般設置為指定時間內,沒有接收數據則刪除內存記錄,如:state.setTimeoutDuration("1 hour")。基於事件時間的任務超時,依賴watermark時間,如:groupState.setTimeoutTimestamp(timestamp)
2、GroupStateTimeout的設置項是全局共享的
ProcessingTimeTimeout:
1、當且僅當在預設置的時間點上,超出了dms,才會觸發超時處理。(這里可知道,存在兩個參數,超時時間點,以及超時后允許的范圍窗口時間)
2、超時處理是沒有一個嚴格的處理時間,雖然我們設置了超時參數 + 超時窗口參數,定制了處理時間。但是內部的超時觸發器,依賴外部的數據輸入,只有有數據輸入時,超時觸發器才會去處理超時數據,但是假如系統最后一個數據輸入完后,沒有后續的輸入數據,則關於這個數據的超時數據會一直存在系統的內存上。
3、超時處理依賴系統時間,系統時間錯了,自然會有錯誤處理
EventTime Timeout:
如果選擇的是基於“事件時間”,首先要開啟基於事件時間框架,即必須在查詢中使用Dataset.withWatermark(),然后需要在GroupState中使用GroupState.setTimeoutTimestamp()設定超時,setTimeoutTimestamp有兩類版本,一類是設定一個固定時間戳,另一類是在一個指定的時間戳上再指定一個duration, 前者適用於那些有明確超時時間點的場景,后者適用於那些在某個最新的事件時間上再追加一個duration的場景。但是有一點是非常重要和清楚的,就是這個每次設定的超時時間戳是不能晚於watermark的!因為Spark在基於”事件時間“ 判定超時的原則就是:當且僅當watermark超過(晚於)了設定的timeout!某種意義上,我們可以認為watermark是當前“狀態實例”(不是GroupState,而是它包裹的那個State對象)認定”存活“的一個”開始“的時間界限,timeout是當前”狀態實例“認定”存活“的一個”截止“的時間界限,如果開始的時間比截止的時間還要晚,說明這個狀態實例超時了!最后,在基於”事件時間“時,對於實際的超時時間也是沒有一個准確的”上限“的,這和基於“事件時間”的超時判定的原因是一樣的,因為watermark也是在流中有新數據時才會被觸發更新,進而計算超時的時間。
狀態保存的使用方法:
在官方文檔找:KeyValueGroupedDataset類即可找到
mapGroupsWithState函數:
def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], outputEncoder: Encoder[U], timeoutConf: GroupStateTimeout): Dataset[U]
def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], outputEncoder: Encoder[U]): Dataset[U]
def mapGroupsWithState[S, U](timeoutConf: GroupStateTimeout)(func: (K, Iterator[V], GroupState[S]) ⇒ U)(implicit arg0: Encoder[S], arg1: Encoder[U]): Dataset[U]
def mapGroupsWithState[S, U](func: (K, Iterator[V], GroupState[S]) ⇒ U)(implicit arg0: Encoder[S], arg1: Encoder[U]): Dataset[U]
下面的示例采用該方式執行,將該方法柯西化后執行的
flatMapGroupsWithState函數:
兩種狀態保存的區別:
mapGroupsWithState 要求返回,且必須返回一條記錄
flatMapGroupsWithState:可以返回多條,或者不返回記錄
測試框架:
寫數據庫 --- maxwell ---> kafka ------> spark 匯總
注意watermark的用法:
數據在傳給spark后,最終按多久匯總一次數據由Trigger.ProcessingTime("1 minutes")決定的。該方法是按照一定時間匯總數據,與另一種持序及時處理的模式稍有不同。
另外注意處理時間問題,否則會出現各種各樣的問題。
1、是在創建sparkSession時,先將時區設置好
2、是在spark中的提示信息中,展示時間是UTC的時間
一個代碼實例:
package com.test
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Row, SparkSession}
import java.sql.Timestamp
import java.util.Date
import java.text.SimpleDateFormat
import org.apache.log4j.Logger
import org.apache.spark.sql.streaming.{GroupStateTimeout, Trigger}
case class NodeFIleVirusesDF(
id:Int,
nid:Int,
virus_name:String,
virus_op:Int,
virus_findby:Int,
virus_type:Int,
find_time:Timestamp,
create_time:String
)
case class NodeFIleViruses(
id:Int,
nid:Int,
virus_name:String,
virus_op:Int,
virus_findby:Int,
virus_type:Int,
find_time:Long,
create_time:String
)
case class DBMessage(
database:String,
table:String,
type1:String,
ts:Long,
xid:Int,
xoffset:Int,
data: NodeFIleViruses
)
object proj2 {
def handleAllInfo(value: String): NodeFIleVirusesDF ={
val gson = new Gson()
val dbinfo = gson.fromJson(value, classOf[DBMessage])
val t = new Timestamp( dbinfo.data.find_time * 1000)
NodeFIleVirusesDF(
dbinfo.data.id,
dbinfo.data.nid,
dbinfo.data.virus_name,
dbinfo.data.virus_op,
dbinfo.data.virus_findby,
dbinfo.data.virus_type,
t,
dbinfo.data.create_time
)
}
def main(args: Array[String]): Unit = {
val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val spark = SparkSession
.builder
.config("spark.sql.session.timeZone", "Asia/Shanghai") //寫時區,不寫時區的話,所有的本地時間 ,必須減8小時才能有效
.appName("StructuredNetworkWordCount")
.master("
spark://192.168.10.40:7077")
.getOrCreate()
import spark.implicits._
val kafkadf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.10.40:9092")
.option("subscribe", "jmvirus")
.option("startingOffsets", "latest")
.load()
val infods = kafkadf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
//spark.sparkContext.setLogLevel() :DataSet[(String, String)]
log.info("start-----------------------------")
val infodf = infods.map(value => value._2)
.map(value => handleAllInfo(value))
.toDF()
val result = infodf
.withWatermark("find_time", "3 minutes")
.groupByKey[String]((row: Row) =>{
val currentet = sdf2.format(new Date(row.getTimestamp(6).getTime))
currentet + "," + row.getAs[String](1) + "," + row.getAs[String](2) //nid , virusname
})
.mapGroupsWithState[(String, Long), (String, Long)](GroupStateTimeout.EventTimeTimeout())((timeAndWord, iterator, groupState)=>{
var count = 0L
var key = "null"
val tarr = timeAndWord.split(',')
if(groupState.hasTimedOut){ //根據timeout刪除內存上保存的數據
groupState.remove()
} else if( groupState.exists){ //由於每個數據在group后都會執行這里的邏輯,我們也可以避開timeout設置,自己配置一個刪除內存數據的判斷條件
//當然,自己設置的條件和spark structured本身的缺陷一樣,需要后一條數據來觸發判斷
key = tarr(1) + "," + tarr(2)
count = groupState.getOption.getOrElse((key, 0L))._2 + iterator.size
groupState.update(key, count)
}else{
key = tarr(1) + "," + tarr(2)
count = iterator.size
groupState.update(key, count)
val arr = timeAndWord.split((","))
val timestamp = sdf2.parse(arr(0)).getTime
groupState.setTimeoutTimestamp(timestamp) //無論如何,用戶這里設置的超時設置 + duration時間,必須小於此時的時間+watermark的允許漂移時間,才有意義
}
if(count != 0){
if (key != "null"){
(key, count)
}
else{
(timeAndWord, count)
}
}else{
null
}
}).filter( _ != null).toDF("key", "count")
//直接輸出到屏幕,但是顯示不全,使用文件保存查閱數據
val query = result.writeStream
//.outputMode("append") //select
.outputMode("update") // group by
//.trigger(Trigger.ProcessingTime(0))
.trigger(Trigger.ProcessingTime("1 minutes"))
.format("console")
.start()
query.explain()
query.awaitTermination()
}
}
數據上報測試腳本:
# _*_coding:utf-8_*_
import time
import random
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Date, BigInteger, TIMESTAMP,DateTime
from sqlalchemy.orm import sessionmaker
# 創建連接
engine = create_engine('mysql+
pymysql://root:lsl@192.168.10.40/testdata',
encoding='utf-8')
Base = declarative_base() # 生成orm基類
class NodeFileViruses(Base):
__tablename__ = 'node_file_viruses' # table的名字
id = Column(BigInteger, primary_key=True) # 創建id屬性
nid = Column(Integer)
virus_name = Column(String(36)) # 創建name屬性
virus_op = Column(Integer)
virus_findby = Column(Integer)
virus_type = Column(Integer)
find_time = Column(BigInteger)
create_time = Column(DateTime, default=datetime.datetime.now) #
def __repr__(self): # 用於進行查找時的數據返回
return '<%s name :%s>' %(self.id, self.nid)
Base.metadata.create_all(engine) # 進行指令的調用,即生成table
Session_class = sessionmaker(bind=engine) # 進行數據庫的連接
Session = Session_class() # 生成session 實例
for i in range(5):
vnid = i
vtype = i
vop = i
vfind = i
vname = 'vname' + str(i)
nfv = NodeFileViruses(
nid=vnid,
virus_name=vname,
virus_op=vop,
virus_findby=vfind,
virus_type=vtype,
find_time=int(time.time())
)
Session.add(nfv)
測試結果:
以下僅僅時記錄了部分結果
其中count=2,是因為在一分鍾內,執行了上面的操作兩次
Batch: 3
-------------------------------------------
+--------+-----+
| key|count|
+--------+-----+
|3,vname3| 2|
|2,vname2| 2|
|1,vname1| 2|
|0,vname0| 2|
|4,vname4| 2|
+--------+-----+
20/10/06 17:52:03 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@577bf8f6 committed.
20/10/06 17:52:03 INFO SparkContext: Starting job: start at proj2.scala:152
20/10/06 17:52:03 INFO DAGScheduler: Job 7 finished: start at proj2.scala:152, took 0.000042 s
20/10/06 17:52:03 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "741f6382-3195-4350-8bdd-84f553dd8d74",
"runId" : "dffe982c-e7b5-43ae-b74d-70e6d4ccebec",
"name" : null,
"timestamp" : "2020-10-06T09:52:00.001Z", //UTC展示的時間,與spark本身有關 代碼中使用的是北京時間,通過spark配置文件可以解決
"batchId" : 3,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.1666638889351844,
"processedRowsPerSecond" : 3.2948929159802307,
"durationMs" : {
"addBatch" : 2945,
"getBatch" : 8,
"getOffset" : 1,
"queryPlanning" : 49,
"triggerExecution" : 3035,
"walCommit" : 30
},
"eventTime" : {
"avg" : "2020-10-06T09:51:41.000Z",
"max" : "2020-10-06T09:51:43.000Z",
"min" : "2020-10-06T09:51:39.000Z",
"watermark" : "2020-10-06T09:40:21.000Z"
},
"stateOperators" : [ {
"numRowsTotal" : 15,
"numRowsUpdated" : 5,
"memoryUsedBytes" : 24087
} ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[jmvirus]]",
"startOffset" : {
"jmvirus" : {
"0" : 20575
}
},
"endOffset" : {
"jmvirus" : {
"0" : 20585
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.1666638889351844,
"processedRowsPerSecond" : 3.2948929159802307
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2db704e"
}
}
maxwell轉發的部分數據:
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":0,"data":{"id":21581,"nid":0,"virus_name":"vname0","virus_op":0,"virus_findby":0,"virus_type":0,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":1,"data":{"id":21582,"nid":1,"virus_name":"vname1","virus_op":1,"virus_findby":1,"virus_type":1,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":2,"data":{"id":21583,"nid":2,"virus_name":"vname2","virus_op":2,"virus_findby":2,"virus_type":2,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":3,"data":{"id":21584,"nid":3,"virus_name":"vname3","virus_op":3,"virus_findby":3,"virus_type":3,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"commit":true,"data":{"id":21585,"nid":4,"virus_name":"vname4","virus_op":4,"virus_findby":4,"virus_type":4,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":0,"data":{"id":21586,"nid":0,"virus_name":"vname0","virus_op":0,"virus_findby":0,"virus_type":0,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":1,"data":{"id":21587,"nid":1,"virus_name":"vname1","virus_op":1,"virus_findby":1,"virus_type":1,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":2,"data":{"id":21588,"nid":2,"virus_name":"vname2","virus_op":2,"virus_findby":2,"virus_type":2,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":3,"data":{"id":21589,"nid":3,"virus_name":"vname3","virus_op":3,"virus_findby":3,"virus_type":3,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"commit":true,"data":{"id":21590,"nid":4,"virus_name":"vname4","virus_op":4,"virus_findby":4,"virus_type":4,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
一個特別注意點:
以上實例中,將id和nid定義成Int類型,因此可以轉換成String,但是假如把id和nid定義成long類型,以上程序就無法運行。
報錯為:
java.lang.Long cannot be cast to java.lang.String.
在java中long和String是可以互相轉換的。但是這里不能轉換。出現了強制類型轉換錯誤的問題。
感覺程序內部做了什么特殊的處理,導致long變成了子類,String變成了父類。子類的指針沒法引用父類的對象。
解決方案也很簡單,將long轉換成String即可。在以上代碼中String類型轉換成long是沒有問題的。