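The previous post introduced Kafka's at-least-once consumption mode. Kafka consumption modes are distinguished by the point in time at which the offset is committed: at-least-once, at-most-once, and exactly-once. The at-least-once mode in the previous post relied on Kafka's own auto-commit. Thinking it over afterwards, that should really count as at-most-once: the consumption process has no influence on auto-commit, and Kafka commits the offset automatically at every configured interval. If that interval is shorter than the whole consumption process, the offset is saved before processing finishes, which is at-most-once. If the interval is guaranteed to be longer than the consumption process, the mode becomes at-least-once again. So the actual mode cannot be pinned down, because auto-commit cannot be controlled from outside. To implement at-least-once in the true sense, we have to take control of the offset commit ourselves.
alpakka-kafka provides a CommittableSource: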
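To make the role of commit timing concrete, here is a minimal plain-Scala simulation. It does not touch Kafka at all: the in-memory `log`, `Outcome` and `runWithCrash` are hypothetical names made up for this illustration. A consumer crashes once while handling one record and then restarts from the last committed offset. Committing before the work loses that record, committing after the work handles it twice.

```scala
// Simulation only: an in-memory Vector stands in for a Kafka partition.
final case class Outcome(processed: Vector[String], committed: Int)

// Consume `log` from the committed offset and crash once at index `crashAt`.
// commitFirst = true  -> commit before processing (at-most-once)
// commitFirst = false -> commit after processing  (at-least-once)
def runWithCrash(log: Vector[String], crashAt: Int, commitFirst: Boolean): Outcome = {
  var committed = 0                      // next offset to read after a restart
  var processed = Vector.empty[String]   // side effects that really happened
  var crashed = false

  // Returns false if the consumer crashed, true if it reached the end of the log.
  def consumeFrom(start: Int): Boolean = {
    var i = start
    while (i < log.length) {
      if (commitFirst) {
        committed = i + 1
        if (i == crashAt && !crashed) { crashed = true; return false } // crash before the work
        processed = processed :+ log(i)
      } else {
        processed = processed :+ log(i)
        if (i == crashAt && !crashed) { crashed = true; return false } // crash before the commit
        committed = i + 1
      }
      i += 1
    }
    true
  }

  // Restart once from the last committed offset if the first run crashed.
  if (!consumeFrom(committed)) consumeFrom(committed)
  Outcome(processed, committed)
}

val log = Vector("a", "b", "c")
val atMostOnce  = runWithCrash(log, crashAt = 1, commitFirst = true)
val atLeastOnce = runWithCrash(log, crashAt = 1, commitFirst = false)
```

In this sketch `atMostOnce.processed` never contains "b", while `atLeastOnce.processed` contains it twice. With auto-commit, which of the two orderings applies depends on timing, which is exactly the ambiguity described above.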
def committableSource[K, V](settings: ConsumerSettings[K, V],
                            subscription: Subscription): Source[CommittableMessage[K, V], Control] = {...}
The elements emitted by this CommittableSource are of type CommittableMessage[K, V]:
final case class CommittableMessage[K, V]( record: ConsumerRecord[K, V], committableOffset: CommittableOffset )
Besides the original record, a CommittableMessage also carries a CommittableOffset, so the offset can be committed from either a Flow or a Sink. alpakka-kafka provides Committer, whose Committer.sink and Committer.flow help with the offset commit. With Committer.flow:
Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic))
  .mapAsync(1) { msg =>
    // pass the offset downstream only after the business update has completed
    updateStock.map(_ => msg.committableOffset)
  }
  .via(Committer.flow(committerDefaults.withMaxBatch(1)))
  .to(Sink.seq)
  .run()
Or with Committer.sink:
Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic))
  .mapAsync(1) { msg =>
    updateStock.map(_ => msg.committableOffset)
  }
  .toMat(Committer.sink(committerSettings))(Keep.left)
  .run()
Below is a concrete at-least-once example:
val committerSettings = CommitterSettings(sys).withMaxBatch(commitMaxBatch)

val stkTxns = new DocToStkTxns(trace)
val curStk = new CurStk(trace)
val pcmTxns = new PcmTxns(trace)

val commitableSource = Consumer
  .committableSource(consumerSettings, subscription)

def start =
  (1 to numReaders).toList.map { _ =>
    RestartSource
      .onFailuresWithBackoff(restartSource) { () => commitableSource }
      // .viaMat(KillSwitches.single)(Keep.right)
      .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtLeastOnceReaderGroup-msg: ${msg.record}")(Messages.MachineId("", ""))
          }
          _ <- stkTxns.docToStkTxns(msg.record.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtLeastOnceReaderGroup-docToStkTxns: ${msg.record}")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
      .async.mapAsync(1) { msg =>
        for {
          curstks <- curStk.updateStk(msg.record.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtLeastOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
      .async.mapAsync(1) { msg =>
        for {
          pcm <- pcmTxns.writePcmTxn(msg.record.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtLeastOnceReaderGroup-writePcmTxn: writePcmTxn-$pcm")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
      .async.mapAsync(1) { msg =>
        // Emit the offset only after updatePcm has completed,
        // so that the commit can never overtake the last update.
        for {
          _ <- pcmTxns.updatePcm(msg.record.value())
        } yield msg.committableOffset
      }
      .toMat(Committer.sink(committerSettings))(Keep.left)
      .run()
  }
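For the design and implementation of the other parts of the consumption process, such as multithreading and exception handling, see the discussion in the previous post.
For the at-most-once consumption mode, alpakka-kafka provides atMostOnceSource: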
def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[ConsumerRecord[K, V], Control] = {...}
Below is an at-most-once example built on this Source:
val atmostonceSource = Consumer
  .atMostOnceSource(consumerSettings, subscription)

def start =
  (1 to numReaders).toList.map { _ =>
    RestartSource
      .onFailuresWithBackoff(restartSource) { () => atmostonceSource }
      // .viaMat(KillSwitches.single)(Keep.right)
      .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-msg: $msg")(Messages.MachineId("", ""))
          }
          _ <- stkTxns.docToStkTxns(msg.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-docToStkTxns: $msg")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
      .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-updateStk: msg: $msg")(Messages.MachineId("", ""))
          }
          curstks <- curStk.updateStk(msg.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
      .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-writePcmTxn: msg: $msg")(Messages.MachineId("", ""))
          }
          pcm <- pcmTxns.writePcmTxn(msg.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-writePcmTxn: writePcmTxn-$pcm")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
      .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-updatePcm: msg: $msg")(Messages.MachineId("", ""))
          }
          _ <- pcmTxns.updatePcm(msg.value())
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-updatePcm: updatePcm-$msg")(Messages.MachineId("", ""))
          }
        } yield "Completed"
      }
      .toMat(Sink.seq)(Keep.left)
      .run()
  }
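Because the offset commit and the message processing are two independent steps, no amount of effort can guarantee that a message is processed exactly once; that only becomes possible when the two steps are merged into one. Exactly-once can therefore be implemented with database transaction processing: the offset commit and the data update are placed in the same transaction, and the atomicity of the ACID properties guarantees that both actions succeed or fail together. This also means that the exactly-once mode has to be implemented in a database system that supports transactions, and that the Kafka offset has to be stored in the same database as the business data. MongoDB 4.0 and later supports transactions, so it can serve as the demonstration.
First, let's look at the skeleton of the exactly-once mode: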
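The idea of moving data and offset together can be sketched without any database. In the following plain-Scala simulation, `Store` and `applyTxn` are hypothetical names, and an immutable value replaced in one step stands in for a real transaction. It is only meant to show why storing the offset next to the data makes redelivery harmless, not how MongoDB does it.

```scala
// Simulation only: `Store` stands in for a transactional database that holds
// the business data (balance) and the consumer position (nextOffset) together.
final case class Store(balance: Int, nextOffset: Long)

// One "transaction": the data update and the offset advance happen together
// or not at all, because the whole Store value is replaced in a single step.
def applyTxn(store: Store, offset: Long, amount: Int): Store =
  if (offset < store.nextOffset) store             // already applied, skip the redelivery
  else Store(store.balance + amount, offset + 1)

// Records as (offset, amount). Offsets 1 and 2 are delivered a second time,
// as could happen after a restart.
val delivered: List[(Long, Int)] =
  List(0L -> 10, 1L -> 20, 2L -> 5, 1L -> 20, 2L -> 5, 3L -> 1)

val finalStore = delivered.foldLeft(Store(0, 0L)) {
  case (s, (off, amt)) => applyTxn(s, off, amt)
}
```

Each amount is counted once even though two records arrive twice. In a real consumer, reading the stored offset on startup and seeking to it plays the role of the `offset < store.nextOffset` check.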
val mergedSource = Consumer
  .plainPartitionedManualOffsetSource(consumerSettings, subscription, loadOffsets)
  .flatMapMerge(maxReaders, _._2)
  .async.mapAsync(1) { msg =>
    for {
      // data update and offset save are committed in one database transaction
      cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
      pmsg <- FastFuture.successful {
        log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
        msg
      }
    } yield pmsg
  }
  .mapAsync(1) { msg =>
    for {
      curstks <- curStk.updateStk(msg.value())
      pmsg <- FastFuture.successful {
        log.step(s"ExactlyOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
        msg
      }
    } yield pmsg
  }
  .toMat(Sink.seq)(Keep.left)
  .run()
The example above uses plainPartitionedManualOffsetSource:
def plainPartitionedManualOffsetSource[K, V](
    settings: ConsumerSettings[K, V],
    subscription: AutoSubscription,
    getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]],
    onRevoke: Set[TopicPartition] => Unit = _ => ()
): Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), Control] = {...}
getOffsetsOnAssign supplies the offsets for the assigned partitions (here read from the database for each partition), as follows:
private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] = {
  offsetStore.getOffsets(partitions)
}

def getOffsets(partitions: Set[TopicPartition])(
    implicit ec: ExecutionContext) = {
  log.step(s"OffsetStore-getOffsets: ($partitions)")(Messages.MachineId("", ""))

  // look up the stored offset of a single partition, if there is one
  def getOffset(tp: TopicPartition) = {
    val query = and(equal(KfkModels.SCHEMA.TOPIC, tp.topic()),
                    equal(KfkModels.SCHEMA.PARTITION, tp.partition()))
    def offset: Future[Seq[Document]] = colOffset.find(query).toFuture()
    for {
      docs <- offset
      ofs <- FastFuture.successful(if (docs.isEmpty) None else Some(Offsets.fromDocument(docs.head)))
    } yield ofs
  }

  val listFut = partitions.toList.map(getOffset)
  val futList: Future[List[Option[KfkModels.Offsets]]] = FastFuture.sequence(listFut)

  // partitions without a stored offset are left out of the resulting map
  futList.map { oofs =>
    oofs.foldRight(Map[TopicPartition, Long]()) { (oof, m) =>
      oof match {
        case None => m
        case Some(ofs) => m + (new TopicPartition(ofs.topic, ofs.partition) -> ofs.offset)
      }
    }
  }
}
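Note the function type of loadOffsets: Set[TopicPartition] => Future[Map[TopicPartition, Long]]. It returns a Map from partition to offset.
In addition, plainPartitionedManualOffsetSource returns a Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), Control], that is, a source of per-partition sources, which has to be flattened with flatMapMerge: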
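The last step of getOffsets, turning a list of optional lookups into a Map, can be tried in isolation. The sketch below uses only the standard library; `Tp` and `StoredOffset` are hypothetical stand-ins for TopicPartition and KfkModels.Offsets, and `flatten` replaces the explicit fold, assuming it is acceptable to drop partitions that have no stored offset.

```scala
// Hypothetical stand-ins for TopicPartition and KfkModels.Offsets.
final case class Tp(topic: String, partition: Int)
final case class StoredOffset(topic: String, partition: Int, offset: Long)

// Partitions whose lookup returned None are simply left out of the map.
def toOffsetMap(found: List[Option[StoredOffset]]): Map[Tp, Long] =
  found.flatten.map(o => Tp(o.topic, o.partition) -> o.offset).toMap

val offsets = toOffsetMap(List(
  Some(StoredOffset("orders", 0, 42L)),
  None,                                   // no offset stored yet for this partition
  Some(StoredOffset("orders", 2, 7L))
))
```

Here `offsets` has two entries, one for partition 0 and one for partition 2.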
/**
 * Transform each input element into a `Source` of output elements that is
 * then flattened into the output stream by merging, where at most `breadth`
 * substreams are being consumed at any given time.
 *
 * '''Emits when''' a currently consumed substream has an element available
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' upstream completes and all consumed substreams complete
 *
 * '''Cancels when''' downstream cancels
 */
def flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
  map(f).via(new FlattenMerge[T, M](breadth))
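The breadth parameter is the maximum number of substreams (here, per-partition sources) that are consumed and merged at the same time.
Also, saveOffset and writeStkTxns run inside the same transaction: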
def docToStkTxns(jsonDoc: String, partition: Int, offset: Long, observable: SingleObservable[ClientSession]) = {
  val bizDoc = fromJson[BizDoc](jsonDoc)
  log.step(s"TxnalDocToStkTxns-docToStkTxns: $bizDoc")(Messages.MachineId("", ""))
  observable.map(clientSession => {
    val transactionOptions = TransactionOptions.builder()
      .readPreference(ReadPreference.primary())
      .readConcern(ReadConcern.SNAPSHOT)
      .writeConcern(WriteConcern.MAJORITY)
      .build()
    clientSession.startTransaction(transactionOptions)
    val txns = StkTxns.docToTxns(dbStkTxn, dbVtx, dbVendor, bizDoc, trace)
    // both writes go through the same clientSession, i.e. the same transaction
    StkTxns.writeStkTxns(clientSession, colStkTxn, colPcm, txns, trace)
    offsetStore.saveOffset(clientSession, partition, offset)
    clientSession.commitTransaction()
    clientSession
  })
}
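Note: MongoDB transactions must run against a replica set; a standalone server does not support them. This is presumably because committing and rolling back a transaction rely on the replication machinery.
The complete exactly-once implementation is as follows: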
private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] = {
  offsetStore.getOffsets(partitions)
}

val mergedSource = Consumer
  .plainPartitionedManualOffsetSource(consumerSettings, subscription, loadOffsets)
  .flatMapMerge(maxReaders, _._2)

def start = {
  (1 to numReaders).toList.map { _ =>
    RestartSource
      .onFailuresWithBackoff(restartSource) { () => mergedSource }
      // .viaMat(KillSwitches.single)(Keep.right)
      .async.mapAsync(1) { msg =>
        for {
          // the only transactional stage: stock transactions and offset are saved together
          cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
          pmsg <- FastFuture.successful {
            log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
      .async.mapAsync(1) { msg =>
        for {
          curstks <- curStk.updateStk(msg.value())
          pmsg <- FastFuture.successful {
            log.step(s"ExactlyOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
      .async.mapAsync(1) { msg =>
        for {
          pcm <- pcmTxns.writePcmTxn(msg.value())
          pmsg <- FastFuture.successful {
            log.step(s"ExactlyOnceReaderGroup-writePcmTxn: writePcmTxn-$pcm")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
      .async.mapAsync(1) { msg =>
        for {
          _ <- pcmTxns.updatePcm(msg.value())
        } yield "Completed"
      }
      .toMat(Sink.seq)(Keep.left)
      .run()
  }
}
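Only the first async stage uses a transaction. In other words, only writeStkTxns is guaranteed to take effect exactly once. The main job of this function is to persist all the transactions produced by the front end. To prevent an abnormal interruption during consumption from losing front-end transactions or posting them twice, each front-end transaction must be persisted exactly once. The other stages all work from transaction records that have already been persisted correctly, so if something goes wrong there, the correct state can be recovered by recomputing from those records. To keep the platform efficient, these later updates are done without transactions.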
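The claim that later stages can be repaired by recomputation is easy to illustrate. In the sketch below, `TxnRecord` and `recomputeStock` are hypothetical names, and the real stock and PCM updates are certainly richer than a sum of quantities. The point is only that derived state is a pure function of the persisted transaction records, so it can be rebuilt by replaying them.

```scala
// Hypothetical persisted transaction record: item code and signed quantity.
final case class TxnRecord(item: String, qty: Int)

// Current stock is derived purely from the persisted transactions,
// so it can be rebuilt at any time by replaying all of them.
def recomputeStock(txns: Seq[TxnRecord]): Map[String, Int] =
  txns.groupMapReduce(_.item)(_.qty)(_ + _)

val rebuilt = recomputeStock(Seq(
  TxnRecord("A", 10),
  TxnRecord("B", 3),
  TxnRecord("A", -4)
))
```

Because the replay only reads records written by the transactional stage, running it twice gives the same result, which is what makes skipping transactions in the later stages tolerable.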
