spark 的 structured streaming 状态保存


 

状态保存:

        structured  streaming 提供了两个自定义分组聚合函数:mapGroupsWithState,flatMapGroupsWithState,允许开发者基于事件时间或者处理时间进行有状态的流计算。
 
        简单来说,可以将每一次的流计算的group结果,保存下来,用于下一次继续聚合。
 
 

实现方式:

        mapGroupsWithState与flatMapGroupsWithState方法是KeyValueGroupedDataset类的方法,要想调用该方法,必须先创建KeyValueGroupedDataset,由于该类不能由用户直接生成,须由已经存在的Dataset类型转换成KeyValueGroupedDataset。
        Dataset    =》   KeyValueGroupedDataset实现:
                A Dataset has been logically grouped by a user specified grouping key,Users should not construct a KeyValueGroupedDataset directly, but should instead call groupByKey on an existing Dataset.
                翻译:用户不能自己创建KeyValueGroupedDataset,必须对已经存在的Dataset调用group by  生成。
        常规实现有:
                xxx.groupByKey[String](......)
        
 

超时时间设置:

        超时时间的设置模式:

                    处理时间   :   processtime
                            数据到达spark后,spark给它添加的接收到该数据时的时间
                    事件时间   : eventtime
                            事件事件,是数据上报时,自己记录的发生时间。
                    
 

        处理时间模式:

                如果选择了基于”处理时间“,则超时的duration是通过GroupState.setTimeoutDuration来设定的,但是超时并非是严格地发生在设定的duration的时间点上,而是在当刚刚超过duration的那次trigger发生时,所以说timeout并没有一个严格的“上限”(也就是最晚发生的时间,相对应的“下限“是指最早的发生时间,这个时间是刚好在duration的时间点上)。
                举个例子:如果流上在最近一段时间没有任何数据,则整个负责维护状态的函数根本不会被触发,也就不会执行任何超时检测,直到有新的数据到达,这样有可能会导致在很长时间之后才会触发超时处理,而并不是在规定的那个超时时间点上触发的,这就是所谓的timeout并没有严格的“上限”的意思。
               超时后,并不会立刻处理数据,只有当数据来了,触发了trigger才会开始执行清理操作,也就是没有数据时,就算超时了之前的状态也会保存。
 
                处理时间超时依赖本机时间,时区等。
                

        事件时间模式

                如果选择的是基于“事件时间”,首先要开启基于事件时间框架,即必须在查询中使用Dataset.withWatermark(),然后需要在GroupState中使用GroupState.setTimeoutTimestamp()设定超时,setTimeoutTimestamp有两类版本,一类是设定一个固定时间戳,另一类是在一个指定的时间戳上再指定一个duration, 前者适用于那些有明确超时时间点的场景,后者适用于那些在某个最新的事件时间上再追加一个duration的场景。
                超时后,并不会立刻处理数据,只有当数据来了,watermark时间更新后与设置做对比,才能判断做什么操作,也就是没有数据时,就算超时了之前的状态也会保存。
                但是有一点是非常重要和清楚的,就是这个每次设定的超时时间戳是不能晚于watermark的!
                
 

        共同点:

                超时后,并不会立刻处理数据,只有当数据来了才会做对应的group state更新,也就是没有数据时,就算超时了之前的状态也会保存。
 
 

        时间设置:

                时间包括两个参数,超时时间点,和可以忍受的时间范围
                超时时间点:可以是未来的一个准确时间(时间戳)-当前时间(时间戳)得到的一个毫秒数
                                        还可以是一个准确的日期。
                                        具体的处理,看选择哪个类型的函数(此处函数重载较多)。
 

GroupState特质:

        原文链接:

        使用GroupState 特质才可以操作状态数据
 

        GroupState特质参数

                1、groupby 后的key
                2、一个迭代器,存储的是每个group by 后的key内容
                3、用户自定义的状态(state object)对象
 

        GroupState特质使用注意:

                1、state的内容不能为null,否则update(用于设置和更新state内容)报错
                2、为了让线程之间数据共享,state的内容非线程安全
                3、使用remove后,exists返回false,get会报错,getOption会返回none
                4、update调用后,exists返回true。get和getOption返回更新的值
 

        GroupStateTimeout

                1、groupstate的超时,可以是基于时间(spark本身的时间),也可以是基于事件时间(数据中记录的时间)。基于时间的超时,一般设置为指定时间内,没有接收数据则删除内存记录,如:state.setTimeoutDuration("1 hour")。基于事件时间的任务超时,依赖watermark时间,如:groupState.setTimeoutTimestamp(timestamp)
                2、GroupStateTimeout的设置项是全局共享的
 

        ProcessingTimeTimeout:

                1、当且仅当在预设置的时间点上,超出了dms,才会触发超时处理。(这里可知道,存在两个参数,超时时间点,以及超时后允许的范围窗口时间)
                2、超时处理是没有一个严格的处理时间,虽然我们设置了超时参数 + 超时窗口参数,定制了处理时间。但是内部的超时触发器,依赖外部的数据输入,只有有数据输入时,超时触发器才会去处理超时数据,但是假如系统最后一个数据输入完后,没有后续的输入数据,则关于这个数据的超时数据会一直存在系统的内存上。
                3、超时处理依赖系统时间,系统时间错了,自然会有错误处理
 

        EventTime Timeout:

                如果选择的是基于“事件时间”,首先要开启基于事件时间框架,即必须在查询中使用Dataset.withWatermark(),然后需要在GroupState中使用GroupState.setTimeoutTimestamp()设定超时,setTimeoutTimestamp有两类版本,一类是设定一个固定时间戳,另一类是在一个指定的时间戳上再指定一个duration, 前者适用于那些有明确超时时间点的场景,后者适用于那些在某个最新的事件时间上再追加一个duration的场景。但是有一点是非常重要和清楚的,就是这个每次设定的超时时间戳是不能晚于watermark的!因为Spark在基于”事件时间“ 判定超时的原则就是:当且仅当watermark超过(晚于)了设定的timeout!某种意义上,我们可以认为watermark是当前“状态实例”(不是GroupState,而是它包裹的那个State对象)认定”存活“的一个”开始“的时间界限,timeout是当前”状态实例“认定”存活“的一个”截止“的时间界限,如果开始的时间比截止的时间还要晚,说明这个状态实例超时了!最后,在基于”事件时间“时,对于实际的超时时间也是没有一个准确的”上限“的,这和基于“事件时间”的超时判定的原因是一样的,因为watermark也是在流中有新数据时才会被触发更新,进而计算超时的时间。
 
 
 
 

状态保存的使用方法:

        在官方文档找:KeyValueGroupedDataset类即可找到
 
        mapGroupsWithState函数:
                def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], outputEncoder: Encoder[U], timeoutConf: GroupStateTimeout): Dataset[U]
                def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], outputEncoder: Encoder[U]): Dataset[U]
                def mapGroupsWithState[S, U](timeoutConf: GroupStateTimeout)(func: (K, Iterator[V], GroupState[S]) ⇒ U)(implicit arg0: Encoder[S], arg1: Encoder[U]): Dataset[U]
                def mapGroupsWithState[S, U](func: (K, Iterator[V], GroupState[S]) ⇒ U)(implicit arg0: Encoder[S], arg1: Encoder[U]): Dataset[U]
                        下面的示例采用该方式执行,将该方法柯西化后执行的
 
        flatMapGroupsWithState函数:
                
 
 
        两种状态保存的区别:
                mapGroupsWithState  要求返回,且必须返回一条记录
                flatMapGroupsWithState:可以返回多条,或者不返回记录
 
 

测试框架:

            写数据库   ---  maxwell --->  kafka   ------>   spark 汇总
 

注意watermark的用法:

            数据在传给spark后,最终按多久汇总一次数据由Trigger.ProcessingTime("1 minutes")决定的。该方法是按照一定时间汇总数据,与另一种持序及时处理的模式稍有不同。
 
另外注意处理时间问题,否则会出现各种各样的问题。
        1、是在创建sparkSession时,先将时区设置好
        2、是在spark中的提示信息中,展示时间是UTC的时间
 

一个代码实例:

 
package com.test
 
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Row, SparkSession}
import java.sql.Timestamp
import java.util.Date
import java.text.SimpleDateFormat
 
import org.apache.log4j.Logger
import org.apache.spark.sql.streaming.{GroupStateTimeout, Trigger}
 
case class NodeFIleVirusesDF(
                              id:Int,
                              nid:Int,
                              virus_name:String,
                              virus_op:Int,
                              virus_findby:Int,
                              virus_type:Int,
                              find_time:Timestamp,
                              create_time:String
                          )
case class NodeFIleViruses(
                              id:Int,
                              nid:Int,
                              virus_name:String,
                              virus_op:Int,
                              virus_findby:Int,
                              virus_type:Int,
                              find_time:Long,
                              create_time:String
                          )
case class DBMessage(
                    database:String,
                    table:String,
                    type1:String,
                    ts:Long,
                    xid:Int,
                    xoffset:Int,
                    data: NodeFIleViruses
                    )
 
object proj2 {
    def handleAllInfo(value: String): NodeFIleVirusesDF ={
        val gson = new Gson()
        val dbinfo = gson.fromJson(value, classOf[DBMessage])
        val t = new Timestamp(  dbinfo.data.find_time  * 1000)
        NodeFIleVirusesDF(
            dbinfo.data.id,
            dbinfo.data.nid,
            dbinfo.data.virus_name,
            dbinfo.data.virus_op,
            dbinfo.data.virus_findby,
            dbinfo.data.virus_type,
            t,
            dbinfo.data.create_time
        )
    }
 
    def main(args: Array[String]): Unit = {
        val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm")
 
        val spark = SparkSession
            .builder
            .config("spark.sql.session.timeZone", "Asia/Shanghai")                                            //写时区,不写时区的话,所有的本地时间 ,必须减8小时才能有效
            .appName("StructuredNetworkWordCount")
            .master(" spark://192.168.10.40:7077")
            .getOrCreate()
 
        import spark.implicits._
 
        val kafkadf = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "192.168.10.40:9092")
            .option("subscribe", "jmvirus")
            .option("startingOffsets", "latest")
            .load()
 
        val infods = kafkadf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
            .as[(String, String)]
        //spark.sparkContext.setLogLevel()  :DataSet[(String, String)]
        log.info("start-----------------------------")
 
        val infodf = infods.map(value => value._2)
            .map(value => handleAllInfo(value))
            .toDF()
 
        val result = infodf
            .withWatermark("find_time", "3 minutes")
            .groupByKey[String]((row: Row) =>{
                val currentet = sdf2.format(new Date(row.getTimestamp(6).getTime))
                currentet + "," + row.getAs[String](1) + "," + row.getAs[String](2)   //nid , virusname
            })
            .mapGroupsWithState[(String, Long), (String, Long)](GroupStateTimeout.EventTimeTimeout())((timeAndWord, iterator, groupState)=>{
                var count = 0L
                var key = "null"
                val tarr = timeAndWord.split(',')
                if(groupState.hasTimedOut){                                         //根据timeout删除内存上保存的数据
                    groupState.remove()
                } else if( groupState.exists){                                            //由于每个数据在group后都会执行这里的逻辑,我们也可以避开timeout设置,自己配置一个删除内存数据的判断条件
                                                                                                       //当然,自己设置的条件和spark structured本身的缺陷一样,需要后一条数据来触发判断
                    key = tarr(1) + "," + tarr(2)
                    count = groupState.getOption.getOrElse((key, 0L))._2 + iterator.size
                    groupState.update(key, count)
                }else{
                    key = tarr(1) + "," + tarr(2)
                    count = iterator.size
                    groupState.update(key, count)
                    val arr = timeAndWord.split((","))
                    val timestamp = sdf2.parse(arr(0)).getTime
                    groupState.setTimeoutTimestamp(timestamp)            //无论如何,用户这里设置的超时设置 + duration时间,必须小于此时的时间+watermark的允许漂移时间,才有意义
                }
 
                if(count != 0){
                    if (key != "null"){
                        (key, count)
                    }
                    else{
                        (timeAndWord, count)
                    }
                }else{
                    null
                }
            }).filter( _ != null).toDF("key", "count")
 
        //直接输出到屏幕,但是显示不全,使用文件保存查阅数据
        val query = result.writeStream
            //.outputMode("append")   //select
            .outputMode("update")   // group by
            //.trigger(Trigger.ProcessingTime(0))
            .trigger(Trigger.ProcessingTime("1 minutes"))
            .format("console")
            .start()
 
        query.explain()
 
        query.awaitTermination()
    }
}
 
 
数据上报测试脚本:
# _*_coding:utf-8_*_
 
import time
import random
import datetime
 
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Date, BigInteger, TIMESTAMP,DateTime
from sqlalchemy.orm import sessionmaker
 
# 创建连接
engine = create_engine('mysql+ pymysql://root:lsl@192.168.10.40/testdata',
                       encoding='utf-8')
Base = declarative_base()  # 生成orm基类
 
class NodeFileViruses(Base):
    __tablename__ = 'node_file_viruses'   # table的名字
    id = Column(BigInteger, primary_key=True)  # 创建id属性
    nid = Column(Integer)
    virus_name = Column(String(36))  # 创建name属性
    virus_op = Column(Integer)
    virus_findby = Column(Integer)
    virus_type = Column(Integer)
    find_time = Column(BigInteger)
    create_time = Column(DateTime, default=datetime.datetime.now) #
 
    def __repr__(self):  # 用于进行查找时的数据返回
        return '<%s name :%s>' %(self.id, self.nid)
Base.metadata.create_all(engine) # 进行指令的调用,即生成table
Session_class = sessionmaker(bind=engine)  # 进行数据库的连接
Session = Session_class() # 生成session 实例
 
for i in range(5):
    vnid = i
    vtype = i
    vop = i
    vfind = i
    vname = 'vname' + str(i)
 
    nfv = NodeFileViruses(
        nid=vnid,
        virus_name=vname,
        virus_op=vop,
        virus_findby=vfind,
        virus_type=vtype,
        find_time=int(time.time())
 
    )
    Session.add(nfv)
 
 
 
测试结果:
        以下仅仅时记录了部分结果
        其中count=2,是因为在一分钟内,执行了上面的操作两次
Batch: 3
-------------------------------------------
+--------+-----+
|     key|count|
+--------+-----+
|3,vname3|    2|
|2,vname2|    2|
|1,vname1|    2|
|0,vname0|    2|
|4,vname4|    2|
+--------+-----+
 
20/10/06 17:52:03 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@577bf8f6 committed.
20/10/06 17:52:03 INFO SparkContext: Starting job: start at proj2.scala:152
20/10/06 17:52:03 INFO DAGScheduler: Job 7 finished: start at proj2.scala:152, took 0.000042 s
20/10/06 17:52:03 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "741f6382-3195-4350-8bdd-84f553dd8d74",
  "runId" : "dffe982c-e7b5-43ae-b74d-70e6d4ccebec",
  "name" : null,
  "timestamp" : "2020-10-06T09:52:00.001Z",                     //UTC展示的时间,与spark本身有关   代码中使用的是北京时间,通过spark配置文件可以解决
  "batchId" : 3,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.1666638889351844,
  "processedRowsPerSecond" : 3.2948929159802307,
  "durationMs" : {
    "addBatch" : 2945,
    "getBatch" : 8,
    "getOffset" : 1,
    "queryPlanning" : 49,
    "triggerExecution" : 3035,
    "walCommit" : 30
  },
  "eventTime" : {
    "avg" : "2020-10-06T09:51:41.000Z",
    "max" : "2020-10-06T09:51:43.000Z",
    "min" : "2020-10-06T09:51:39.000Z",
    "watermark" : "2020-10-06T09:40:21.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 15,
    "numRowsUpdated" : 5,
    "memoryUsedBytes" : 24087
  } ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[jmvirus]]",
    "startOffset" : {
      "jmvirus" : {
        "0" : 20575
      }
    },
    "endOffset" : {
      "jmvirus" : {
        "0" : 20585
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 0.1666638889351844,
    "processedRowsPerSecond" : 3.2948929159802307
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2db704e"
  }
}
 
 
maxwell转发的部分数据:
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":0,"data":{"id":21581,"nid":0,"virus_name":"vname0","virus_op":0,"virus_findby":0,"virus_type":0,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":1,"data":{"id":21582,"nid":1,"virus_name":"vname1","virus_op":1,"virus_findby":1,"virus_type":1,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":2,"data":{"id":21583,"nid":2,"virus_name":"vname2","virus_op":2,"virus_findby":2,"virus_type":2,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"xoffset":3,"data":{"id":21584,"nid":3,"virus_name":"vname3","virus_op":3,"virus_findby":3,"virus_type":3,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977899,"xid":57416,"commit":true,"data":{"id":21585,"nid":4,"virus_name":"vname4","virus_op":4,"virus_findby":4,"virus_type":4,"find_time":1601977899,"create_time":"2020-10-06 17:51:40"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":0,"data":{"id":21586,"nid":0,"virus_name":"vname0","virus_op":0,"virus_findby":0,"virus_type":0,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":1,"data":{"id":21587,"nid":1,"virus_name":"vname1","virus_op":1,"virus_findby":1,"virus_type":1,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":2,"data":{"id":21588,"nid":2,"virus_name":"vname2","virus_op":2,"virus_findby":2,"virus_type":2,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"xoffset":3,"data":{"id":21589,"nid":3,"virus_name":"vname3","virus_op":3,"virus_findby":3,"virus_type":3,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
{"database":"testdata","table":"node_file_viruses","type":"insert","ts":1601977903,"xid":57449,"commit":true,"data":{"id":21590,"nid":4,"virus_name":"vname4","virus_op":4,"virus_findby":4,"virus_type":4,"find_time":1601977903,"create_time":"2020-10-06 17:51:44"}}
 
 
 
 
 
一个特别注意点:
            以上实例中,将id和nid定义成Int类型,因此可以转换成String,但是假如把id和nid定义成long类型,以上程序就无法运行。
            报错为:
                    java.lang.Long  cannot be cast to java.lang.String.
                    在java中long和String是可以互相转换的。但是这里不能转换。出现了强制类型转换错误的问题。
                    感觉程序内部做了什么特殊的处理,导致long变成了子类,String变成了父类。子类的指针没法引用父类的对象。
 
            解决方案也很简单,将long转换成String即可。在以上代码中String类型转换成long是没有问题的。
 
 
 
 
 
 
 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM