Spark Streaming windowing and Kafka streaming consumer strategies


Spark Streaming
----------------
    Continuous, non-stop stream computation.
    The Spark Streaming module implements it as micro-batch computation: the stream is
    cut into static data sets along a fixed time slice (the batch interval).
    // the batch interval is specified when the context is created
    val ssc = new StreamingContext(conf, Seconds(1))
    // every input stream created from this context uses that batch interval
    val lines = ssc.socketTextStream("s101", 8888)
    ...

The socket text stream receiver runs on the executor side, not on the driver.
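
A minimal runnable sketch of the above, assuming a text server is listening (e.g. nc -lk 8888)
on host s101; host name, port and batch interval are illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object SocketWordCountScala {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("socketWordCount").setMaster("local[2]")
            // one batch every second
            val ssc = new StreamingContext(conf, Seconds(1))
            // the receiver for this stream is scheduled onto an executor, not the driver
            val lines = ssc.socketTextStream("s101", 8888)
            val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
            counts.print()
            ssc.start()
            ssc.awaitTermination()
        }
    }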


SocketTextStream execution process
-------------------------------
    On the driver side the StreamingContext object is created; when the context is
    started it creates the JobScheduler and the ReceiverTracker in turn and calls their
    start methods.

    In its start method the ReceiverTracker sends a start-receivers message, which
    carries the receiver with its ServerSocket address information, to the
    ReceiverTrackerEndpoint. The endpoint extracts the message content, uses the
    sparkContext together with that content to build a receiver RDD, and finally
    submits that RDD to the Spark cluster as a job, so the receiver itself runs on an
    executor.



Windowed processing of stream computations
------------------------
    Windowing extends the application on top of the basic batches.
    Both the window length and the slide interval (the computation frequency) must be
    integer multiples of the batch interval. A runnable sketch follows the two forms below.

    reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(5), Seconds(3))
    window(...).reduceByKey(...)
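
A minimal windowed word-count sketch, assuming a 1-second batch interval and the same
illustrative socket source as above:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object WindowWordCountScala {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("windowWordCount").setMaster("local[2]")
            // batch interval 1s
            val ssc = new StreamingContext(conf, Seconds(1))
            val pairs = ssc.socketTextStream("s101", 8888)
                .flatMap(_.split(" ")).map((_, 1))
            // window length 5s, slide interval 3s -- both multiples of the 1s batch
            val windowed = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(5), Seconds(3))
            windowed.print()
            ssc.start()
            ssc.awaitTermination()
        }
    }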

DStream partitioning
------------------------
    Repartitioning a DStream repartitions each RDD inside it; a usage sketch follows.
    dstream.repartition(numPartitions) {
        // implemented by transforming every micro-batch RDD of the stream
        this.transform(_.repartition(numPartitions))
    }
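
A usage sketch; the two forms below are assumed to be equivalent, ds1 being any of the
DStreams from the examples above and 8 an arbitrary partition count:

    // explicit repartitioning of every micro-batch RDD
    val ds1a = ds1.repartition(8)
    // the same effect expressed through transform()
    val ds1b = ds1.transform(_.repartition(8))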

updateStateByKey()
-----------------------
    Computes per-key state accumulated since the streaming application started, e.g. the
    count of each word. The update can also be combined with a window computation, as in
    the fragment below; a complete runnable sketch follows it.

    import scala.collection.mutable.ArrayBuffer

    // a: the new values of this key in the current batch
    // state: the previously stored (timestamp, count) entries for this key
    def update(a: Seq[Int], state: Option[ArrayBuffer[(Long, Int)]]): Option[ArrayBuffer[(Long, Int)]] = {
        val count = a.sum
        val time = System.currentTimeMillis()

        if (state.isEmpty) {
            // first time this key is seen: start a new buffer
            val buf = ArrayBuffer[(Long, Int)]()
            buf.append((time, count))
            Some(buf)
        }
        else {
            // keep only the entries from the last 4 seconds, then append the new one
            val buf2 = ArrayBuffer[(Long, Int)]()
            for (t <- state.get) {
                if ((time - t._1) <= 4000) {
                    buf2 += t
                }
            }
            buf2.append((time, count))
            Some(buf2)
        }
    }

    val ds3 = ds2.window(Seconds(5), Seconds(3))
    val ds4 = ds3.updateStateByKey(update _)
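
A minimal end-to-end sketch of stateful word counting, assuming the same illustrative
socket source; note that updateStateByKey requires a checkpoint directory (the path below
is illustrative as well):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StatefulWordCountScala {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("statefulWordCount").setMaster("local[2]")
            val ssc = new StreamingContext(conf, Seconds(1))
            // checkpointing is mandatory for updateStateByKey
            ssc.checkpoint("file:///d:/java/chk")

            val pairs = ssc.socketTextStream("s101", 8888)
                .flatMap(_.split(" ")).map((_, 1))

            // running total per word since the application started
            def update(newValues: Seq[Int], running: Option[Int]): Option[Int] =
                Some(newValues.sum + running.getOrElse(0))

            val totals = pairs.updateStateByKey(update _)
            totals.print()

            ssc.start()
            ssc.awaitTermination()
        }
    }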

How partition counts are determined in a Spark Streaming computation
-----------------------------------
    DStream partitions are the partitions of the underlying RDDs. For receiver-based
    streams they are controlled by conf.set("spark.streaming.blockInterval", "200ms"):
    the received data is cut into blocks at that interval, and each block becomes one
    partition (see the sketch below).
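
A sketch of the resulting partition count; the interval values are illustrative:

    val conf = new SparkConf().setAppName("blockIntervalDemo").setMaster("local[2]")
    // each receiver cuts a new block every 200 ms
    conf.set("spark.streaming.blockInterval", "200ms")
    // with a 2 s batch interval, every batch RDD of this receiver has
    // roughly 2000 ms / 200 ms = 10 partitions
    val ssc = new StreamingContext(conf, Seconds(2))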

DStream.foreachRDD
----------------------
    Operates on each RDD of the stream. In the example below, each batch's word counts
    are written to MySQL, using one JDBC connection per partition.
    import java.sql.DriverManager

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Created by Administrator on 2018/3/8.
      */
    object SparkStreamingForeachRDDScala {

        def createNewConnection() = {
            Class.forName("com.mysql.jdbc.Driver")
            val conn = DriverManager.getConnection("jdbc:mysql://192.168.231.1:3306/big9","root","root")
            conn
        }

        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
            conf.setAppName("worldCount")
            conf.setMaster("local[4]")

            // batch interval is 2 seconds
            val ssc = new StreamingContext(conf ,Seconds(2))

            ssc.checkpoint("file:///d:/java/chk")

            // create the socket text stream
            val ds1 = ssc.socketTextStream("s101", 8888)
            val ds2 = ds1.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
            ds2.foreachRDD(rdd=>{

                rdd.foreachPartition(it => {
                    // executed on the worker side, once per partition:
                    // one JDBC connection per partition, not per record
                    val conn = createNewConnection()
                    val ppst = conn.prepareStatement("insert into wc(word,cnt) values(?,?)")
                    conn.setAutoCommit(false)
                    for (e <- it) {
                        ppst.setString(1, e._1)
                        ppst.setInt(2, e._2)
                        ppst.executeUpdate()
                    }
                    // commit the whole partition as one transaction, then release resources
                    conn.commit()
                    ppst.close()
                    conn.close()
                })
            })
            // start the streaming context
            ssc.start()
            ssc.awaitTermination()
        }
    }

Using Spark Streaming together with Spark SQL
--------------------------------
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Created by Administrator on 2018/3/8.
      */
    object SparkStreamingWordCountSparkSQLScala {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
            conf.setAppName("worldCount")
            conf.setMaster("local[2]")

            // batch interval is 2 seconds
            val ssc = new StreamingContext(conf ,Seconds(2))

            ssc.checkpoint("file:///d:/java/chk")

            // create the socket text stream
            val lines = ssc.socketTextStream("s101", 8888)

            // flatten each line into a stream of words
            val words = lines.flatMap(_.split(" "))

            words.foreachRDD(rdd => {
                // reuse (or lazily create) a SparkSession that shares the streaming configuration
                val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
                import spark.implicits._
                val df1= rdd.toDF("word")
                df1.createOrReplaceTempView("_temp")
                spark.sql("select word,count(*) from _temp group by word").show()
            })
            // start the streaming context
            ssc.start()

            ssc.awaitTermination()
        }
    }




Kafka
--------------------
    A messaging system, written in Scala.
    Replication is per partition: with n replicas of a partition, up to n - 1 broker
    failures can be tolerated.

    

Spark Streaming integration with Kafka
-------------------------
    1.Note
        spark-streaming-kafka-0-10_2.11 is not compatible with earlier broker versions;
        spark-streaming-kafka-0-8_2.11 is compatible with brokers 0.9 and 0.10.

    2.Start the Kafka cluster and create the topic.
        xkafka.sh start
    
    3.Verify that Kafka is working
        3.1)Start a console consumer
            kafka-console-consumer.sh --zookeeper s102:2181 --topic t1

        3.2)Start a console producer
            kafka-console-producer.sh --broker-list s102:9092 --topic t1
            
        3.3)Send some messages
            ...

    4.Add the Maven dependency
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

    5.Write the program
        import org.apache.kafka.clients.consumer.ConsumerRecord
        import org.apache.kafka.common.serialization.StringDeserializer
        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}
        import org.apache.spark.streaming.kafka010._
        import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
        import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

        /**
          * Created by Administrator on 2018/3/8.
          */
        object SparkStreamingKafkaScala {

            def main(args: Array[String]): Unit = {
                val conf = new SparkConf()
                conf.setAppName("kafka")
                conf.setMaster("local[*]")

                val ssc = new StreamingContext(conf , Seconds(2))

                // Kafka parameters
                val kafkaParams = Map[String, Object](
                    "bootstrap.servers" -> "s102:9092,s103:9092",
                    "key.deserializer" -> classOf[StringDeserializer],
                    "value.deserializer" -> classOf[StringDeserializer],
                    "group.id" -> "g1",
                    "auto.offset.reset" -> "latest",
                    "enable.auto.commit" -> (false: java.lang.Boolean)
                )

                val topics = Array("topic1")   //val topics = Array("topicl")
                val stream = KafkaUtils.createDirectStream[String, String](      //val stream = KafkaUtils.createDirectStream[String,String]
                    ssc,                                                            //ssc,PreferConsistent,Subscribe[String,String](topics,kafkaParams)
                    PreferConsistent,             //位置策略
                    Subscribe[String, String](topics, kafkaParams) //消費者策略
                )

                val ds2 = stream.map(record => (record.key, record.value))   //val ds2 = stream.map(record=>(record.key,record.value))
                ds2.print()         //ds2.print()    ssc.start()  ssc.awaitTermination()

                ssc.start()

                ssc.awaitTermination()
            }
        }
    
    6.Send messages from the console producer.

The Spark-Kafka direct stream (createDirectStream) and Kafka partitions
--------------------------------
    Each Kafka topic partition corresponds to one RDD partition.
    Spark can limit the number of messages received per second from each partition
    through the spark.streaming.kafka.maxRatePerPartition setting, as sketched below.
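
A sketch of the rate limit, assuming a 2-second batch; each Kafka partition then
contributes at most 100 * 2 = 200 records per batch:

    val conf = new SparkConf().setAppName("kafkaRateLimit").setMaster("local[*]")
    // at most 100 records per second per Kafka partition
    conf.set("spark.streaming.kafka.maxRatePerPartition", "100")
    val ssc = new StreamingContext(conf, Seconds(2))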


LocationStrategies
----------------
    Location strategies control on which executor the consumer for a particular topic
    partition is scheduled, i.e. how consumption of the topic partitions is assigned to
    executors. The placement is a relative preference; there are three options (a
    construction sketch follows this list):
    1.PreferBrokers
        Prefer the Kafka brokers. Only usable when the Kafka brokers and the executors
        run on the same hosts.

    2.PreferConsistent
        Prefer consistency. Used most of the time: all partitions of the subscribed
        topics are distributed evenly across the available executors, so the cluster's
        compute resources are used evenly.

    3.PreferFixed
        Prefer fixed placement. If the load is unbalanced, this strategy pins specific
        topic partitions to specific hosts; a manual placement scheme. Partitions that
        are not explicitly mapped still fall back to scheme (2).
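
A construction sketch for the three strategies; host names and the topic/partition
numbers are illustrative:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.LocationStrategies

    // 1) only when executors are co-located with the Kafka brokers
    val preferBrokers = LocationStrategies.PreferBrokers

    // 2) the usual choice: spread partitions evenly over the executors
    val preferConsistent = LocationStrategies.PreferConsistent

    // 3) pin chosen partitions to chosen hosts; unmapped partitions fall back to (2)
    val hostMap = Map(
        new TopicPartition("t1", 0) -> "s102",
        new TopicPartition("t1", 1) -> "s103"
    )
    val preferFixed = LocationStrategies.PreferFixed(hostMap)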


ConsumerStrategies
--------------------
    Consumer strategies control how the Kafka consumer objects are created and configured,
    and define what is to be consumed, e.g. only partitions 0 and 1 of topic t1, or a
    specific offset range within specific partitions.
    The class is extensible, so custom strategies can be implemented.
    There are three built-in strategies (a construction sketch follows this list):
    1.ConsumerStrategies.Assign
        Subscribes to a fixed collection of partitions; the most narrowly scoped option.
        def Assign[K, V](
              topicPartitions: Iterable[TopicPartition],
              kafkaParams: collection.Map[String, Object],
              offsets: collection.Map[TopicPartition, Long])

    2.ConsumerStrategies.Subscribe
        Subscribes to a fixed collection of topics.

    3.ConsumerStrategies.SubscribePattern
        Uses a regular expression to specify the topics of interest.
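
A construction sketch for the three strategies; topic names, partitions and offsets are
illustrative, and kafkaParams is the parameter map from the examples above:

    import java.util.regex.Pattern

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.ConsumerStrategies

    // 1) Assign: exact partitions, optionally with explicit starting offsets
    val partitions = Seq(new TopicPartition("t1", 0), new TopicPartition("t1", 1))
    val startOffsets = Map(new TopicPartition("t1", 0) -> 3L)
    val assign = ConsumerStrategies.Assign[String, String](partitions, kafkaParams, startOffsets)

    // 2) Subscribe: a fixed list of topics
    val subscribe = ConsumerStrategies.Subscribe[String, String](Seq("t1"), kafkaParams)

    // 3) SubscribePattern: every topic whose name matches the regex
    val byPattern = ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("t.*"), kafkaParams)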



Consumer strategies and the semantics model
-----------------------------
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    /**
      * Created by Administrator on 2018/3/8.
      */
    object SparkStreamingKafkaScala {

        // reports where a record was processed (ip / pid / thread) to a socket server
        def sendInfo(msg: String, objStr: String) = {
            // executor host ip
            val ip = java.net.InetAddress.getLocalHost.getHostAddress
            // executor process id
            val rr = java.lang.management.ManagementFactory.getRuntimeMXBean()
            val pid = rr.getName().split("@")(0)
            // current thread name
            val tname = Thread.currentThread().getName
            // send one line per record to a collector listening on s101:8888
            val sock = new java.net.Socket("s101", 8888)
            val out = sock.getOutputStream
            val m = ip + "\t:" + pid + "\t:" + tname + "\t:" + msg + "\t:" + objStr + "\r\n"
            out.write(m.getBytes)
            out.flush()
            out.close()
        }

        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
            conf.setAppName("kafka")
    //        conf.setMaster("spark://s101:7077")
            conf.setMaster("local[8]")

            val ssc = new StreamingContext(conf, Seconds(5))

            // Kafka parameters
            val kafkaParams = Map[String, Object](
                "bootstrap.servers" -> "s102:9092,s103:9092",
                "key.deserializer" -> classOf[StringDeserializer],
                "value.deserializer" -> classOf[StringDeserializer],
                "group.id" -> "g1",
                "auto.offset.reset" -> "latest",
                "enable.auto.commit" -> (false: java.lang.Boolean)
            )


            // PreferFixed: pin all four partitions of topic t1 to host s102
            val map = scala.collection.mutable.Map[TopicPartition, String]()
            map.put(new TopicPartition("t1", 0), "s102")
            map.put(new TopicPartition("t1", 1), "s102")
            map.put(new TopicPartition("t1", 2), "s102")
            map.put(new TopicPartition("t1", 3), "s102")
            val locStra = LocationStrategies.PreferFixed(map)

            // PreferConsistent: the alternative, even distribution over executors
            val consit = LocationStrategies.PreferConsistent

            val topics = Array("t1")

            // the topic partitions to consume (for ConsumerStrategies.Assign)
            val tps = scala.collection.mutable.ArrayBuffer[TopicPartition]()
            tps.+=(new TopicPartition("t1" , 0))
    //        tps.+=(new TopicPartition("t2" , 1))
    //        tps.+=(new TopicPartition("t3" , 2))

            // starting offsets for those partitions
            val offsets = scala.collection.mutable.Map[TopicPartition,Long]()
            offsets.put(new TopicPartition("t1", 0), 3)
    //        offsets.put(new TopicPartition("t2", 1), 3)
    //        offsets.put(new TopicPartition("t3", 2), 0)

            val conss = ConsumerStrategies.Assign[String, String](tps, kafkaParams, offsets)

            // create the Kafka direct stream with the chosen location and consumer strategies
            val stream = KafkaUtils.createDirectStream[String, String](
                ssc,
                locStra,
                conss
            )

            val ds2 = stream.map(record => {
                val t = Thread.currentThread().getName
                val key = record.key()
                val value = record.value()
                val offset = record.offset()
                val par = record.partition()
                val topic = record.topic()
                val tt = ("k:"+key , "v:" + value , "o:" + offset, "p:" + par,"t:" + topic ,"T : " + t)
                //xxxx(tt) ;
                //sendInfo(tt.toString() ,this.toString)
                tt
            })

            ds2.print()

            ssc.start()

            ssc.awaitTermination()
        }
    }

Kafka consumption semantics
-------------------
    In the pseudocode below, xxx(tt) stands for processing a record tt (whose offset is
    available as tt.offset) and commit(offset) for committing that offset back to Kafka.

    1.at most once
        Each record is processed at most once.
        commit(offset)    // commit first
        xxx(tt)           // then process; if processing fails after the commit, the record is lost

    2.at least once
        Each record is processed at least once.
        xxx(tt)           // process first
        commit(offset)    // then commit; if the commit fails, the record is reprocessed

    3.exactly once
        Each record is processed exactly once.
        Store the result and the offset together in one transaction (e.g. in MySQL), and on
        restart use ConsumerStrategies.Assign to resume from the saved offsets, as sketched
        below.
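
A minimal sketch of the exactly-once pattern, under these assumptions: the body is placed
inside main() of the example above (so ssc and kafkaParams already exist), the MySQL tables
kafka_msgs(part, off, value) and kafka_offsets(topic, part, off) are hypothetical, and
loadOffsets() is a hypothetical helper:

    import java.sql.DriverManager

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

    val jdbcUrl = "jdbc:mysql://192.168.231.1:3306/big9"   // results and offsets live in the same database

    // hypothetical helper: read the last stored offset of every partition from MySQL
    def loadOffsets(): Map[TopicPartition, Long] = {
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection(jdbcUrl, "root", "root")
        val rs = conn.createStatement().executeQuery("select topic, part, off from kafka_offsets")
        var offsets = Map[TopicPartition, Long]()
        while (rs.next())
            offsets += new TopicPartition(rs.getString(1), rs.getInt(2)) -> rs.getLong(3)
        conn.close()
        offsets
    }

    val savedOffsets = loadOffsets()
    // resume exactly where the last successful transaction left off
    val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](savedOffsets.keys.toSeq, kafkaParams, savedOffsets)
    )

    stream.foreachRDD(rdd => {
        // one OffsetRange per Kafka partition of this batch, indexed like the RDD partitions
        val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(it => {
            val range = ranges(TaskContext.get.partitionId)
            val conn = DriverManager.getConnection(jdbcUrl, "root", "root")
            conn.setAutoCommit(false)
            // write the records of this partition
            val msgSt = conn.prepareStatement("insert into kafka_msgs(part, off, value) values(?,?,?)")
            for (rec <- it) {
                msgSt.setInt(1, rec.partition)
                msgSt.setLong(2, rec.offset)
                msgSt.setString(3, rec.value)
                msgSt.executeUpdate()
            }
            // store the new offset in the SAME transaction
            val offSt = conn.prepareStatement("replace into kafka_offsets(topic, part, off) values(?,?,?)")
            offSt.setString(1, range.topic)
            offSt.setInt(2, range.partition)
            offSt.setLong(3, range.untilOffset)
            offSt.executeUpdate()
            // either the results and the offset are both committed, or neither is -> exactly once
            conn.commit()
            offSt.close()
            msgSt.close()
            conn.close()
        })
    })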

 

