Kafka+Spark Streaming保證exactly once語義


在Kafka、Flink、Spark Streaming等分布式流處理系統中(Kafka本質上市流處理系統,不單是MQ),存在三種消息傳遞語義(Message Delivery Semantics):

  • At Least Once

每條消息會被收到1次或多次。例如發送方S在超時時間內沒有收到接收方R的通知,或者收到了R的報錯,就會不斷重發消息直至R傳回ACK

  • At Most Once

每條消息會被收到0次或1次。S只負責向R發送消息,R也沒有任何通知機制。無論R最終是否收到,S都不會重發

  • Exactly Once

是上面兩個的綜合,保證S發送的每一條消息,R都會“不重不漏”地恰好一次收到

一個Spark Streaming程序由三步組成:輸入、處理邏輯、輸出

要達到Exactly Once的理想狀態,需要三步協同進行,而不是只與處理邏輯有關

Kafka與Spark Streaming集成時有兩種方法:

  • 基於receiver
  • 基於direct

1、基於receiver

  • 基於receiver的采用kafka高級消費者API
  • 每個executor進程都會不斷拉取消息,並同時保存在executor內存與HDFS上的預寫日志(Write-Ahead log,WAL)
  • 當消息寫入WAL后,自動更新ZK中的offset

它可以保證At Least Once語義,但無法保證Exactly Once語義。雖然引入了WAL來保證消息不會丟失,但還有可能會出現消息已經寫入WAL,但offset更新失敗的情況,Kafka就會按上一次的offset重新發送消息。

這種方式還會造成數據冗余(Kafka broker中一份數據,spark executor中一分),是吞吐量和內存利用率降低

2、基於direct

  • 基於direct的方法采用Kafka的簡單消費者API,它的流程大大簡化。
  • executor不再從Kafka中連續讀取消息,也消除了receiver和WAL。
  • Kafka分區與RDD分區一一對應,更可控
  • driver線程只需要每次從Kafka獲得批次消息的offset range
  • 然后executor進程根據offset range去讀取該批次對應的消息
  • 由於offset在kafka中能唯一確定一條消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,達到了Exactly Once

不過由於它采用了簡單消費者API,我們需要自己來管理offset。否則一旦程序崩潰,整個流只能從earliest或者latest點恢復。

Spark RDD之所以被稱為“彈性分布式數據集”,是因為它具有不可變、可分區、可並行計算、容錯的特征。一個RDD只能由穩定的數據集生成,或者從其他RDD轉換(transform)得來。如果在執行RDD lineage的過程中失敗,那么只要元數據不發生變化,無論重新執行多少次lineage,都會得到同樣的、確定的結果

Spark Streaming的輸出一般是靠foreachRDD()算子來實現,它默認是at least once的。如果輸出過程中出錯,那么就會重復執行知道寫入成功。為了讓它符合Exactly once,可以施加兩種限制之一:冪等性(idempotent write)事務性寫入(transactional write)

2.1 冪等性寫入

冪等性原來是數學里的概念,即f(f(x))=f(x)。

冪等寫入就是寫入多次與寫入一次的結果完全相同,可以自動將at least once轉化為Exactly once。這對於自帶主鍵或主鍵組的業務比較合適(如:各類日志、MySQL binlog),並且實現起來簡單

但是它要求處理邏輯是map-only的,也就是只能包含轉換、過濾等操作,不能包含shuffle、聚合等操作。如果條件更嚴格,就只能采用事務性寫入方法

stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // make sure connection pool is set up on the executor before writing
        SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)

        iter.foreach { case (key, msg) =>
          DB.autoCommit { implicit session =>
            // the unique key for idempotency is just the text of the message itself, for example purposes
            sql"insert into idem_data(msg) values (${msg})".update.apply
          }
        }
      }
    }

2.2 事務性寫入

這里的事務和DBMS中的事務含義基本相同,就是對數據進行一系列訪問與更新操作所組成的邏輯塊。為了符合事務性的ACID特性,必須引入一個唯一ID標識當前的處理邏輯,並且將計算結果與該ID一起落盤。

ID可以由主題、分區、時間、offset等共同組成

事務操作可以在foreachRDD()時進行。如果數據吸入失敗,或者offset吸入與當前offset range不匹配,那么這一批次數據都將失敗並且回滾

// localTx is transactional, if metric update or offset update fails, neither will be committed
    DB.localTx { implicit session =>
      // store metric data
      val metricRows = sql"""
    update txn_data set metric = metric + ${metric}
      where topic = ${osr.topic}
    """.update.apply()
      if (metricRows != 1) {
        throw new Exception("...")
      }
 
      // store offsets
      val offsetRows = sql"""
    update txn_offsets set off = ${osr.untilOffset}
      where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
    """.update.apply()
      if (offsetRows != 1) {
        throw new Exception("...")
      }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM