SparkStreaming實現Exactly-Once語義


作者:Syn良子 出處:http://www.cnblogs.com/cssdongl 轉載請注明出處

譯自:http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

查資料時發現上面這篇文章不錯,雖然是1.3的老版本的知識,但是還是有借鑒的地方,業余時間按照自己的理解翻譯了一遍,有不當的地方歡迎指正.

Apache Spark 1.3的版本包括從Apache Kafka讀取數據的新的RDD和DStream實現。 作為這些功能的主要作者,我想解釋一下它們的實現和用法。 你可能會感興趣因為你能從以下方面受益:

1>在使用Kafka時更均勻地使用Spark集群資源
2>消息傳遞語義的控制
3>交付保證,而不依賴於HDFS中的預寫日志
4>訪問message元數據


我假設你熟悉Spark Streaming文檔和Kafka文檔。 所有代碼示例都在Scala中,但在API中有很多方法對Java也比較友好

Basic Usage

Kafka RDD和DStream的新API在spark-streaming-kafka模塊中

SBT 依賴

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.3.0"

Maven 依賴:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.3.0</version>
</dependency>

利用spark streaming從kafka中讀取數據,請使用KafkaUtils.createDirectStream:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
 
val ssc = new StreamingContext(new SparkConf, Seconds(60))
 
// hostname:port for Kafka brokers, not Zookeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")
 
val topics = Set("sometopic", "anothertopic")
 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

對createDirectStream的調用會返回一個元組流,他由每個Kafka消息的鍵和值形成的。 它的返回類型是InputDStream [(K,V)],其中K和V在這種情況下的類型都是String。 這個返回類型的子類實現是DirectKafkaInputDStream。 createDirectStream方法還有其他重載,允許您訪問消息元數據,並精確的為每個主題和分區指定起始偏移。

如果從一個非流運算的spark job中讀取kafka數據,請使用KafkaUtils.createRDD:

import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
 
    val sc = new SparkContext(new SparkConf)
 
    // hostname:port for Kafka brokers, not Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")
 
    val offsetRanges = Array(
      OffsetRange("sometopic", 0, 110, 220),
      OffsetRange("sometopic", 1, 100, 313),
      OffsetRange("anothertopic", 0, 456, 789)
    )
 
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)

對createRDD的調用為每個Kafka消息的指定批次的偏移范圍內返回一個(key,value)格式的元組RDD。它的返回類型是RDD [(K,V)],子類實現是KafkaRDD。 createRDD方法還有其他重載來允許您訪問消息元數據,並指定當前每個主題和以及分區的leader。

 Implementation
DirectKafkaInputDStream是批處理流。 每個批次關聯對應的KafkaRDD。 KafkaRDD的每個分區對應一個OffsetRange。 大多數實現是私有的,但是
理解了以后還是非常有用的。

 OffsetRange
OffsetRange表示給定Kafka主題和分區中特定消息序列的下限和上限,以下是它的數據結構:
 

OffsetRange("visits", 2, 300, 310)

上面這行代碼標識從“visits”主題的第2個分區中的偏移300(包括)到偏移310(不包括)的10個消息。 注意,它實際上不包含消息的內容,它只是一種識別范圍的方法。

還要注意,因為Kafka排序只在每個分區的基礎上定義,下面這行代碼

OffsetRange("visits", 3, 300, 310)

對消息的引用可能來自於完全不同的時間段; 即使偏移量與上述相同,分區也不同。

KafkaRDD
回想一下RDD Class的定義如下:
1>包含一種將Job分區的方法(getPartitions)
2>包含為指定分區執行工作的方法(compute)
3>父RDD的列表, KafkaRDD是一個輸入,而不是一個轉換,所以它沒有parent
4>(可選)定義如何對Key進行哈希的partitioner。 KafkaRDD沒有定義.
5>(可選)給定分區的首選主機列表,以便將計算推送到數據所在的位置(getPreferredLocations).


