SparkStreaming消費Kafka,手動維護Offset到Mysql


說明

當前處理只實現手動維護offset到mysql,只能保證數據不丟失,可能會重復

要想實現精准一次性,還需要將數據提交和offset提交維護在一個事務中

官網說明

Your own data store
For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.

您自己的數據存儲
對於支持事務的數據存儲,即使在失敗情況下,將偏移與結果保存在同一事務中也可以使兩者保持同步。 如果您在檢測重復或跳過的偏移量范圍時很謹慎,則回滾事務可防止重復或丟失的消息影響結果。 這相當於一次語義。 即使是由於聚合而產生的輸出(通常很難使等冪),也可以使用此策略。

整體邏輯

offset建表語句

CREATE TABLE `offset_manager` (
  `groupid` varchar(50) DEFAULT NULL,
  `topic` varchar(50) DEFAULT NULL,
  `partition` int(11) DEFAULT NULL,
  `untiloffset` mediumtext,
  UNIQUE KEY `offset_unique` (`groupid`,`topic`,`partition`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

代碼實現

在線教育:知識點實時統計

import java.sql.{Connection, ResultSet}
import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import scala.collection.mutable

/**
 * @description: 知識點掌握實時統計
 * @author: HaoWu
 * @create: 2020年10月13日
 */
object QzPointStreaming_V2 {
  val groupid = "test1"

  def main(args: Array[String]): Unit = {
    /**
     * 初始化ssc
     */
    val conf: SparkConf = new SparkConf()
      .setAppName("test1")
      .setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "100")
      .set("spark.streaming.backpressure.enabled", "true")
    val ssc = new StreamingContext(conf, Seconds(3))


    /**
     * 讀取mysql歷史的offset
     */
    val sqlProxy = new SqlProxy()
    val client: Connection = DataSourceUtil.getConnection
    val offsetMap = new mutable.HashMap[TopicPartition, Long]
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close()
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }

    /**
     * 消費kafka主題,獲取數據流
     */
    val topics = Array("qz_log")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      //手動維護offset,要設置為false
      "enable.auto.commit" -> (false: Boolean)
    )
    val inStream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      //第一次啟動程序消費
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      //程序掛了,恢復程序
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    //*************************************************處理邏輯 開始**********************************************//

    /**
     * 邏輯處理的套路:統計當前批 + DB中歷史的數據  => 更新DB中的表數據
     */
    inStream
      .filter(
        record => record.value().split("\t") == 6
      )
    


    

    //*************************************************處理邏輯 結束**********************************************//


    /**
     * 邏輯處理完后,更新 mysql中維護的offset
     */
    inStream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
        /*for (i <- 0 until 100000) {
          val model = new LearnModel(1, 1, 1, 1, 1, 1, "", 2, 1l, 1l, 1, 1)
          map.put(UUID.randomUUID().toString, model)
        }*/
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })


    //啟動
    ssc.start()
    //阻塞
    ssc.awaitTermination()
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM