
在Kafka、Flink、Spark Streaming等分布式流處理系統中(Kafka本質上市流處理系統,不單是MQ),存在三種消息傳遞語義(Message Delivery Semantics):
- At Least Once
每條消息會被收到1次或多次。例如發送方S在超時時間內沒有收到接收方R的通知,或者收到了R的報錯,就會不斷重發消息直至R傳回ACK
- At Most Once
每條消息會被收到0次或1次。S只負責向R發送消息,R也沒有任何通知機制。無論R最終是否收到,S都不會重發
- Exactly Once
是上面兩個的綜合,保證S發送的每一條消息,R都會“不重不漏”地恰好一次收到
一個Spark Streaming程序由三步組成:輸入、處理邏輯、輸出
要達到Exactly Once的理想狀態,需要三步協同進行,而不是只與處理邏輯有關
Kafka與Spark Streaming集成時有兩種方法:
- 基於receiver
- 基於direct
1、基於receiver

- 基於receiver的采用kafka高級消費者API
- 每個executor進程都會不斷拉取消息,並同時保存在executor內存與HDFS上的預寫日志(Write-Ahead log,WAL)
- 當消息寫入WAL后,自動更新ZK中的offset
它可以保證At Least Once語義,但無法保證Exactly Once語義。雖然引入了WAL來保證消息不會丟失,但還有可能會出現消息已經寫入WAL,但offset更新失敗的情況,Kafka就會按上一次的offset重新發送消息。
這種方式還會造成數據冗余(Kafka broker中一份數據,spark executor中一分),是吞吐量和內存利用率降低
2、基於direct

- 基於direct的方法采用Kafka的簡單消費者API,它的流程大大簡化。
- executor不再從Kafka中連續讀取消息,也消除了receiver和WAL。
- Kafka分區與RDD分區一一對應,更可控
- driver線程只需要每次從Kafka獲得批次消息的offset range
- 然后executor進程根據offset range去讀取該批次對應的消息
- 由於offset在kafka中能唯一確定一條消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,達到了Exactly Once
不過由於它采用了簡單消費者API,我們需要自己來管理offset。否則一旦程序崩潰,整個流只能從earliest或者latest點恢復。
Spark RDD之所以被稱為“彈性分布式數據集”,是因為它具有不可變、可分區、可並行計算、容錯的特征。一個RDD只能由穩定的數據集生成,或者從其他RDD轉換(transform)得來。如果在執行RDD lineage的過程中失敗,那么只要元數據不發生變化,無論重新執行多少次lineage,都會得到同樣的、確定的結果
Spark Streaming的輸出一般是靠foreachRDD()算子來實現,它默認是at least once的。如果輸出過程中出錯,那么就會重復執行知道寫入成功。為了讓它符合Exactly once,可以施加兩種限制之一:冪等性(idempotent write)和事務性寫入(transactional write)
2.1 冪等性寫入
冪等性原來是數學里的概念,即f(f(x))=f(x)。
冪等寫入就是寫入多次與寫入一次的結果完全相同,可以自動將at least once轉化為Exactly once。這對於自帶主鍵或主鍵組的業務比較合適(如:各類日志、MySQL binlog),並且實現起來簡單
但是它要求處理邏輯是map-only的,也就是只能包含轉換、過濾等操作,不能包含shuffle、聚合等操作。如果條件更嚴格,就只能采用事務性寫入方法
stream.foreachRDD { rdd =>
rdd.foreachPartition { iter =>
// make sure connection pool is set up on the executor before writing
SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
iter.foreach { case (key, msg) =>
DB.autoCommit { implicit session =>
// the unique key for idempotency is just the text of the message itself, for example purposes
sql"insert into idem_data(msg) values (${msg})".update.apply
}
}
}
}
2.2 事務性寫入
這里的事務和DBMS中的事務含義基本相同,就是對數據進行一系列訪問與更新操作所組成的邏輯塊。為了符合事務性的ACID特性,必須引入一個唯一ID標識當前的處理邏輯,並且將計算結果與該ID一起落盤。
ID可以由主題、分區、時間、offset等共同組成
事務操作可以在foreachRDD()時進行。如果數據吸入失敗,或者offset吸入與當前offset range不匹配,那么這一批次數據都將失敗並且回滾
// localTx is transactional, if metric update or offset update fails, neither will be committed DB.localTx { implicit session => // store metric data val metricRows = sql""" update txn_data set metric = metric + ${metric} where topic = ${osr.topic} """.update.apply() if (metricRows != 1) { throw new Exception("...") } // store offsets val offsetRows = sql""" update txn_offsets set off = ${osr.untilOffset} where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset} """.update.apply() if (offsetRows != 1) { throw new Exception("...") } }