KafkaRDD構造函數接收一個OffsetRanges數組和一個當前leader的主機和端口的映射,這個映射包含所有topic及其分區。 分離leader信息的原因是允許KafkaUtils.createRDD方法很方便調用KafkaRDD的構造函數,而你不需要知道leader信息 在這種情況下,createRDD將使用metadata.broker.list中指定的主機列表作為初始聯系,來調用Kafka API的必要的元數據信息去尋找leader 該初始查找將在Spark驅動程序進程中僅發生一次

KafkaRDD的getPartitions方法使用數組中的每個OffsetRange,並通過添加leader的主機和端口信息來將其轉換為RDD分區。 這里要注意的重要的是Kafka分區和RDD分區之間有1:1的對應關系。 這意味着Spark並行度(至少對於讀取消息)的程度將直接與Kafka並行度的程度相關。


getPreferredLocations方法使用給定分區的Kafka leader作為首選主機。 我沒有在和Kafka相同的主機上運行我的Spark executors,所以如果你這樣做了,讓我知道你是如何使它工作的

compute方法在Spark executors進程中運行。 它使用Kafka SimpleConsumer連接到給定主題和分區的leader,然后重復獲取請求以讀取指定范圍的偏移量的消息。

每個消息都使用構造器中的messageHandler參數做轉換。 messageHandler是Kafka MessageAndMetadata類型的用戶定義類型函數,默認是鍵和值的元組。 在大多數情況下,這種類型在每個分區的基礎上訪問主題和偏移元數據更為有效(參見下面對HasOffsetRanges的討論),但是如果真的需要將每個消息與其偏移關聯,你可以這樣做。

關於計算的關鍵點在於,由於偏移范圍是在驅動程序上預先定義的,因此由執行程序直接從Kafka讀取,特定KafkaRDD返回的消息是確定的。 因此,這里沒有重要的狀態保持在executors上,也沒有提交讀取偏移到Apache ZooKeeper的的概念,因為存在優先使用Kafka高級消費者解決方案。

由於計算操作是確定性的,所以如果任務失敗通常可以重新嘗試任務。 例如,如果Kafka leader丟失了,計算方法將進行休眠,休眠時間取決於Kafka參數refresh.leader.backoff.ms Kafka 中定義的時間,然后該任務失敗並讓正常的Spark任務重試機制處理它。 在第一次之后的后續嘗試中,新的leader的找出邏輯執行會作為executor 的compute方法的執行中的一部分。

DirectKafkaInputDStream

如果您有現有代碼來獲取和管理偏移,KafkaUtils.createRDD返回的KafkaRDD可用於批處理作業。 然而,在大多數情況下,您可能會使用KafkaUtils.createDirectStream,它返回一個DirectKafkaInputDStream。 類似於RDD,DStream定義為:
1>包含一個父DStreams列表。 再次說明,這是一個輸入DStream,而不是一個轉換,所以這里它沒有parent
2>包含stream生成批次的時間間隔。 這個stream使用上下文中定義的時間間隔
3>包含一種為給定時間間隔(compute)生成RDD的方法.

compute方法在驅動程序上運行。 它連接每個主題和分區的leader,不是讀取消息而是獲取最新的可用偏移量。 然后定義KafkaRDD,其偏移范圍跨越從最后一個批次的結束點到最近leader的offset.

要定義第一個批次的起始點,您可以為每個TopicAndPartition指定精確的偏移量,或者使用Kafka參數auto.offset.reset,它可以設置為“最大”或“最小”(默認為“最大”) 。 對於速率限制,可以使用Spark配置變量spark.streaming.kafka.maxRatePerPartition設置每個分區每個批次的最大消息數。

