状态保存:
structured streaming 提供了两个自定义分组聚合函数:mapGroupsWithState,flatMapGroupsWithState,允许开发者基于事件时间或者处理时间进行有状态的流计算。
简单来说,可以将每一次的流计算的group结果,保存下来,用于下一次继续聚合。
实现方式:
mapGroupsWithState与flatMapGroupsWithState方法是KeyValueGroupedDataset类的方法,要想调用该方法,必须先创建KeyValueGroupedDataset,由于该类不能由用户直接生成,须由已经存在的Dataset类型转换成KeyValueGroupedDataset。
Dataset =》 KeyValueGroupedDataset实现:
A Dataset has been logically grouped by a user specified grouping key,Users should not construct a KeyValueGroupedDataset directly, but should instead call groupByKey on an existing Dataset.
翻译:用户不能自己创建KeyValueGroupedDataset,必须对已经存在的Dataset调用group by 生成。
常规实现有:
xxx.groupByKey[String](......)
超时时间设置:
超时时间的设置模式:
处理时间 : processtime
数据到达spark后,spark给它添加的接收到该数据时的时间
事件时间 : eventtime
事件事件,是数据上报时,自己记录的发生时间。
处理时间模式:
如果选择了基于”处理时间“,则超时的duration是通过GroupState.setTimeoutDuration来设定的,但是超时并非是严格地发生在设定的duration的时间点上,而是在当刚刚超过duration的那次trigger发生时,所以说timeout并没有一个严格的“上限”(也就是最晚发生的时间,相对应的“下限“是指最早的发生时间,这个时间是刚好在duration的时间点上)。
举个例子:如果流上在最近一段时间没有任何数据,则整个负责维护状态的函数根本不会被触发,也就不会执行任何超时检测,直到有新的数据到达,这样有可能会导致在很长时间之后才会触发超时处理,而并不是在规定的那个超时时间点上触发的,这就是所谓的timeout并没有严格的“上限”的意思。
超时后,并不会立刻处理数据,只有当数据来了,触发了trigger才会开始执行清理操作,也就是没有数据时,就算超时了之前的状态也会保存。
处理时间超时依赖本机时间,时区等。
事件时间模式
如果选择的是基于“事件时间”,首先要开启基于事件时间框架,即必须在查询中使用Dataset.withWatermark(),然后需要在GroupState中使用GroupState.setTimeoutTimestamp()设定超时,setTimeoutTimestamp有两类版本,一类是设定一个固定时间戳,另一类是在一个指定的时间戳上再指定一个duration, 前者适用于那些有明确超时时间点的场景,后者适用于那些在某个最新的事件时间上再追加一个duration的场景。
超时后,并不会立刻处理数据,只有当数据来了,watermark时间更新后与设置做对比,才能判断做什么操作,也就是没有数据时,就算超时了之前的状态也会保存。
但是有一点是非常重要和清楚的,就是这个每次设定的超时时间戳是不能晚于watermark的!
共同点:
超时后,并不会立刻处理数据,只有当数据来了才会做对应的group state更新,也就是没有数据时,就算超时了之前的状态也会保存。
时间设置:
时间包括两个参数,超时时间点,和可以忍受的时间范围
超时时间点:可以是未来的一个准确时间(时间戳)-当前时间(时间戳)得到的一个毫秒数
还可以是一个准确的日期。
具体的处理,看选择哪个类型的函数(此处函数重载较多)。
GroupState特质:
原文链接:
使用GroupState 特质才可以操作状态数据
GroupState特质参数
1、groupby 后的key
2、一个迭代器,存储的是每个group by 后的key内容
3、用户自定义的状态(state object)对象
GroupState特质使用注意:
1、state的内容不能为null,否则update(用于设置和更新state内容)报错
2、为了让线程之间数据共享,state的内容非线程安全
3、使用remove后,exists返回false,get会报错,getOption会返回none
4、update调用后,exists返回true。get和getOption返回更新的值
GroupStateTimeout
1、groupstate的超时,可以是基于时间(spark本身的时间),也可以是基于事件时间(数据中记录的时间)。基于时间的超时,一般设置为指定时间内,没有接收数据则删除内存记录,如:state.setTimeoutDuration("1 hour")。基于事件时间的任务超时,依赖watermark时间,如:groupState.setTimeoutTimestamp(timestamp)
2、GroupStateTimeout的设置项是全局共享的
ProcessingTimeTimeout:
1、当且仅当在预设置的时间点上,超出了dms,才会触发超时处理。(这里可知道,存在两个参数,超时时间点,以及超时后允许的范围窗口时间)
2、超时处理是没有一个严格的处理时间,虽然我们设置了超时参数 + 超时窗口参数,定制了处理时间。但是内部的超时触发器,依赖外部的数据输入,只有有数据输入时,超时触发器才会去处理超时数据,但是假如系统最后一个数据输入完后,没有后续的输入数据,则关于这个数据的超时数据会一直存在系统的内存上。
3、超时处理依赖系统时间,系统时间错了,自然会有错误处理
EventTime Timeout:
如果选择的是基于“事件时间”,首先要开启基于事件时间框架,即必须在查询中使用Dataset.withWatermark(),然后需要在GroupState中使用GroupState.setTimeoutTimestamp()设定超时,setTimeoutTimestamp有两类版本,一类是设定一个固定时间戳,另一类是在一个指定的时间戳上再指定一个duration, 前者适用于那些有明确超时时间点的场景,后者适用于那些在某个最新的事件时间上再追加一个duration的场景。但是有一点是非常重要和清楚的,就是这个每次设定的超时时间戳是不能晚于watermark的!因为Spark在基于”事件时间“ 判定超时的原则就是:当且仅当watermark超过(晚于)了设定的timeout!某种意义上,我们可以认为watermark是当前“状态实例”(不是GroupState,而是它包裹的那个State对象)认定”存活“的一个”开始“的时间界限,timeout是当前”状态实例“认定”存活“的一个”截止“的时间界限,如果开始的时间比截止的时间还要晚,说明这个状态实例超时了!最后,在基于”事件时间“时,对于实际的超时时间也是没有一个准确的”上限“的,这和基于“事件时间”的超时判定的原因是一样的,因为watermark也是在流中有新数据时才会被触发更新,进而计算超时的时间。
状态保存的使用方法:
在官方文档找:KeyValueGroupedDataset类即可找到
mapGroupsWithState函数:
def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], outputEncoder: Encoder[U], timeoutConf: GroupStateTimeout): Dataset[U]
def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], outputEncoder: Encoder[U]): Dataset[U]
def mapGroupsWithState[S, U](timeoutConf: GroupStateTimeout)(func: (K, Iterator[V], GroupState[S]) ⇒ U)(implicit arg0: Encoder[S], arg1: Encoder[U]): Dataset[U]
def mapGroupsWithState[S, U](func: (K, Iterator[V], GroupState[S]) ⇒ U)(implicit arg0: Encoder[S], arg1: Encoder[U]): Dataset[U]
下面的示例采用该方式执行,将该方法柯西化后执行的
flatMapGroupsWithState函数:
两种状态保存的区别:
mapGroupsWithState 要求返回,且必须返回一条记录
flatMapGroupsWithState:可以返回多条,或者不返回记录
测试框架:
写数据库 --- maxwell ---> kafka ------> spark 汇总
注意watermark的用法:
数据在传给spark后,最终按多久汇总一次数据由Trigger.ProcessingTime("1 minutes")决定的。该方法是按照一定时间汇总数据,与另一种持序及时处理的模式稍有不同。
另外注意处理时间问题,否则会出现各种各样的问题。
1、是在创建sparkSession时,先将时区设置好
2、是在spark中的提示信息中,展示时间是UTC的时间
一个代码实例:
package com.test
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Row, SparkSession}
import java.sql.Timestamp
import java.util.Date
import java.text.SimpleDateFormat
import org.apache.log4j.Logger
import org.apache.spark.sql.streaming.{GroupStateTimeout, Trigger}
case class NodeFIleVirusesDF(
id:Int,
nid:Int,
virus_name:String,
virus_op:Int,
virus_findby:Int,
virus_type:Int,
find_time:Timestamp,
create_time:String
)
case class NodeFIleViruses(
id:Int,
nid:Int,
virus_name:String,
virus_op:Int,
virus_findby:Int,
virus_type:Int,
find_time:Long,
create_time:String
)
case class DBMessage(
database:String,
table:String,
type1:String,
ts:Long,
xid:Int,
xoffset:Int,
data: NodeFIleViruses
)
object proj2 {
def handleAllInfo(value: String): NodeFIleVirusesDF ={
val gson = new Gson()
val dbinfo = gson.fromJson(value, classOf[DBMessage])
val t = new Timestamp( dbinfo.data.find_time * 1000)
NodeFIleVirusesDF(
dbinfo.data.id,
dbinfo.data.nid,
dbinfo.data.virus_name,
dbinfo.data.virus_op,
dbinfo.data.virus_findby,
dbinfo.data.virus_type,
t,
dbinfo.data.create_time
)
}
def main(args: Array[String]): Unit = {
val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val spark = SparkSession
.builder
.config("spark.sql.session.timeZone", "Asia/Shanghai") //写时区,不写时区的话,所有的本地时间 ,必须减8小时才能有效
.appName("StructuredNetworkWordCount")
.master("
spark://192.168.10.40:7077")
.getOrCreate()
import spark.implicits._
val kafkadf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.10.40:9092")
.option("subscribe", "jmvirus")
.option("startingOffsets", "latest")
.load()
val infods = kafkadf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
//spark.sparkContext.setLogLevel() :DataSet[(String, String)]
log.info("start-----------------------------")
val infodf = infods.map(value => value._2)
.map(value => handleAllInfo(value))
.toDF()
val result = infodf
.withWatermark("find_time", "3 minutes")
.groupByKey[String]((row: Row) =>{
val currentet = sdf2.format(new Date(row.getTimestamp(6).getTime))
currentet + "," + row.getAs[String](1) + "," + row.getAs[String](2) //nid , virusname
})
.mapGroupsWithState[(String, Long), (String, Long)](GroupStateTimeout.EventTimeTimeout())((timeAndWord, iterator, groupState)=>{
var count = 0L
var key = "null"
val tarr = timeAndWord.split(',')
if(groupState.hasTimedOut){ //根据timeout删除内存上保存的数据
groupState.remove()
} else if( groupState.exists){ //由于每个数据在group后都会执行这里的逻辑,我们也可以避开timeout设置,自己配置一个删除内存数据的判断条件
//当然,自己设置的条件和spark structured本身的缺陷一样,需要后一条数据来触发判断
key = tarr(1) + "," + tarr(2)
count = groupState.getOption.getOrElse((key, 0L))._2 + iterator.size
groupState.update(key, count)
}else{
key = tarr(1) + "," + tarr(2)
count = iterator.size
groupState.update(key, count)
val arr = timeAndWord.split((","))
val timestamp = sdf2.parse(arr(0)).getTime
groupState.setTimeoutTimestamp(timestamp) //无论如何,用户这里设置的超时设置 + duration时间,必须小于此时的时间+watermark的允许漂移时间,才有意义
}
if(count != 0){
if (key != "null"){
(key, count)
}
else{
(timeAndWord, count)
}
}else{
null
}
}).filter( _ != null).toDF("key", "count")
//直接输出到屏幕,但是显示不全,使用文件保存查阅数据
val query = result.writeStream
//.outputMode("append") //select
.outputMode("update") // group by
//.trigger(Trigger.ProcessingTime(0))
.trigger(Trigger.ProcessingTime("1 minutes"))
.format("console")
.start()
query.explain()
query.awaitTermination()
}
}
数据上报测试脚本:
# _*_coding:utf-8_*_
import time
import random
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Date, BigInteger, TIMESTAMP,DateTime
from sqlalchemy.orm import sessionmaker
# 创建连接
engine = create_engine('mysql+
pymysql://root:lsl@192.168.10.40/testdata',
encoding='utf-8')
Base = declarative_base() # 生成orm基类
class NodeFileViruses(Base):
__tablename__ = 'node_file_viruses' # table的名字
id = Column(BigInteger, primary_key=True) # 创建id属性
nid = Column(Integer)
virus_name = Column(String(36)) # 创建name属性
virus_op = Column(Integer)
virus_findby = Column(Integer)
virus_type = Column(Integer)
find_time = Column(BigInteger)
create_time = Column(DateTime, default=datetime.datetime.now) #
def __repr__(self): # 用于进行查找时的数据返回
return '<%s name :%s>' %(self.id, self.nid)
Base.metadata.create_all(engine) # 进行指令的调用,即生成table
Session_class = sessionmaker(bind=engine) # 进行数据库的连接
Session = Session_class() # 生成session 实例
for i in range(5):
vnid = i
vtype = i
vop = i
vfind = i
vname = 'vname' + str(i)
nfv = NodeFileViruses(
nid=vnid,
virus_name=vname,
virus_op=vop,
virus_findby=vfind,
virus_type=vtype,
find_time=int(time.time())
)
Session.add(nfv)
测试结果:
以下仅仅时记录了部分结果
其中count=2,是因为在一分钟内,执行了上面的操作两次
Batch: 3
-------------------------------------------
+--------+-----+
| key|count|
+--------+-----+
|3,vname3| 2|
|2,vname2| 2|
|1,vname1| 2|
|0,vname0| 2|
|4,vname4| 2|
+--------+-----+
20/10/06 17:52:03 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@577bf8f6 committed.
20/10/06 17:52:03 INFO SparkContext: Starting job: start at proj2.scala:152
20/10/06 17:52:03 INFO DAGScheduler: Job 7 finished: start at proj2.scala:152, took 0.000042 s
20/10/06 17:52:03 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "741f6382-3195-4350-8bdd-84f553dd8d74",
"runId" : "dffe982c-e7b5-43ae-b74d-70e6d4ccebec",
"name" : null,
"timestamp" : "2020-10-06T09:52:00.001Z", //UTC展示的时间,与spark本身有关 代码中使用的是北京时间,通过spark配置文件可以解决
"batchId" : 3,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.1666638889351844,
"processedRowsPerSecond" : 3.2948929159802307,
"durationMs" : {
"addBatch" : 2945,
"getBatch" : 8,
"getOffset" : 1,
"queryPlanning" : 49,
"triggerExecution" : 3035,
"walCommit" : 30
},
"eventTime" : {
"avg" : "2020-10-06T09:51:41.000Z",
"max" : "2020-10-06T09:51:43.000Z",
"min" : "2020-10-06T09:51:39.000Z",
"watermark" : "2020-10-06T09:40:21.000Z"
},
"stateOperators" : [ {
"numRowsTotal" : 15,
"numRowsUpdated" : 5,
"memoryUsedBytes" : 24087
} ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[jmvirus]]",
"startOffset" : {
"jmvirus" : {
"0" : 20575
}
},
"endOffset" : {
"jmvirus" : {
"0" : 20585
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.1666638889351844,
"processedRowsPerSecond" : 3.2948929159802307
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2db704e"
}
}
maxwell转发的部分数据:
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":0,"data":{"id":21581,"nid":0,"virus_name":"vname0","virus_op":0,"virus_findby":0,"virus_type":0,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":1,"data":{"id":21582,"nid":1,"virus_name":"vname1","virus_op":1,"virus_findby":1,"virus_type":1,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":2,"data":{"id":21583,"nid":2,"virus_name":"vname2","virus_op":2,"virus_findby":2,"virus_type":2,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":3,"data":{"id":21584,"nid":3,"virus_name":"vname3","virus_op":3,"virus_findby":3,"virus_type":3,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"commit":true,"data":{"id":21585,"nid":4,"virus_name":"vname4","virus_op":4,"virus_findby":4,"virus_type":4,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":0,"data":{"id":21586,"nid":0,"virus_name":"vname0","virus_op":0,"virus_findby":0,"virus_type":0,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":1,"data":{"id":21587,"nid":1,"virus_name":"vname1","virus_op":1,"virus_findby":1,"virus_type":1,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":2,"data":{"id":21588,"nid":2,"virus_name":"vname2","virus_op":2,"virus_findby":2,"virus_type":2,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":3,"data":{"id":21589,"nid":3,"virus_name":"vname3","virus_op":3,"virus_findby":3,"virus_type":3,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"commit":true,"data":{"id":21590,"nid":4,"virus_name":"vname4","virus_op":4,"virus_findby":4,"virus_type":4,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
一个特别注意点:
以上实例中,将id和nid定义成Int类型,因此可以转换成String,但是假如把id和nid定义成long类型,以上程序就无法运行。
报错为:
java.lang.Long cannot be cast to java.lang.String.
在java中long和String是可以互相转换的。但是这里不能转换。出现了强制类型转换错误的问题。
感觉程序内部做了什么特殊的处理,导致long变成了子类,String变成了父类。子类的指针没法引用父类的对象。
解决方案也很简单,将long转换成String即可。在以上代码中String类型转换成long是没有问题的。