在前面的的討論里已經介紹了CQRS讀寫分離模式的一些原理和在akka-typed應用中的實現方式。通過一段時間akka-typed的具體使用對一些經典akka應用的遷移升級,感覺最深的是EvenSourcedBehavior和akka-cluster-sharding了。前者是經典akka中persistenceActor的替換,后者是在原有組件基礎上在使用方面的升級版。兩者都在使用便捷性方面提供了大幅度的提升。在我看來,cluster-sharding是分布式應用的核心,如果能夠比較容易掌握,對開發正確的分布式系統有着莫大的裨益。但這篇討論的重點將會集中在EventSourcedBehavior上,因為它是實現CQRS的關鍵。而CQRS又是大數據應用數據采集(輸入)管理最新的一個重要模式。
EventSourcedBehaviro是akka-typed為event-sourcing事件源模式提供的開發支持。具體的原理和使用方法在前面的博客里都介紹過了,在這篇就不再重復。我們把時間精力放到對event-sourcing的了解和應用上。
可以說,event-sourcing是一種數據庫操作的模式。簡單來說:event-sourcing的工作原理是把對數據庫的操作動作保存起來,不直接對數據庫進行即時更新,而是在一個階段之后通過回溯replay這些動作才對數據庫進行實質的更新。event-sourcing與傳統數據庫操作模式的最大分別就是:event-sourcing對數據庫的更新過程可以重復,在一個既定的原點開始重演所有動作可以得出同樣的結果,即同樣的數據庫狀態。在大數據、高並發應用中最難控制的應該就是用戶操作了。用戶可能在任何時間同時對同一項數據進行更新。通用的傳統方式是通過“鎖”來保證數據的正確性,但“鎖”會給系統帶來更多的麻煩如響應慢甚至系統鎖死。而一旦出現系統鎖死重啟后並無有效辦法恢復數據庫正確狀態。event-sourcing恰恰就能有針對性的解決這些問題。
感覺到,event-sourcing模式應該可以避免對“鎖”的使用:在高並發環境里,event-sourcing系統的每個用戶在任何時間都有可能對數據庫進行操作。但他們並不直接改變數據庫內容,而是將這些對數據庫操作的動作保存起來。因為用戶保存的是各自的動作,互不關聯,所以不需要任何鎖機制。當系統完成一個階段的工作后,從這個階段的起點開始,把所有用戶的動作按發生時間順序重演並對數據庫進行實質的更新。可以看到,這個具體的數據庫更新過程是單一用戶的,所以不需要“鎖”了。階段的起點是由數據庫狀態快照來表示。在完成了這個階段所有動作重演后數據庫狀態一次性更新。整個過程即是CQRS讀寫分離模式了,其中:保存動作為寫部分,動作重演是讀部分。動作重演可以在之后的任何時間進行,因而讀、寫是完全分離的。實際上CQRS就是一個數據庫更新管理的狀態機器:從數據起始狀態到終結狀態的一種過程管理方法。下面就用一個實際的應用設計例子來介紹CQRS在應用系統中的具體使用。
下面討論一個超市收款機pos軟件的例子:
收款流程比較簡單:收款員登錄=>掃碼錄入銷售項目=>錄入折扣=>其它操作=>支付=>打小票
最終結果是在數據庫產生了一張銷售單,即一組交易數據,是實際反映在交易數據庫里的。從CQRS流程來解釋:這組銷售數據在開單時為空,然后在完成所有單據操作后一次性產生,也就是在CQRS模式的讀部分產生的。在這個過程中一直是寫部分的操作,不影響交易數據庫狀態。當然,我們還必須在內存里維護一個模擬的狀態來對每項操作進行控制,如:用戶未登錄時不容許任何操作動作。所以必須有個狀態能代表用戶登錄的,而這個狀態應該可以通過動作重演來重現,所以用戶登錄也是一個必須保存的動作。如此,每張銷售單在內存里都應該有一個狀態,這個狀態包括了單據狀態和一個動態的交易項目集合。這個項目集合就代表即將產生的數據庫交易數據。下面是單據狀態的定義:
case class VchStates( opr: String = "", //收款員
num: Int = 1, //當前單號
seq: Int = 1, //當前序號
void: Boolean = false, //取消模式
refd: Boolean = false, //退款模式
susp: Boolean = false, //掛單
canc: Boolean = false, //廢單
due: Boolean = false, //當前余額
su: String = "", mbr: String = "", disc: Int = 0, //預設折扣,如:會員折扣
mode: Int = 0 //當前操作流程:0=logOff, 1=LogOn, 2=Payment
) extends CborSerializable { ... }
交易項目是交易數據的直接對應:
case class TxnItem( txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")) , txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11) , opr: String = "" //工號
, num: Int = 0 //銷售單號
, seq: Int = 1 //交易序號
, txntype: Int = TXNTYPE.sales //交易類型
, salestype: Int = SALESTYPE.nul //銷售類型
, qty: Int = 1 //交易數量
, price: Int = 0 //單價(分)
, amount: Int = 0 //碼洋(分)
, disc: Int = 0 //折扣率 (%) 100% = 1
, dscamt: Int = 0 //折扣額:負值 net實洋 = amount + dscamt
, member: String = "" //會員卡號
, code: String = "" //編號(商品、部類編號、賬戶編號、卡號...)
, refnum: String = "" //參考號,如退貨單號
, acct: String = "" //賬號
, dpt: String = "" //部類
) extends CborSerializable {
為了提高系統效率,根據操作動作實時對交易項目進行了更新,如遇到折扣動作時需要更新上一條交易項目的優惠金額等。這也是在讀部分動作重演必須的,因為CQRS的讀部分目的是把正確的交易數據寫到數據庫里。所以,CQRS的寫部分就代表對內存中這個交易項目集的動態更新過程。
單據狀態在結單時用EventSourcedBehavior拿了個snapshot作為下一單的起始狀態。銷售中途出現異常退出后可以在上一單狀態快照的基礎上實施動作重演把狀態恢復到出現異常之前。
由於每個階段都可以清晰的用一張銷售單的生命周期來代表,所以在整單操作完成后就可以進行CQRS的讀部分了。操作結束的方式最明顯的是單據完成支付操作了,如下:
case PaymentMade(acct, dpt, num, ref,amount) =>
if (curItem.txntype != TXNTYPE.voided) { val due = items.totalSales - items.totalPaid val bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amount log.step(s"#${vchState.num} PaymentMade with input totalSales[${items.totalSales}], totalPaid[${items.totalPaid}], txnItems[${items}].") val vchs = vchState.copy( seq = vchState.seq + 1, due = (if ((items.totalPaid.abs + curItem.amount.abs) >= items.totalSales.abs) false else true), mode = (if (items.totalPaid.abs > 0) 2 else 1) ) val vItems = items.addItem(curItem.copy( salestype = SALESTYPE.ttl, price = due, amount = curItem.amount, dscamt = bal )).txnitems if (replay) { Voucher(vchs, vItems) } else { if (vchs.due) { val vch = Voucher(vchs,vItems) log.step(s"#${vchState.num} PaymentMade with current item: ${vch.items.head}") vch } else { writerInternal.lastVoucher = Voucher(vchs, vItems) if (!writerInternal.afterRecovery) endVoucher(Voucher(vchs,vItems),TXNTYPE.sales) Voucher(vchs.nextVoucher, List()) } } } else { log.step(s"#${vchState.num} PaymentMade with current item: $curItem") Voucher(vchState.copy( seq = vchState.seq + 1) , items.addItem(curItem).txnitems) }
確認了完成支付調用endVoucher. endVoucher啟動讀部分reader, 如下:
def endVoucher(voucher: Voucher, txntype: Int)(implicit writerInternal: WriterInternal,pid:Messages.PID) = { log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with state: ${writerInternal.lastVoucher.header}, txns: ${writerInternal.lastVoucher.items}") val readerShard = writerInternal.optSharding.get //ClusterSharding(writerInternal.actorContext.system)
val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId") val eseq = EventSourcedBehavior.lastSequenceNumber(writerInternal.optContext.get) val bseq = eseq - writerInternal.listOfActions.size + 1 log.step(s"#${writerInternal.lastVoucher.header.num } sending PerformRead(${pid.shopid}, ${pid.posid},${writerInternal.lastVoucher.header.num},${writerInternal.lastVoucher.header.opr},$bseq,$eseq,$txntype,${writerInternal.expurl},${writerInternal.expacct},${writerInternal.exppass}) ...") // log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}")
readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass) writerInternal.clearListOfAction() log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}") }
reader是在一個sharding上即時構建的一個actor。這個actor的主要功能就是從journal里讀出這張單所有動作進行重演得出交易項目集后寫進交易數據庫:
def readActions(ctx: ActorContext[Command],vchnum: Int, cshr: String, startSeq: Long, endSeq: Long, trace: Boolean, nodeAddress: String, shopId: String, posId: String, txntype: Int): Future[List[TxnItem]] = { implicit val classicSystem = ctx.system.toClassic implicit val ec = classicSystem.dispatcher implicit var vchState = VchStates().copy(num = vchnum, opr = cshr) implicit var vchItems = VchItems() implicit var curTxnItem = TxnItem() implicit val pid = PID(shopId,posId) implicit val writerInternal = new Messages.WriterInternal(nodeAddress = nodeAddress, pid = pid, trace=trace) log.stepOn = trace log.step(s"POSReader: readActions($vchnum,$cshr,$startSeq,$endSeq,$trace,$nodeAddress,$shopId,$posId), txntype=$txntype") def buildVoucher(actions: List[Any]): List[TxnItem] = { log.step(s"POSReader: read actions: $actions") val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided]) val listOfActions = actions.reverse zip (LazyList from 1) //zipWithIndex
listOfActions.foreach { case (txn,idx) => txn.asInstanceOf[Action] match { case ti@_ => curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr) if (!ti.isInstanceOf[Voided]) { if (voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) { curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr = cshr) log.step(s"POSReader: voided txnitem: $curTxnItem") } } val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true) vchState = vch.header vchItems = vch.txnItems log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}") } } log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}") vchItems.txnitems } val query = PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra") // issue query to journal
val source: Source[EventEnvelope, NotUsed] = query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq) // materialize stream, consuming events
val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny } for { lst1 <- readActions //read list from Source
lstTxns <- if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
readActions else FastFuture.successful(lst1) items <- FastFuture.successful( buildVoucher(lstTxns) ) _ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items) _ <- session.close(ec) } yield items }