一旦定義了給定時間間隔的KafkaRDD,它將按照上述批處理用例情況完全執行。 與以前的Kafka DStream實現不同,不存在這種長期運行占用每個流的核心而不顧及消息量是多少的reveiver任務,對於我們在Kixer的使用案例,通常在大量主題的同一作業中有重要但少量的主題。 使用 direct stream,低容量分區導致較小的任務快速完成,並釋放該節點以處理批處理中的其他分區。 與此同時保持各個主題在邏輯上分開,這是一個相當大的成功,因為其均衡一致了集群的使用,

與批處理使用情況的顯着差異是存在隨時間變化的一些重要狀態,換句話說,即在每個時間間隔產生的偏移范圍。 executor或Kafka leader失敗不是一個大問題,如上所述,但如果驅動程序失敗,偏移范圍將會丟失,除非存儲在某個地方。 我將在下面的交付語義中更詳細地討論這一點,但你基本上有三個選擇:
1>如果您不關心丟失或重復的message,請不要擔心,只需從最早或最新的偏移重新啟動流
2>給stream建立檢查點,在這種情況下,偏移范圍(而不是消息,只是偏移范圍定義)將存儲在檢查點中
3>自己存儲偏移范圍,並在重新啟動stream時提供正確的起始偏移

同樣,沒有消費者偏移存儲在ZooKeeper中。 如果你想與現有的Kafka監控工具直接與ZK交談,你自己需要將偏移存儲到ZK(這不意味着它需要系統記錄你的偏移量,你可以只是復制它們)

請注意,因為Kafka被視為持久化的消息存儲,而不是瞬態網絡源,您不需要將消息復制到HDFS以進行錯誤恢復。 然而,這種設計確實有一些含義。 首先,您無法讀取Kafka中不再存在的message,因此請確保您保存了足夠的message。 第二個還是是你不能讀取Kafka中不存在的消息。 換句話說,執行者的消費者不輪詢新消息,驅動程序只是定期地與領導在每個批處理間隔檢查,因此有一些固有的延遲。

HasOffsetRanges

另一個實現細節是公共接口HasOffsetRanges,它帶有返回OffsetRange數組的單個方法。 KafkaRDD實現了此接口,允許您在每個分區的基礎上獲取主題和偏移量信息。

val stream = KafkaUtils.createDirectStream(...)
      ...
      stream.foreachRDD { rdd =>
        // Cast the rdd to an interface that lets us get a collection of offset ranges
        val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 
        rdd.mapPartitionsWithIndex { (i, iter) =>
          // index to get the correct offset range for the rdd partition we're working on
          val osr: OffsetRange = offsets(i)
 
          // get any needed data from the offset range
          val topic = osr.topic
          val kafkaPartitionId = osr.partition
          val begin = osr.fromOffset
          val end = osr.untilOffset
          ...

使用這種間接層的原因是因為DStream方法使用的靜態類型像foreachRDD和transform只是RDD,而不是底層實現的類型(在這種情況下,私有)。 因為createDirectStream返回的DStream生成了KafkaRDD的批次,所以可以安全地轉換為HasOffsetRanges。 還要注意,由於偏移范圍和rdd分區之間的1:1對應關系,rdd分區的索引對應由offsetRanges返回的數組中的索引。
Delivery Semantics

首先,了解delivery 語義的Kafka文檔。 如果你已經閱讀過,請再次閱讀。 總之:消費者delivery 語義取決於你,不是Kafka。

第二,理解Spark不能保證輸出動作的一次性語義。 當Spark streaming guide談論至少一次時,它只是指一個RDD中的給定item包含在計算值中一次,純粹是功能意義上的。 任何包含副作用的輸出操作(即任何你在foreachRDD中保存結果的任何操作)可能會重復,因為進程的任何階段都可能失敗並重試。


第三,了解Spark檢查點可能無法恢復,例如在您需要更改應用程序代碼以獲取stream重新啟動的情況下。 這種情況可能會在1.4版本中被改進,但是要注意這是一個問題。 我以前遇到過這個坑,你可能也是。 任何地方,我提到“檢查點stream”作為一個選項,考慮所涉及的風險。 還要注意,反正任何窗口變換都將依賴於檢查點,

最后,我將重復,除了最多一次以外的任何語義需要您在Kafka有足夠的日志保留。 如果你看到像OffsetOutOfRangeException這樣的東西,這可能是因為你的Kafka存儲不足,而不是因為Spark或Kafka的錯誤。

鑒於所有這一切,你如何獲得相當於你想要的語義?

At-most-once
這在您將結果發送到非記錄系統,不想要重復的情況下,以及確保不會陷入message丟失的這種麻煩的情況下,這可能非常有用。 一個例子比如通過UDP發送摘要統計信息,因為這開始於一個不可靠的協議

要獲取最常用的語義,請執行以下所有操作:
1>將spark.task.maxFailures設置為1,因此作業失敗時作業立即結束。
2>確保spark.speculation為false(默認值),因此任務的多個副本不會被推測地運行。
3>當作業死亡時,使用Kafka param auto.offset.reset設置為“最大”啟動stream備份,因此它將跳轉到日志的當前結尾。

這意味着您在重新啟動時會丟失消息,但至少大概不應該重演一次 請仔細測試,如果你的消息不會重復對你實際上很重要,因為它不是一個常見的用例,我沒有提供它的示例代碼。

At-least-once
您可以重復message,但不會丟失message。 這方面的一個stream的例子相對少見,比如發送內部電子郵件警報。 在短時間內獲得重復的緊急警報比沒有得到它們好得多。

基本選項如下:
1>建立stream檢查點,或者
2>設置auto.offset.reset為最小並重新啟動作業。 這將從您的保留開始重新獲取整個日志,因此您最好保留相對較短的時間,或者對重復的郵件確實很好。

對stream建立檢查點作為下一個選項的基礎,因此請查看它的示例代碼。

Exactly-once using idempotent writes

冪等寫入使重復的消息安全,至少轉換一次到等同於一次。 這樣做的典型方式是通過具有某種類型的唯一key(嵌入在消息中,或使用主題/分區/偏移量作為key),並根據該key存儲結果。 依賴於每個消息的唯一鍵意味着這對於轉換或過濾單獨有價值的消息很有用,對於聚合多個消息則不一定。

關於這個idea,在IdempotentExample.scala有一個完整的例子。 它使用Postgres,為了與下一個示例的一致性,但是可以使用允許唯一鍵的任何存儲系統。

stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // make sure connection pool is set up on the executor before writing
        SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
 
        iter.foreach { case (key, msg) =>
          DB.autoCommit { implicit session =>
            // the unique key for idempotency is just the text of the message itself, for example purposes
            sql"insert into idem_data(msg) values (${msg})".update.apply
          }
        }
      }
    }

在失敗的情況下,可以安全地重試上述輸出動作。 對stream進行檢查點確保偏移范圍在生成時保存。 檢查點以通常的方式完成,通過定義配置流上下文(ssc)和設置stream的功能,然后調用   
ssc.checkpoint(checkpointDir)
在返回ssc之前。 有關更多詳細信息,請參閱Streaming Guide
Exactly-once using transactional writes
對於支持事務的數據存儲,即使在故障情況下,也可以將結果保存在同一事務中的偏移來保持同步。 如果你仔細檢查那些重復或跳過的偏移范圍,回滾事務可防止重復或丟失的message影響結果。 這給出了恰好一次語義的等價物,並且直接用於聚合。
TransactionalExample.scala是一個完整的Spark作業,它實現了這個想法。 它雖然使用Postgres,但是可以使用任何具有事務語義的數據存儲。
第一個重要的點是,使用最后成功提交的偏移量作為開始點來啟動stream。 這允許故障恢復:

// begin from the the offsets committed to the database
    val fromOffsets = DB.readOnly { implicit session =>
      sql"select topic, part, off from txn_offsets".
        map { resultSet =>
          TopicAndPartition(resultSet.string(1), resultSet.int(2)) -> resultSet.long(3)
        }.list.apply().toMap
    }
 
    val stream: InputDStream[Long] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Long](
      ssc, kafkaParams, fromOffsets,
      // we're just going to count messages, don't care about the contents, so convert each message to a 1
      (mmd: MessageAndMetadata[String, String]) => 1L)

對於第一次運行作業,可以使用適當的起始偏移預先加載表。
如上面對HasOffsetRanges的討論中所述,該示例根據每個分區訪問偏移范圍,
注意mapPartitionsWithIndex是一個轉換,並且沒有等效的foreachPartitionWithIndex操作。 RDD轉換通常是惰性的,所以除非你添加某種類型的輸出動作,Spark將永遠不會調度作業來做任何事情。 在empty body上調用RDD的foreach就足夠了。 另外,注意一些迭代器方法,如map是懶惰的。 如果你正在設置瞬態狀態,如網絡或數據庫連接,則在映射完全強制時連接可能已關閉。 在這種情況下,請確保使用像foreach這樣的熱衷於使用迭代器的方法。

rdd.mapPartitionsWithIndex { (i, iter) =>
        // set up some connection
 
        iter.foreach {
          // use the connection
        }
 
        // close the connection
 
        Iterator.empty
      }.foreach {
        // Without an action, the job won't get scheduled, so empty foreach to force it
        // This is a little awkward, but there is no foreachPartitionWithIndex method on rdds
        (_: Nothing) => ()
      }

最后要注意的例子是,確保保存結果和保存偏移要么成功,要么都失敗,這個很重要。 如果先前提交的偏移不等於當前偏移范圍的開始,則存儲偏移將失敗; 這將防止間隙或重復。 Kafka語義確保在偏移范圍內的消息中沒有間隙(如果您特別關心,您可以通過將偏移范圍的大小與消息數量進行比較來驗證)。

// localTx is transactional, if metric update or offset update fails, neither will be committed
    DB.localTx { implicit session =>
      // store metric data
      val metricRows = sql"""
    update txn_data set metric = metric + ${metric}
      where topic = ${osr.topic}
    """.update.apply()
      if (metricRows != 1) {
        throw new Exception("...")
      }
 
      // store offsets
      val offsetRows = sql"""
    update txn_offsets set off = ${osr.untilOffset}
      where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
    """.update.apply()
      if (offsetRows != 1) {
        throw new Exception("...")
      }
    }

示例代碼拋出異常,這將導致事務回滾。 其他故障處理策略可能是適當的,只要它們也導致事務回滾。

 Future Improvements
雖然Spark 1.3這個功能被認為是實驗性的,但是底層的KafkaRDD設計已經在Kixer生產幾個月。 它目前每天處理數十億條消息,批處理大小介於2秒到5分鍾之間。 話雖如此,已知已經有很多可以改進的地方(也可能有幾個未知的地方)。
1>連接池。 目前,Kafka消費者連接是根據需要創建的; 池應該有助於效率。 希望這可以以這樣一種方式實現:它很好地與正在進行的工作朝向Spark中的Kafka生產者API集成。
2>Kafka元數據API。 與Kafka交互的類目前是私有的,這意味着如果您想要通過low-level API訪問Kafka元數據,您需要復制一些工作。 這部分是因為Kafka消費者偏移API現在還不穩定, 如果這個代碼被證明是穩定的,那么有一個面向用戶的API來與Kafka元數據進行交互是很不錯的。
3>批生成策略。 現在,速率限制是唯一可用於定義流中下一批次的調整。 我們有一些涉及更大調整的用例,如固定的時間延遲。 定義批生成策略的靈活方式可能很有用

如果還有你可以想到的其他改進,請讓我知道

終於翻譯完了,有些復雜句子的意思我結合語境和相關知識斟酌了好久,有不當的地方歡迎指正.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM