alpakka-kafka(7)-kafka應用案例,消費模式


  上篇描述的kafka案例是個庫存管理平台。是一個公共服務平台,為其它軟件模塊或第三方軟件提供庫存狀態管理服務。當然,平台管理的目標必須是共享的,即庫存是作為公共資源開放的。這個庫存管理平台是一個Kafka消費端獨立運行的軟件。kafka的生產方即平台的服務對象通過kafka生產端producer從四面八方同時、集中將消息寫入kafka。庫存管理平台在kafka消費端不間斷監控kafka里新的未讀過的消息並及時讀取,解析消息獲取發布者對庫存管理的指令,然后按指令更新庫存狀態。

設計這個庫存管理平台最主要的目的先是為了保證庫存狀態的時效性、准確性,然后才是庫存更新的效率。由於庫存更新指令的產生是在一個高並發、異類系統、分布式環境里,上篇已經提到多線程環境下更新共享數據會產生的問題。不過通過kafka把並發產生的指令轉換成隊列然后按順序單線程逐句執行就能解決主要問題了。現在,平台的數據來源變成kafka消費端口上的一個數據流了,數據的讀取和消費自然也變成了逐條的。kafka提供了某種游標機制來記錄數據讀取的最新位置,防止數據消費過程中的遺漏、重復。記錄當前讀取位置offset的方式就是所謂數據消費模式代表數據消費不同程度的安全/效率比例,安全系數越高,流量越低。具體讀取位置offset可以存放在kafka內部,或者保存在某種數據庫表里。簡單來講,數據消費模式分三種:至多一次at-most-once,至少一次at-least-once,只此一次exactly-once。

從由kafka中讀出指令到成功完成執行指令整個消息消費過程可能經歷多個步驟。每個步驟都可能有失敗的可能,從而中斷過程影響數據消費結果。保存offset即offset-commit的時間點代表了三種消費模式的特性:

1、至多一次at-most-once:讀出數據立即commit-offset,然后才開始消費數據。無論消費過程中發生異常與否,下次都會從新的位置開始讀取,過去不再。如果一條數據在消費過程中發生事故中斷了過程,那這條數據就沒有發生應有的作用,就等於遺失了。

2、至少一次at-least-once:讀出數據、消費數據、然后才commit-offset。如果消費過程出現問題中斷,那么offset就得不到保存,下次再讀取時還是從原先位置重新開始。所以,一條數據有可能被多次讀取,造成重復消費的效果。

3、只此一次exactly-once:把保存offse和消費過程放到同一個事務transaction里。這種模式需要數據庫事物處理支持,也就是說offset-commit和數據處理都必須在同一種提供事物處理支持的數據庫環境里進行。offset-commit只會在確保消費過程成功完成后才進行。

at-most-once和at-least-once都使用kafka內部commit機制保存offset。at-least-once可以利用kafka的自動commit機制實現offset保存,只要通過kafka配置就可以了。下面是這個配置的示范:

 val consumerSettings = ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer) .withBootstrapServers(bootstrapServers) .withGroupId(group) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset) .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs.toString)

ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = "true" 代表開啟auto-commit模式。ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG設置了auto-commit之間的毫秒時間間隔。在這個間隔內如果中斷消費過程,那么在這個間隔內讀取所有數據的offset都未能commit,但其中有些數據已經完成消費了。重啟讀取就會從這個間隔開始時的offset從頭讀取,那么之前消費的數據就會再次消費,等於重復消費了。auto-commit間隔設置的越短,重復消費的數據就越少,不過kafka需要更密集的進行commit-offset,運行效率就越低。反之,重復消費的數據量就越大,消費計算精確度越低,但運行效率就會提高。

在alpakka-kafka里用一個普通的Source就可以實現at-least-once消費模式了:

 

val consumerSettings = ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer) .withBootstrapServers(bootstrapServers) .withGroupId(group) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset) .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs.toString) var subscription = Subscriptions .topics(topic) val stkTxns = new DocToStkTxns(trace) val curStk = new CurStk(trace) val pcmTxns = new PcmTxns(trace) val plainSource = Consumer .plainSource(consumerSettings,subscription)

 

run這個plainSource形成的akka-stream就實現了一個完整kafka-reader功能:

 

 plainSource .mapAsync(1) {msg => updateStock(msg) } .toMat(Sink.seq)(Keep.left) .run()

offset-commit在這個reader-stream里是不可控的,是kafka按預先設置自動進行的。

plainSource是一個獨立的stream,代表單個reader。為了充分利用平台的硬件資源,首先考慮的是同時運行多個stream,如下:

 (1 to numReaders).toList.map { _ => plainSource .mapAsync(1) {msg => updateStock(msg) } .toMat(Sink.seq)(Keep.left) .run() }

這樣可以同時運行numReaders條stream。不過,現在設計方案又返回了多線程環境,好像又要面臨多並發所產生的一系列問題了。我們來分析分析:首先,前面描述的庫存更新多線程競爭問題主要是針對同一門店,同一商品,同時更新庫存狀態引發的。以上設計中每條stream,即每個reader,如果屬於同一個reader-group(group-id相同)的話,應共同分別負責所有partition中的部分partition,是不會共享partition的。那么,寫入每個partition的數據是否交叉重復就很關鍵了。實際上,在上游消息發布階段決定了消息應該寫入的具體partition,如下:

def writeToKafka(posTxn: PosTxns)(implicit producerKafka: ProducerKafka) = { val doc = BizDoc.fromPosTxn(posTxn) if (producerKafka.producerSettings.isDefined) { implicit val producer = producerKafka.akkaClassicSystem.get SendProducer(producerKafka.producerSettings.get) .send(new ProducerRecord[String, String](producerKafka.publisherSettings.topic, doc.shopId, toJson(doc))) } else FastFuture.successful(Completed) }

ProducerRecord[K,V] 的key設定為shopId,具體目標partition由kafka的默認指派算法根據key的值產生,保證同一key值一定會指派給同一個partition。雖然在門店數量>partition數量的情況下每個partition可以包含多個shopId, 但各partition所包含的shopId不會交叉重復。所以,以上多reader同時運行的設計中,只要屬於同一個reader-group,shopId就不會相同,就不會產生線程競爭問題。

那么,在同一個reader的消費過程中是否能使用多線程方式呢?上面的例子中使用了mapAsync(parallelism=1),這個代表了stream里的一個階段。這個階段容許收到上游數據后以parallelism個future來並行處理,同時可以保證流出下游的數據遵守上游流入數據的順序。但是,在同一階段用多線程方式計算方式在遇到同門店、同商品庫存更新時同樣會產生多線程競爭問題,所以只能取parallelism=1。不過,可以考慮把數據處理過程分割成幾個階段,因為每個階段流入流出的數據是同循序的,所以可以容許多個階段在在各自的線程里運算。如:

 (1 to numReaders).toList.map { _ => plainSource .mapAsync(1) {msg => produceStkTxns(msg) } asyn.mapAsync(1) {msg => updateCurStock(msg) } asyn.mapAsync(1) {msg => updatePurchase(msg) } .toMat(Sink.seq)(Keep.left) .run() }

可以用asyn.mapAsync來分割異線程域async-boundary以實現多線程運算效果。

下面的完整例子里把異常處理和重啟也考慮了進去:

 

  def start = (1 to numReaders).toList.map { _ => RestartSource .onFailuresWithBackoff(restartSource) { () => plainSource } // .viaMat(KillSwitches.single)(Keep.right)
        .async.mapAsync(1) { msg =>
        for { _ <- FastFuture.successful { log.step(s"AtLeastOnceReaderGroup-msg: $msg")(Messages.MachineId("", "")) } _ <- stkTxns.docToStkTxns(msg.value()) pmsg <- FastFuture.successful { log.step(s"AtLeastOnceReaderGroup-docToStkTxns: $msg")(Messages.MachineId("", "")) msg } } yield pmsg } .async.mapAsync(1) { msg =>
        for { _ <- FastFuture.successful { log.step(s"AtLeastOnceReaderGroup-updateStk: msg: $msg")(Messages.MachineId("", "")) } curstks <- curStk.updateStk(msg.value()) pmsg<- FastFuture.successful { log.step(s"AtLeastOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", "")) msg } } yield pmsg } .async.mapAsync(1) { msg =>
        for { _ <- FastFuture.successful { log.step(s"AtLeastOnceReaderGroup-writePcmTxn: msg: $msg")(Messages.MachineId("", "")) } pcm <- pcmTxns.writePcmTxn(msg.value()) pmsg <- FastFuture.successful { log.step(s"AtLeastOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", "")) msg } } yield pmsg } .async.mapAsync(1) { msg =>
        for { _ <- FastFuture.successful { log.step(s"AtLeastOnceReaderGroup-updatePcm: msg: $msg")(Messages.MachineId("", "")) } _ <- pcmTxns.updatePcm(msg.value()) _ <- FastFuture.successful { log.step(s"AtLeastOnceReaderGroup-updateStk: updatePcm-$msg")(Messages.MachineId("", "")) } } yield "Completed" } .toMat(Sink.seq)(Keep.left) .run() }

 

下面是幾個消費模式的測試示范代碼:

package com.datatech.txn.server import akka.actor.ActorSystem import scala.concurrent._ import MgoRepo._ import com.typesafe.config.ConfigFactory import scala.jdk.CollectionConverters._ object ConsumeModeTest extends App with JsonConverter { val config_onenode = ConfigFactory.load("onenode") implicit val system = ActorSystem("kafka-sys",config_onenode) var config = ConfigFactory.load() implicit val ec: ExecutionContext = system.dispatcher //mat.executionContext

  var httpport: Int = 53081
  var mongohosts  = List("localhost:27017") var elastichost = "http://localhost:9200"
  var _http_parallelism: Int = 8
  var _seednodes: String = "" val txnCfg = ConfigFactory.load("txnserver.conf").getConfig("txn.server") try { mongohosts = txnCfg.getStringList("mongohosts").asScala.toList elastichost = txnCfg.getString("elastichost") _http_parallelism = txnCfg.getInt("http_parallelism") _seednodes = txnCfg.getString("seednodes") httpport = txnCfg.getInt("httpport") } catch { case excp: Throwable => httpport = 53081 mongohosts = List("localhost:27017") elastichost = "http://localhost:9200" _http_parallelism = 8 } implicit val mgoClient = mongoClient(mongohosts) val readerConfig = config.getConfig("akka.kafka.consumer") val readerSettings = ReaderSettings(config.getConfig("kafka-txnserver-consumer")) implicit val idxer = new TxnIndex(elastichost,true) readerSettings.consumeMode.toLowerCase() match { case "atleastonce" => val readerGroup = AtLeastOnceReaderGroup(readerConfig,readerSettings, true) readerGroup.start case "atmostonce" => val readerGroup = AtMostOnceReaderGroup(readerConfig,readerSettings, true) readerGroup.start case "exactlyonce" => val readerGroup = ExactlyOnceReaderGroup(readerConfig,readerSettings, true) readerGroup.start case _ => val readerGroup = AtLeastOnceReaderGroup(readerConfig,readerSettings, true) readerGroup.start } scala.io.StdIn.readLine() idxer.close() scala.io.StdIn.readLine() system.terminate() }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM