➡️ 引言
近兩年,一直在折騰用FP與OO共存的編程語言Scala,采取以函數式編程為主的方式,結合TDD和BDD的手段,采用Domain Driven Design的方法學,去構造DDDD應用(Domain Driven Design & Distributed)。期間,嘗試了大量的框架:業務領域主要適用Akka、Scalaz等框架,UI和基礎設施方面主要適用Spring、Kafka等框架,測試則主要適用Spock、ScalaTest、Selenium等框架。
兩年的折騰,不能說沒有一點收獲。在核心的領域模型設計方面,通過嘗試用傳統Akka的Actor包裹聚合,以自定義的Command和Event進行信息交換,用Free Monad構造表達樹實現延遲計算,用Akka Persistence實現Event Store,用Kafka維護命令和事件隊列,讓我依稀看到了以FP的方式實現ES+CQRS架構應用的曙光。
但如此多的框架不僅讓人眼花繚亂,還要學習Scala、Groovy、Java等編程語言以及Gradle等Build工具的DSL,如果再加上Cluster、Cloud和Micro Service,這無疑是一個浩大的工程。所幸,伴隨Akka 2.6版本推出的Akka Typed,解決了消息難以管理、框架入侵過甚、節點部署繁復等痛點和難點。特別是在ES和CQRS方面,Akka Typed可以說是提供了一個完整的解決方案。就如同當初LINQ的出現一樣,給我打開了一扇新的窗口。
以下,便是我學習Akka Typed官方文檔時隨看隨寫留下的筆記(語種Scala,版本2.6.5,在學習過程中已更新至2.6.6),以備查考。內容和路線均以我個人的關注點為主,所以只是節選且難免有失偏頗。
📌 文中提到的RMP是指Vaughn Vernon撰寫的Reactive Messaging Patterns with the Actor Model一書。它是我學習傳統Actor模型時的重要參考書籍。
📎 在Github上,Paweł Kaczor開發出的akka-ddd框架,是一個非常棒的學習案例。
- akka-ddd:一個基於Akka和EventStore實現的CQRS架構的DDD框架。
- ddd-leaven-akka-v2:使用akka-ddd框架實現的電子商務應用。
目錄
- ➡️ Get Started
- ➡️ 術語和概念
- ➡️ Actors
- Actor概貌
- Actor的生命周期
- 交互模式
- 容錯能力
- 發現Actor
- 路由 Route
- 暫存 Stash
- Behavior是一台有限狀態機
- 協調關機 Coordinated Shutdown
- 消息分發器 Dispatchers
- 郵箱 Mailbox
- 測試
- Akka之Classic與Typed共存
- 函數式與面向對象風格指南
- 從傳統Akka過渡
- ➡️ Cluster
- ➡️ Persistence
- ➡️ 其他
➡️ Get Started
Actor模型的優點
- 事件驅動的:Actor相互之間只能用異步的消息進行聯系,不會產生直接的耦合。
- 強壯的隔離性: Actor不象普通的對象那樣提供可供調用的API,只會暴露它所支持的消息(通信協議),從而避免了狀態的共享。
- 位置的透明性: ActorSystem使用工廠創建Actor並返回其引用,所以位置無關緊要,Actor也可以啟動、停止、移動或者重啟,甚至從故障中恢復。
- 輕量性: 每個Actor通常只需要數百字節的開銷,所以一個應用程序完全可能擁有上百萬個並發Actor實例。
🔗 Akka庫與模塊一覽
- Actor Library:Akka Typed的核心
- Remoting:使Actor得以分布部署
- Cluster及其Sharding、Singleton:集群支持
- Persistence:使Actor得以將其事件持久化,是實現ES+CQRS的重要組成
- Distributed Data:在Actor之間共享數據
- Stream:使Actor支持流處理
示例 Internet of Things (IoT)
🔗 https://doc.akka.io/docs/akka/current/typed/guide/tutorial.html
這是一個在物聯時代,讓用戶可以借助遍布家中的溫度監測儀,隨時掌握屋內各個角落溫度情況的應用。
📌 所需的准備工作
-
在IDEA的
Editor/General/Auto Import
中排除對*javadsl*
的自動導入。 -
需要的依賴:
implementation 'org.scala-lang:scala-library:2.13.2' implementation 'com.typesafe.akka:akka-actor-typed_2.13:2.6.5' implementation 'ch.qos.logback:logback-classic:1.2.3' testImplementation 'org.scalatest:scalatest_2.13:3.1.2' testImplementation 'com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5'
📝 Note
- 打印ActorRef將獲得Actor的URL,從中可獲悉actor族譜。
- Actor的生命周期始終保持與其父Actor一致,Actor自身停止時推薦返回Behaviors.stopped,父可用context.stop(childRef)停止葉子Actor。
- Actor在其生命周期中可觸發PostStop之類的信號(參見RMP-47)。
- 在Actor內部處理消息的是onMessage,處理信號的是onSignal。
- 使用
val child = context.spawn(Behaviors.supervise(child()).onFailure(SupervisorStrategy.restart), name = "child-actor")
改變默認的監督策略Stop。傳統Akka采取的方式是override supervisorStrategy,用一個閉包聲明Decider函數(參見RMP-52)。 - “協議即API”——在Actor的世界里,協議Protocol取代了傳統OOP的接口Interface。利用這些由Command和Event組成的協議,各方的Actor們最終將借助from-replyTo所指向的ActorRef[Command/Event]完成對話。
- ⚡ 傳統Akka圍繞若干Actor的實例構築整個系統,而Akka Typed則圍繞Behavior的實例構築系統,這是觀念上的巨大差別。
- Command用現在時,可以理解為“我能做什么”,是Actor對外公開的API、是它能提供的服務;而Event則用過去式,表明“我關心什么”,是觸發Actor后續動作的事件。
- 傳遞消息涉及網絡、主機、Actor郵箱、Actor消息處理函數等多個環節,所以非常脆弱。主要的模式有三種(參見RMP-164:確保送達機制):
- 最多一次:消息在發出去就不用管,也不用保存消息傳送的狀態。所以消息可能會丟失。這是Actor默認采用的方式,簡單而高效。
- 最少一次:發送后還要保存消息傳送狀態甚至進行重試,以確保收件人收到消息。所以消息不會丟失,但不能避免重復。
- 正好一次:除了發件人,還要在收件人保存消息傳送狀態,以確保收件人不會接到重復的消息。所以消息既不會丟失,也不會重復。
- Actor保證直連雙方的消息會嚴格按序傳送,但不保證不丟失消息。
- 合理決定Actor的粒度,是Akka設計的核心難點:通常情況下都推薦使用較大的粒度,以降低細粒度引入的復雜度。僅在以下一些情況下方才增加粒度:
- 需要更多的Actor提供更高的並發性。
- 場景本身需要復雜的對話。
- 為減少不同狀態之間的耦合度,需要將多個狀態分別交由更小的參與者進行獨立維護。
- 為隔離失敗,減少不同參與者之間相互的干擾與牽扯,確保失敗情況造成最小的負面影響。
- 使用Dead Watch實現有關Actor停止時的互動。Dead Watch關系不僅限於父子之間,只要知道對方的ActorRef即可。當被觀察的Actor停止時,觀察者Actor將會收到一個Terminated(actorRefOfWatchee)信號。由於該信號無法附加其他信息,所以推薦做法是將其再包裝成一條消息WatcheeTerminated,並在創建被觀察者時就用
context.watchWith(watcheeActor, WatcheeTerminated(watcheeActor,...))
建立觀察關系。(💀 WatcheeTerminated會被context自動填充嗎?貌似是的。) - 遵循CQRS的原則,在Actor里也推薦讀寫分離,將Query放入單獨的Actor,避免對業務Actor的干擾。在示例中,由業務Actor負責創建Query Actor。
- Query通常都要設置超時,於是引出Actor內建的調度機制,在工廠的Behaviors.setup中使用Behaviors.withTimers定義timers,然后在Actor類里用timers.startSingleTime來調度一條經過給定延時后才發出的消息。
- 對於跨Actor的消息,通常需要使用context.messageAdapter()來提供一個消息轉譯器。而轉譯器最簡單的方案就是把消息(通常是響應)包裹在本Actor的某個消息里。
➡️ 術語和概念
三種無阻塞設計理念
為防止死鎖(大家都搶,結果都吃不上)、飢餓(弱肉強食,弱者老是吃不上)和活鎖(大家都謙讓,結果都不好意思吃),有三種確保死鎖無憂的無阻塞設計,其能力由強到弱如下:
- Wait-Freedom:需要確保每個方法調用都能在有限的步數內完成,能保證不死鎖、不飢餓。
- Lock-Freedom:需要確保某些關鍵方法調用能在有限的步數內完成,即可保證不死鎖,但不能避免飢餓。
- Obstruction-Freedom:需要確保某些關鍵方法在特定的時段或條件下能在有限的步數內完成,即可避免死鎖。所有的Lock-Freedom都是Obstruction-Freedom的,反之卻不盡然。樂觀並發控制(Optimistic Concurrency Control)就是典型的Obstruction-Freedom,因為在特定的時點,當只有一名參與者在嘗試時,其共享操作即可完成。
Actor System
關於Actor體系設計的最佳實踐
整個Actor System的體系,如同一個組織,任務總是逐級下派,命令總是下級服從上級。這與常見的分層軟件設計(Layered Software Design)是不同的,后者總是想方設法把問題隱藏和解決在自己那一層,而不是交給上級去處理或與其他人協商。推薦的做法主要包括:
- 如果一個Actor攜帶的數據非常重要,那么為了防止自身崩潰,導致數據損失,就應該把危險的任務交給子Actor負責,確保每個Request都由一個獨立的子Actor進行處理,並負責好子Actor失敗時的善后工作。(這被稱作Erlang的“錯誤內核模式 Error Kernel Pattern”)
- 如果一個Actor依賴另一個Actor來完成自己的工作,那么就要建立Watch關系,確保接受其委托的代理Actor始終處於有效狀態。
- 如果一個Actor承擔了太多不同的職責,那么就把這些職責分派給不同的子Actor去負責。
關於Actor設計的最佳實踐
- Actor應當是位很好的同事,它總是能獨立完成自己份內的工作,而且盡可能不打擾別人、不獨占資源。即便需要訪問某些外部資源,除非是逼不得已,它也不會處於阻塞狀態。
- 不要在Actor之間傳遞可變對象,應盡可能使用不可變的消息。
- Actor被設計成包含了行為與狀態的容器,所以不要習慣性地使用閉包等語法糖在消息里夾帶行為,這將因分享可變狀態而產生各種不可控的意外情況。
- 應用中最頂層的Actor是整個錯誤內核模式的最核心,它應當只負責啟動各個子系統,而不承擔其他的業務職責。否則,它會因監督責任過重,影響失敗和故障的處理。
協調關機
🔗 https://doc.akka.io/docs/akka/current/coordinated-shutdown.html
當應用的所有工作完成后,可以通知/user監督者停止運行,或者調用ActorSystem.terminate方法,從而通過運行協調關機CoordinatedShutdown來停止所有正在運行的Actor。在此過程中,你還可以執行其他一些清理和掃尾工作。
Actor基礎
官方文檔有專章講解Actor的方方面面,本章只是介紹基本概念。
Actor的主要作用包括:向熟識的其他Actor發送消息,創建新的Actor,指定處理下一條消息的行為。它作為一個容器,包括有狀態State、行為Behavior、郵箱Mailbox、監督策略Supervisor Strategy以及若干的子Actor等內容物,且該容器只能通過指定消息類型的參數化ActorRef進行引用,以確保最基本的隔離:
- State可以是一台復雜的狀態機,也可以只是一個簡單的計數值,本質上是由Actor內部維護的一個狀態。它將在Actor重啟時回復到Actor剛創建時候的樣子,或者也可以采用Event Sourcing的方式在重啟后恢復到故障發生前的樣子。
- Behavior總是和當前Actor要處理的消息相對應,並且在Actor創建之初總會有一個初始化的行為。而在Actor的生命周期內,Actor的Behavior將可能隨Actor的狀態變化而變化,由上一個Behavior切換至下一個Behavior。
- 由於消息總是發送給ActorRef的,而這背后實際對應的是能響應該消息的Behavior,所以這種對應關系必須在Actor創建之時就聲明,且Behavior自身也和ActorRef一樣是參數化的,這同時也決定了彼此切換的兩個Behavior必須是類型相容的,否則便無法與其ActorRef保持一致。(💀 這便是為什么同一個Actor的Message要從同一個trait派生,以表明它就只處理這一類的消息。)
- 在回應Command的回復消息里,通常都會包括指向應回復Actor的replyTo引用,所以能以這種方式把第三者引入當前的會話當中。
- Mailbox按照消息的發送時間將收到的消息排好隊,再交給Actor處理。默認的Mailbox是FIFO隊列。從Mailbox中出隊的消息,總是交由當前的Behavior進行處理。如果Behavior無法處理,就只能作失敗處理。
- Child Actors總是由父Actor監管,在spawn或stop后從context的列表中加入或退出,且這一類異步操作不會造成父Actor的阻塞。
- Supervisor Strategy用於定義異常發生時的應對策略。默認情況下Akka Typed在觸發異常時采取停止Actor的策略,而傳統的Akka則采取的重啟策略。
在Actor終止后,其持有的所有資源將被回收,剩下未處理的消息將轉入Actor System的死信郵箱Dead Letter Mailbox,而后續新傳來的消息也將悉數轉到System的EventStream作為死信處理。
監管與監測
⚠️ Akka Typed的監管已經重新設計,與傳統Akka有顯著區別
監管 Supervision
監管的對象是意料之外的失敗(Unexpected Failure),而不是校驗錯誤或者try-catch能處理的預期異常。所以,監管是Actor的額外裝飾,並不屬於Actor消息處理的組成部分。而后者,則是屬於Actor業務邏輯的一部分。
當失敗發生時,監管的策略包括以下三種:
- Resume:恢復Actor及其內部狀態。
- Restart:清理Actor內部狀態並恢復到Actor剛創建時候的樣子。實際上,這是由父Actor使用一個新的Behavior實例替換掉當前失敗Child Actor的行為,並用新的Actor接管失敗Actor的郵箱,從而實現重啟。
- Stop:永久地停止Actor。
⚡ 要注意的是,引發失敗的那條消息將不會再被處理,而且期間Actor發生的這些變化,在父Actor以外的范圍都是不可知的。
生命周期監測 Lifecycle Monitoring
Lifecycle Monitoring通常是指DeathWatch(💀 之前叫Dead Watch,Watch觀察,Monitoring監測,譯為觀察更為妥帖)。這是除了父子間的監管關系外,Actor之間另一種監測關系。由於Supervision導致的Actor Restart對外是不可知的,所以要用Monitoring在一對Actor之間建立監測關系。但從目的上講二者是有區別的,Supervision主要為應對失敗情形,Monitoring主要為確保另一方知悉本方已終止運行。
使用context.watch(targetActorRef)
及unwatch來建立或撤銷監測關系。當被監測Actor終止時,監測方Actor將收到一條Terminated消息(💀不是Signal嗎?),而默認的消息處理是拋出一個DeathPactException
。
⚡ 要注意的是,監測關系的建立和目標Actor終止時間無關。這就意味着在建立監測關系時,即使目標Actor已經終止,此時監測Actor仍將收到一條Terminated消息。
在消息處理過程中觸發異常時的結果
- 對消息而言:該消息將被丟失,不再退回到郵箱。所以必須自己捕獲異常,並建立相應的重試機制,並兼顧到非阻塞的要求。
- 對郵箱而言:沒有任何影響,后續的消息即使Actor被重啟也將全部保留。
- 對Actor而言:如果Actor將異常拋出,則其將被父Actor掛起(Suspend),並根據父Actor的監管策略決定將被恢復、重啟還是終止。
容錯能力設計
🔗 https://doc.akka.io/docs/akka/current/typed/fault-tolerance.html
Actor引用、路徑和地址
一些基本的Actor Reference
- ActorContext.self:指向自己的引用
- PromiseActorRef:由Ask方式為回調而創建的ActorRef
- DeadLetterActorRef:默認的死信服務提供的ActorRef
- EmptyLocalActorRef:當被查找的Actor不存在時Akka使用的ActorRef。它雖等價於DeadLetterActorRef,但因其保留有path,因此該引用仍可被傳送,用以與位於相同路徑的Actor引用進行比較,以確定后者是否為Actor死亡前獲得的。(💀 有點類似Null Object模式。)
Actor引用與路徑之間的區別
- Reference與Actor同生共死,隨着Actor生命結束而失效。所以即便是處於同一Path的新舊2個Actor,也不會有同一個Reference,這也意味着發給舊ActorRef的消息永遠不會自動轉發發新的ActorRef。
- Path只是一個代表族譜關系的名字,不存在生存周期,所以永不會失效。
獲取Reference的2個主要渠道
- 直接創建Actor。
- 通過接線員Receptionist從已注冊的Actor里查找。
Actor與Java內存模型
為防止Actor相互可見和消息亂序問題,Akka嚴格遵守以下兩條“發生之前(happens before)”守則:
- The actor send rule:發件人發送消息將始終先於收件人收到消息。
- The actor subsequent processing rule:任何一個Actor,在任一時刻,有且只能處理一條消息。處理完成當前消息后,才接着處理下一條消息。
可靠的消息投遞
Delivery翻譯為“投遞”更為妥帖,更好模仿郵政業務的妥投等術語。“送達”側重結果,“發送"側重動作本身。
Akka消息投遞遵循的兩條原則
- 一條消息最多被投遞一次。從業務角度講,相比命令發成功沒有,我們實際更關心對方的回復,有回復即印證對方收到命令了,否則重發命令進行催促即可。
- 在一對發件人-收件人之間,消息的發送與接收順序始終保持一致(僅限於用戶自定義消息,不包括父子間的系統消息)
Akka消息傳遞采用的ACK-RETRY協議內容
- 區分不同的消息及其確認消息的標識機制
- 在超時前仍未收到預期的確認消息時的重試機制
- 收件人甄別重復消息並決定丟棄它的檢測機制。實現它的第一種方式,是直接采用Akka的妥投模塊,改變消息投遞模式為最少投遞一次。第二種方式,是從業務邏輯的角度,確保消息處理的設計是冪等的。
保證妥投模塊
借助Akka Persistence確保消息妥投。(參見RMP-164)
🔗 https://doc.akka.io/docs/akka/current/typed/reliable-delivery.html
事件溯源
事件溯源的本質,是執行一條Command,衍生出若干條Event,這些Event既是Command產生的副作用,也是改變對象狀態的動因,及其生命周期內不可變的歷史。
Akka Persistence對事件溯源提供了直接支持。
🔗 https://doc.akka.io/docs/akka/current/typed/persistence.html#event-sourcing-concepts
帶確認回執的郵箱
可以通過自定義郵箱,實現消息投遞的重試。但這多數僅限於本地通訊的場景,具體原因參見🔗 The Rules for In-JVM (Local) Message Sends
死信
無法妥投的而不是因網絡故障等原因被丟失了的消息,將被送往名為/deadLetters的Actor,因此這些消息被稱為Dead Letter(參見RMP-161)。產生死信的原因主要是收件人不詳或已經死亡,而死信Actor也主要用於系統調試。
由於死信不能通過網絡傳遞,所以要搜集一個集群內的所有死信,則需要一台一台地收集每台主機本地的死信后再進行匯總。通過在系統的Event Stream對象akka.actor.DeadLetter
中注冊,普通Actor將可以訂閱到本地的所有死信消息。
配置
Akka使用Typesafe Config Library管理配置信息。該庫獨立於Akka,也可用於其他應用的配置信息管理。
Akka的ActorSystem在啟動時,所有的配置信息均會通過解析class path根目錄處的application.conf/.json/.properties等文件而加載入Config對象,並通過合並所有的reference.conf形成后備配置。
⚠️ 若正在編寫的屬於Akka應用程序,則Akka配置信息應寫入application.conf;若是基於Akka的庫,則配置信息應寫入reference.conf。並且,Akka不支持從另一個庫中覆寫(override)當前庫中的config property。
配置信息既可以從外部配置文件加載,也可用代碼實現運行時解析,還可以利用ConfigFactory.load()從不同地方加載。
import akka.actor.typed.ActorSystem
import com.typesafe.config.ConfigFactory
val customConf = ConfigFactory.parseString("""
akka.log-config-on-start = on
""")
// ConfigFactory.load sandwiches customConfig between default reference
// config and default overrides, and then resolves it.
val system = ActorSystem(rootBehavior, "MySystem", ConfigFactory.load(customConf))
一個典型的多項目配置示例:
myapp1 {
akka.loglevel = "WARNING"
my.own.setting = 43
}
myapp2 {
akka.loglevel = "ERROR"
app2.setting = "appname"
}
my.own.setting = 42
my.other.setting = "hello"
相應的配置信息加載代碼示例:
val config = ConfigFactory.load()
val app1 = ActorSystem(rootBehavior, "MyApp1", config.getConfig("myapp1").withFallback(config))
val app2 = ActorSystem(rootBehavior, "MyApp2", config.getConfig("myapp2").withOnlyPath("akka").withFallback(config))
🔗 Akka的默認配置列表,長達近千行……
📎 Akka Config Checker是用於查找Akka配置沖突的有力工具。
➡️ Actors
🏭 com.typesafe.akka:akka-actor-typed:2.6.5
Actor概貌
Hello World
示例HelloWorld是由HelloWorldMain創建一個HelloWorld(即Greeter),在每次ActorSystem要求HelloWorld SayHello的時候,就創建一個SayHello消息所賦名稱對應的HelloWorldBot(所以會有若干個動作相同但名稱不同的Bot),然后要求Greeter去向這個Bot問好,最后以Greeter與Bot相互問候數次作為結束。
示例采用了FP風格,Actor的狀態和行為均在Singleton對象里定義,采用了類似傳統Akka receive()
的函數Behaviors.receive { (context, message) => ... }
,以消息類型作為約束,實現了Actor的互動與組合。在每個Bot里,利用消息的遞歸重入維持一個Greeting的計數值,屆滿則用Behaviors.stopped停止響應,否則遞歸重入。
Behaviors.receive {...}與receiveMessage {...}的區別,在於前者將把context帶入閉包。
ChatRoom
這是一個類似聊天室功能的示例,各Actor的職責、定義和聯系如下表:
Actor | 職責 | Behavior類型 | Command | Event |
---|---|---|---|---|
Main | 創建聊天室ChatRoom和客戶Gabbler,並為二者牽線搭橋 | NotUsed | ||
ChatRoom | 創建並管理一組Session | RoomCommand |
|
|
Session | 負責播發諸如Gabbler這樣的Client的發言 | SessionCommand |
|
|
Gabbler | 響應Session | SessionEvent |
|
示例先采用FP風格實現。比如ChatRoom在處理GetSession消息時,最后以chatRoom(ses :: sessions)返回一個新的Behavior實例結束,這里的sessions正是Actor ChatRoom維護的狀態。
示例演示了如何限制消息的發件人。比如session及其工廠方法,以及PublishSessionMessage類型均為chatroom私有,外部不可訪問;在session Behavior的PostMessage分支中,chatroom的ActorRef通過工廠方法傳入session,且類型被限制為ActorRef[PublishSessionMessage]。這樣外界只能與ChatRoom通信,然后由ChatRoom在內部將消息轉交Session處理。
處理消息的參數來源於工廠方法的傳入參數,還是封裝在消息的字段里,這個示例也分別給出了樣板。💀 在設計通信協議時,消息定義為Command還是Event,消息的主人是誰,處理消息需要的參數如何傳入等等,都是需要考慮的問題。
為實現程序安全退出,示例在Main的Behavior里,設置了Dead Watch觀察gabbler,並定義了Behaviors.receiveSignal {...},在收到gabbler處理完MessagePosted消息,因返回Behaviors.stopped而發出的Terminated信號后,以Main自身的Behaviors.stopped作為結束。
⚡ Behaviors.setup是一個Behavior的工廠方法,該Behavior的實例將在Actor啟動后才創建。而Behaviors.receive雖也是Behavior的工廠方法之一,但Behavior的實例卻是在Actor啟動的那一刻就同時創建的。
Actor的生命周期
Actor是一個需要顯式啟停並且自帶狀態的資源(子Actor與隨父Actor雖不共生、但定共死),所以回想在GC出現前需要自己管理內存句柄的時代吧。
Actor System是一個高能耗的系統,所以通常一個應用或者一個JVM里只有一個Actor System。
創建Actor
ActorContext
ActorContext可用作:
- 孵化(Spawn)子Actor和監管關系。
- 觀察(Watch)其他Actor,並在被觀察Actor停止運行時收到Terminated事件(信號)。
- 記錄日志(Logging)。
- 創建消息適配器Message Adapter。
- 以Request-Response方式與其他Actor進行交互。
- 訪問Actor自身引用
self
。
ActorContext本身並不是完全線程安全的,主要有以下限制:
- 不能從Future回調函數的線程訪問。
- 不能在多個Actor實例之間進行共享。
- 只能在普通的消息處理線程里使用。
孵化子Actor
孵化有兩層含義:創建並啟動。
孵化協議SpawnProtocol
在使用Behaviors.setup啟用SpawnProtocol后,在應用中任何地方都將可以不直接引用context,改用telling或asking方式完成Actor系統的組裝。其中,Ask方式的使用類似傳統Akka,它將返回Future[ActorRef[XX]]。
⚡ 留意示例代碼里的幾處泛型約束,由這些Message串起了應用的流程。
// 啟用SpawnProtocol的Actor
object HelloWorldMain {
def apply(): Behavior[SpawnProtocol.Command] =
Behaviors.setup { context =>
// Start initial tasks
// context.spawn(...)
SpawnProtocol()
}
}
implicit val system: ActorSystem[SpawnProtocol.Command] =
ActorSystem(HelloWorldMain(), "hello")
val greeter: Future[ActorRef[HelloWorld.Greet]] =
system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))
val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
context.log.info2("Greeting for {} from {}", message.whom, message.from)
Behaviors.stopped
}
val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _))
for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
greeterRef ! HelloWorld.Greet("Akka", replyToRef)
}
停止Actor
Actor可以通過返回Behaviors.stopped作為接替Behavior來停止自身運行。
子Actor可以在處理完當前消息后,被其父Actor使用ActorContext.stop方法強行關停。
所有子Actor都將伴隨其父Actor關停而關停。
當Actor停止后將會收到一個PostStop信號,可以用Behaviors.receiveSignal在該信號的處理方法里完成其他的清理掃尾工作,或者提前給Behaviors.stopped傳入一個負責掃尾的閉包函數,以實現Actor優雅地關停。(💀 經測試,前者將先於后者執行。)
觀察Actor
由於Terminated信號只帶有被觀察者的ActorRef,所以為了添加額外的信息,在注冊觀察關系時可以用context.watchWith(watchee, SpecifiedMessageRef)取代context.watch(watchee)。這樣在Terminated信號觸發時,觀察者將收到預定義的這個SpecifiedMessageRef。
⚡ 注冊、撤銷注冊和Terminated事件的到來,在時序上並不一定嚴格遵守先注冊后Terminated這樣的規則,因為消息是異步的,且有郵箱的存在。
交互模式
Actor之間的交互,只能通過彼此的ActorRef[Message]來進行。這些ActorRef和Message,構成了Protocol的全部,既表明了通信的雙方,也表明了Actor能處理的消息、限制了能發給它的消息類型。
📎 要運行示例代碼,需要導入日志和Ask模式的支持:
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.AskPattern._
並且在test/resources文件夾下的logback-test.xml里配置好日志:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
</encoder>
</appender>
<appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender"/>
<logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate">
<appender-ref ref="STDOUT"/>
</logger>
<root level="DEBUG">
<appender-ref ref="CapturingAppender"/>
</root>
</configuration>
⭕ Fire-Forget
使用異步的、線程安全的tell發出消息,但不保證對方收到消息,也不關心該消息是否被對方處理完畢了。
實現要點
recipient ! message
適用場景
- 當消息是否被處理無關緊要時;
- 當對於消息未妥投或未處理的情形不需要處置預案時;
- 當為了提高吞吐量而需要最小化消息數量時(通常為發送一條響應需要創建兩倍數量的消息)。
缺點
- 如果來信數量超過處理能力,會把郵箱撐破;
- 發件人不會知曉消息是否丟失了。
⭕ Request & Response
發件人發出Request並附上回信地址,並以獲得收件人的Response作為消息妥投並被處理的確信。
實現要點
先定義Request和Response,隨后sender在發出Request時把self作為replyTo的ActorRef一並傳出,方便recipient收到Request后回復Response。
適用場景
- 當需要訂閱對方的Response時。
缺點
- Actor之間通常不會為彼此通信而專門定義一個Response消息(參見Adapted Response);
- 如果未收到Response,很難確定究竟是因為Request未妥投還是未被對方處理(參見ask方式);
- 如果沒有Request與Response之間一一對應的甄別機制或上下文,必然毫無用處(參見ask方式,或者每會話子Actor模式)。
⭕ Adapted Response
把收件人的Response進行簡單封裝,即作為發件人可處理的消息類型,從而減少發件人定義Protocol的負擔。
實現要點
定義收件人recipient的Response類型,再在sender里定義適配后的Response類型,然后在其Behavior.setup里用context.messageAdapter(rsp => WrappedResponse(rsp))
注冊一個消息適配器,最后在適配后消息的分支里取出原始的Response(當初由收件人回復的),再處理該消息。適配器能匹配預定義的響應類型及其派生類,總以最晚注冊的為有效版本,屬於sender且與sender同生命周期,所以當適配器觸發異常時將導致其宿主停止。
適用場景
- 當需要在2種不同協議間進行轉譯時;
- 當需要訂閱一個Actor返回的多種Response時。
缺點
- 如果未收到Response,很難確定究竟是因為Request未妥投還是未被對方處理(參見ask方式);
- 每種Response類型在任何時候只能有一個有效的適配器,所以若干個Actor只能共用一個適配器。
- 如果沒有Request與Response之間一一對應的甄別機制或上下文,必然毫無用處。
⭕ 在Actor之間使用ask方式實現Request-Response
把Request-Response原本使用的tell方式改為ask,從而能限定Response發回的時間,超時則視作ask失敗。
實現要點
在提問人中,通過Behaviors.setup定義隱式的超時時限,以context.ask(recipientRef, request) { case Success(Response(msg)) => AdaptedResponse(msg); case Failure(_) => AdaptedResponse(...) }
使用ask方式發出Request,並備妥Response到來時的適配預案(無需再額外象Adapted Response那樣注冊消息適配器),最后用工廠Behaviors.receiveMessage定義適配后響應消息的處理函數。
適用場景
- 當提問人需要查詢單次的Response時;
- 當提問人需要根據上一次Request的回復情況來決定下一步怎么做時;
- 當提問人在指定時限內未收到Response,需要在這種情況下決定重發Request時;
- 當提問人需要主動跟蹤Request被處理的情況,而不是一味追問答復人時(參見RMP-93 Back Pressure回壓模式,類似有最大容量限制的阻塞隊列,超出的請求將被直接拒絕);
- 當Protocol在設計時遺漏了必要的上下文信息,但又需要將信息臨時添附到會話中時。(這是指提問人在使用context.ask發出Request前,在ask調用語句前放置的相關信息。💀 這安全嗎,如果這些信息被其他代碼修改了怎么辦?真有必要的話,為什么不放進Request消息的結構里?)
缺點
- ask一次只能得到一條Response消息;
- 提問人給自己提問設置了時限,答復人卻未必知曉。所以當ask超時那一刻,答復人可能還在處理Request甚至才剛收到正要處理;
- 很難決策超時設置多長為妥,不當的時限設置可能導致過多的誤報。
⭕ 從Actor系統外部使用ask方式實現Request-Response
在Actor System以外直接用ask向某個Actor提問,最終得到用Future包裝好的回復。
實現要點
定義隱式的ActorSystem實例和超時時限,用reply: Future[Response] = recipient.ask(ref => Request).flatMap { case response => Future.successful(response); case another => Future.failed(...) }
定義Request-Response的對應關系,再通過system.executionContext
啟動執行,最后在reply的回調onComplete { case Success(Response) => ...; case Failure(_) => ...}
里取出Response區別處理。
適用場景
- 當需要從Actor系統之外向某個Actor提問時。
缺點
- 處在不同線程內的Future回調將可能導致各種意外;
- ask一次只能得到一條Response消息;
- 提問人給自己提問設置了時限,答復人卻未必知曉。
⭕ 忽略回復
當不關心收件人的回應時,在Request里把回信地址設置為什么也不干的ignoreRef
,使模式從Request-Response變為Fire-Forget。
實現要點
發件人發出Request時,把回復地址從context.self
改為什么消息也不處理的context.system.ignoreRef
。
適用場景
- 當協議里本設定有回復類型,但發件人偶爾不關心Response時。
缺點
由於ignoreRef將忽略所有發給它的消息,所以使用時必須小心。
- 如果使用不當,將會中斷兩個Actor的已有聯系;
- 當有外部ask請求發來時,ignoreRef將必定導致超時。
- Watch ignoreRef將變得沒有意義。
⭕ 自提Future結果
在Actor內部有Future類型的調用時,使用pipeToSelf獲取回調結果。盡管直接用Future.onComplete也能取出結果,但會因此將Actor的內部狀態暴露給外部線程(在onComplete里能直接訪問Actor內部狀態),所以並不安全。
實現要點
在Actor內部,先定義Future調用futureResult,再使用context.pipeToSelf(futureResult) { case Success(_) => WrappedResult(...); case Failure(_) => WrappedResult(...)}
將回調結果封裝入WrappedResult消息,最后在WrappedResult消息分支里再作回應。
適用場景
- 當需要從Actor里使用Future訪問諸如數據庫之類的外部資源時;
- 當Actor依賴Future返回結果才能完成消息處理時;
- 當需要在Future返回結果時仍保持調用前上下文時。
缺點
- 引入了額外的消息包裝。
⭕ 每會話子Actor
當一份響應需要綜合多個Actor的回復信息才能作出時,由一個父Actor委托多個子Actor搜集信息,待信息齊備后才由父Actor匯總發回給Request的請求人,請求人除與父Actor之間的協議外,對其間細節一概不知。這些子Actor僅活在每次會話期間,故名為“每會話”的子Actor。
實現要點
由父Actor在Behaviors.setup里構造實際承擔工作的一組子Actor,在Request處理過程中構造負責組織協調子Actor的管家Actor(其行為類型為Behavior[AnyRef],以保證類型最大程度地兼容)。隨后在管家Actor的Behaviors.setup里向子Actor發出Request,接着在Behaviors.receiveMessage里,使用遞歸反復嘗試從子Actor的Response里取出結果(生產條件下應該設定子Actor響應超時)。當所有結果都取出后,由管家Actor利用父Actor傳入的replyTo直接向外發出Response,最后停止管家Actor。
這當中的關鍵點包括:一是在管家Actor里的幾處,使用narrow限定Actor的類型T:<U,這也算是一種妥協,確保消息類型為子類型T而非父類型U,從而實現更嚴謹的約束;二是利用遞歸配合Option[T]取出子Actor的響應結果。
// 子Actor
case class Keys()
case class Wallet()
// 父Actor
object Home {
sealed trait Command
case class LeaveHome(who: String, replyTo: ActorRef[ReadyToLeaveHome]) extends Command
case class ReadyToLeaveHome(who: String, keys: Keys, wallet: Wallet)
def apply(): Behavior[Command] = {
Behaviors.setup[Command] { context =>
val keyCabinet: ActorRef[KeyCabinet.GetKeys] = context.spawn(KeyCabinet(), "key-cabinet")
val drawer: ActorRef[Drawer.GetWallet] = context.spawn(Drawer(), "drawer")
Behaviors.receiveMessage[Command] {
case LeaveHome(who, replyTo) =>
context.spawn(prepareToLeaveHome(who, replyTo, keyCabinet, drawer), s"leaving-$who")
Behaviors.same
}
}
}
// 管家Actor
def prepareToLeaveHome(whoIsLeaving: String, replyTo: ActorRef[ReadyToLeaveHome],
keyCabinet: ActorRef[KeyCabinet.GetKeys], drawer: ActorRef[Drawer.GetWallet]): Behavior[NotUsed] = {
Behaviors.setup[AnyRef] { context =>
var wallet: Option[Wallet] = None
var keys: Option[Keys] = None
keyCabinet ! KeyCabinet.GetKeys(whoIsLeaving, context.self.narrow[Keys])
drawer ! Drawer.GetWallet(whoIsLeaving, context.self.narrow[Wallet])
Behaviors.receiveMessage {
case w: Wallet =>
wallet = Some(w)
nextBehavior()
case k: Keys =>
keys = Some(k)
nextBehavior()
case _ =>
Behaviors.unhandled
}
def nextBehavior(): Behavior[AnyRef] = (keys, wallet) match {
case (Some(w), Some(k)) =>
// 已取得所有結果
replyTo ! ReadyToLeaveHome(whoIsLeaving, w, k)
Behaviors.stopped
case _ =>
Behaviors.same
}
}.narrow[NotUsed]
}
}
適用場景
- 當需要的結果來自於數個Actor的響應匯總時;
- 為保證至少送達一次而設計重試功能時(委托子Actor反復重試,直到獲取結果)。
缺點
- 由於子Actor是隨管家Actor的停止而停止的,因此要切實防止資源泄漏;
- 增加了實現的復雜度。
⭕ 一般意義上的響應聚合器
本模式非常類似每會話子Actor模式,由聚合器負責收集子Actor回應的信息,再反饋給委托人Actor。
實現要點
實現與Per Session Child Actor近似,只是在具體代碼上更具通用性而已。其中,context.spawnAnonymous
是起聯結作用的重要步驟。它不僅負責孵化聚合器,還要提前准備向子Actor發出Request的閉包,以及將子Actor回復轉換為統一的格式的映射閉包。聚合器被啟動后,即開始收集子Actor的回復,收集完成時即告終止。
// 允許子Actor有不同的協議,不必向Aggregator妥協
object Hotel1 {
final case class RequestQuote(replyTo: ActorRef[Quote])
final case class Quote(hotel: String, price: BigDecimal)
}
object Hotel2 {
final case class RequestPrice(replyTo: ActorRef[Price])
final case class Price(hotel: String, price: BigDecimal)
}
object HotelCustomer {
sealed trait Command
final case class AggregatedQuotes(quotes: List[Quote]) extends Command
// 將子Actor的回復封裝成統一的格式
final case class Quote(hotel: String, price: BigDecimal)
def apply(hotel1: ActorRef[Hotel1.RequestQuote], hotel2: ActorRef[Hotel2.RequestPrice]): Behavior[Command] = {
Behaviors.setup[Command] { context =>
context.spawnAnonymous(
// 這個傳遞給聚合器工廠的sendRequests是銜接聚合器及其委托人的關鍵
Aggregator[Reply, AggregatedQuotes](
sendRequests = { replyTo =>
hotel1 ! Hotel1.RequestQuote(replyTo)
hotel2 ! Hotel2.RequestPrice(replyTo)
},
expectedReplies = 2,
context.self,
aggregateReplies = replies =>
AggregatedQuotes(
replies
.map {
case Hotel1.Quote(hotel, price) => Quote(hotel, price)
case Hotel2.Price(hotel, price) => Quote(hotel, price)
}
.sortBy(_.price)
.toList),
timeout = 5.seconds))
Behaviors.receiveMessage {
case AggregatedQuotes(quotes) =>
context.log.info("Best {}", quotes.headOption.getOrElse("Quote N/A"))
Behaviors.same
}
}
}
}
object Aggregator {
// 用來兼容不同子Actor響應而定義的回復類型
type Reply = Any
sealed trait Command
private case object ReceiveTimeout extends Command
private case class WrappedReply[R](reply: R) extends Command
def apply[Reply: ClassTag, Aggregate](
sendRequests: ActorRef[Reply] => Unit,
expectedReplies: Int,
replyTo: ActorRef[Aggregate],
aggregateReplies: immutable.IndexedSeq[Reply] => Aggregate,
timeout: FiniteDuration): Behavior[Command] = {
Behaviors.setup { context =>
context.setReceiveTimeout(timeout, ReceiveTimeout)
val replyAdapter = context.messageAdapter[Reply](WrappedReply(_))
// 向子Actor發出Request並搜集整理回復信息
sendRequests(replyAdapter)
def collecting(replies: immutable.IndexedSeq[Reply]): Behavior[Command] = {
Behaviors.receiveMessage {
case WrappedReply(reply: Reply) =>
val newReplies = replies :+ reply
if (newReplies.size == expectedReplies) {
val result = aggregateReplies(newReplies)
replyTo ! result
Behaviors.stopped
} else
collecting(newReplies)
case ReceiveTimeout =>
val aggregate = aggregateReplies(replies)
replyTo ! aggregate
Behaviors.stopped
}
}
collecting(Vector.empty)
}
}
}
適用場景
- 當需要以相同的方式,從分布多處的多個Actor獲取信息,並以統一方式回復時;
- 當需要聚合多個回復結果時;
- 為保證至少送達一次而設計重試功能時。
缺點
- 越是通用的消息類型,在運行時越缺少約束;
- 子Actor可能造成資源泄漏;
- 增加了實現復雜度。
⭕ 延遲掐尾器 (Latency tail chopping)
這是聚合器模式的一種變形。類似於集群條件下,每個Actor承擔着同樣的工作職責,當其中某個Actor未按期響應時,將工作從這個遲延的Actor手里交給另一個Actor負責。
實現要點
💀 這個例子不夠完整,還需要進一步理解,比如為什么sendRequests需要一個Int參數,如果換作OO風格如何實現。
參考文獻 🔗 Achieving Rapid Response Times in Large Online Services
- 使用Behaviors.withTimers設置若干個定時器,由定時器負責向子Actor發出Request。
- 設置2個超時,其中請求超時是單個Actor完成工作的時限,到期未完成就交出工作;另一個是最遲交付超時,是整個工作完成的時限,到期則說明無法交付委托人的工作。
- 利用sendRequest函數(類型為
(Int, ActorRef[Reply]) => Boolean
)聯結掐尾器和具體承擔工作的Actor。如果sendRequest成功,說明請求已經發送給承擔工作的子Actor,那么就調度一條由請求超時限定的單個Request的消息,否則就調度一條由最遲交付超時限定的消息。
object TailChopping {
sealed trait Command
private case object RequestTimeout extends Command
private case object FinalTimeout extends Command
private case class WrappedReply[R](reply: R) extends Command
def apply[Reply: ClassTag](
sendRequest: (Int, ActorRef[Reply]) => Boolean,
nextRequestAfter: FiniteDuration,
replyTo: ActorRef[Reply],
finalTimeout: FiniteDuration,
timeoutReply: Reply): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
val replyAdapter = context.messageAdapter[Reply](WrappedReply(_))
sendNextRequest(1)
def waiting(requestCount: Int): Behavior[Command] = {
Behaviors.receiveMessage {
case WrappedReply(reply: Reply) =>
replyTo ! reply
Behaviors.stopped
// 單個任務沒能按時完成,另外找人
case RequestTimeout =>
sendNextRequest(requestCount + 1)
// 整個工作交付不了,抱歉
case FinalTimeout =>
replyTo ! timeoutReply
Behaviors.stopped
}
}
def sendNextRequest(requestCount: Int): Behavior[Command] = {
if (sendRequest(requestCount, replyAdapter)) {
timers.startSingleTimer(RequestTimeout, nextRequestAfter)
} else {
timers.startSingleTimer(FinalTimeout, finalTimeout)
}
waiting(requestCount)
}
}
}
}
}
適用場景
- 當需要快速響應而必須降低不必要的延遲時;
- 當工作總是一味重復的內容時。
缺點
- 因為引入了更多的消息並且要重復多次同樣的工作,所以增加了整個系統的負擔;
- 工作的內容必須是冪等和可重復的,否則無法轉交;
- 越是通用的消息類型,在運行時越缺少約束;
- 子Actor可能造成資源泄漏。
⭕ 調度消息給自己
使用定時器,在指定時限到期時給自己發送一條指定的消息。
實現要點
- 使用Behaviors.withTimers為Actor綁定TimerScheduler,該調度器將同樣適用於Behaviors的setup、receive、receiveMessage等工廠方法創建的行為。
- 在timers.startSingleTimer定義並啟動定時器,在startSingleTimer設定的超時到期時將會收到預設的消息。
object Buncher {
sealed trait Command
final case class ExcitingMessage(message: String) extends Command
final case class Batch(messages: Vector[Command])
private case object Timeout extends Command
private case object TimerKey
def apply(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Command] = {
Behaviors.withTimers(timers => new Buncher(timers, target, after, maxSize).idle())
}
}
class Buncher(
timers: TimerScheduler[Buncher.Command],
target: ActorRef[Buncher.Batch],
after: FiniteDuration,
maxSize: Int) {
private def idle(): Behavior[Command] = {
Behaviors.receiveMessage[Command] { message =>
timers.startSingleTimer(TimerKey, Timeout, after)
active(Vector(message))
}
}
def active(buffer: Vector[Command]): Behavior[Command] = {
Behaviors.receiveMessage[Command] {
// 收到定時器發來的Timeout消息,緩沖區buffer停止接收,將結果回復給target。
case Timeout =>
target ! Batch(buffer)
idle()
// 時限到達前,新建緩沖區並把消息存入,直到緩沖區滿
case m =>
val newBuffer = buffer :+ m
if (newBuffer.size == maxSize) {
timers.cancel(TimerKey)
target ! Batch(newBuffer)
idle()
} else
active(newBuffer)
}
}
}
注意事項
- 每個定時器都有一個Key,如果啟動了具有相同Key的新定時器,則前一個定時器將被取消cancel,並且保證即便舊定時器的到期消息已經放入Mailbox,也不會再觸發(💀 定時器的Key可以自定義嗎?舊定時器的到期消息是被框架主動過濾掉的嗎?)。
- 定時器有周期性(PeriodicTimer)和一次性(SingleTimer)兩種,它們的參數形式都一樣:定時器鍵TimerKey、調度消息Message和時長Duration。區別在於最后一個參數對應周期時長或是超時時長。(⚡ 根據JAPI文檔,PeriodicTimer已經作廢,取而代之的是指定發送頻率的startTimerAtFixedRate或者指定兩次消息發送間隔時長的startTimerWithFixedDelay,區別參見下文調度周期的說明。)
- TimerScheduler本身是可變的,因為它要執行和管理諸如注冊計划任務等副作用。(💀 所以不是線程安全的?)
- TimerScheduler與其所屬的Actor同生命周期。
- Behaviors.withTimers也可以在Behaviors.supervise內部使用。當Actor重啟時,它將自動取消舊的定時器,並確保新定時器不會收到舊定時器的預設到期消息。
關於調度周期的特別說明
調度周期有兩種:一種是FixedDelay:指定前后兩次消息發送的時間間隔;一種是FixedRate:指定兩次任務執行的時間間隔。如果實難選擇,建議使用FixedDelay。(❗ 此處Task等價於一次消息處理過程,可見對Akka里的各種術語還需進一步規范。)
區別主要在於:Delay不會補償兩次消息間隔之間因各種原因導致的延誤,前后兩條消息的間隔時間是固定的,而不會關心前一條消息是何時才交付處理的;而Rate會對這之間的延誤進行補償,后一條消息發出的時間會根據前一條消息交付處理的時間而確定。(💀 換句話說,Delay以發出時間計,Rate以開始處理的時間計。)
長遠來看,Delay方式下的消息處理的頻率通常會略低於指定延遲的倒數,所以更適合短頻快的工作;Rate方式下的消息處理頻率恰好是指定間隔的倒數,所以適合注重完整執行次數的工作。
⚠️ 在Rate方式下,如果任務延遲超出了預設的時間間隔,則將在前一條消息之后立即發送下一條消息。比如scheduleAtFixedRate的間隔為1秒,而消息處理過程因長時間暫停垃圾回收等原因造成JVM被掛起30秒鍾,則ActorSystem將快速地連續發送30條消息進行追趕,從而造成短時間內的消息爆發,所以一般情況下Delay方式更被推崇。
⭕ 響應集群條件下分片后的Actor
在集群條件下,通常采用的在Request中傳遞本Shard Actor之ActorRef的方法仍舊適用。但如果該Actor在發出Request后被移動或鈍化(指Actor暫時地關閉自己以節約內存,需要時再重啟),則回復的Response將會全部發至Dead Letters。此時,引入EntityId作為標識,取代ActorRef以解決之(參見RMP-68)。缺點是無法再使用消息適配器。
⚠️ RMP-77:Actor的內部狀態不會隨Actor對象遷移,所以需要相應持久化機制來恢復Actor對象的狀態。
實現要點
把通常設計中的ActorRef換成EntityId,再使用TypeKey和EntityId定位Actor的引用即可。
object CounterConsumer {
sealed trait Command
final case class NewCount(count: Long) extends Command
val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-response")
}
object Counter {
trait Command
case object Increment extends Command
final case class GetValue(replyToEntityId: String) extends Command
val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-counter")
private def apply(): Behavior[Command] = Behaviors.setup { context =>
counter(ClusterSharding(context.system), 0)
}
private def counter(sharding: ClusterSharding, value: Long): Behavior[Command] = Behaviors.receiveMessage {
case Increment =>
counter(sharding, value + 1)
case GetValue(replyToEntityId) =>
val replyToEntityRef = sharding.entityRefFor(CounterConsumer.TypeKey, replyToEntityId)
replyToEntityRef ! CounterConsumer.NewCount(value)
Behaviors.same
}
}
容錯能力
默認情況下,當Actor在初始化或處理消息時觸發了異常、失敗,則該Actor將被停止(⚠️ 傳統Akka默認是重啟Actor)。
要區別校驗錯誤與失敗:校驗錯誤Validate Error意味着發給Actor的Command本身就是無效的,所以將其界定為Protocol規范的內容,由發件人嚴格遵守,這遠甚過收件人發現收到的是無效Command后直接拋出異常。失敗Failure則是由於Actor不可控的外因導致的,這通常無法成為雙方Protocol的一部分,發件人對此也無能為力。
發生失敗時,通常采取“就讓它崩”的原則。其思路在於,與其花費心思零敲碎打地在局部進行細粒度的修復和內部狀態糾正,不如就讓它崩潰停止,然后利用已有的災備方案,重建一個肯定有效的新Actor重新來過。
監管 Supervise
監管就是一個放置災備方案的好地方。默認監視策略是在引發異常時停止Actor,如果要自定義此策略,則應在spawn子Actor時,使用Behaviors.supervise進行指定。
策略有許多可選參數,也可以象下面這樣進行嵌套,以應對不同的異常類型。
Behaviors.supervise(
Behaviors.supervise(behavior)
.onFailure[IllegalStateException](SupervisorStrategy.restart))
.onFailure[IllegalArgumentException](SupervisorStrategy.stop)
⚠️ 若Actor被重啟,則傳遞給Behaviors.supervise的Behavior內定義的可變狀態就需要在類似Behaviors.setup這樣的工廠方法中進行初始化。若采用OO風格,則推薦在setup中完成初始化;若采用FP風格,由於通常不存在函數內的可變量,所以無需如此。
🔗 完整列表參見API指南:SupervisorStrategy
子Actor在父Actor重啟時停止
第二個放置災備的地方是Behaviors.setup里。因為當父Actor重啟時,其Behaviors.setup會再次執行。同時,子Actor會隨父Actor重啟而停止運行,以防止資源泄漏等問題發生。
注意區別以下兩種方式:
⭕ 方式一:由supervise包裹setup
這種方式下,每當父Actor重啟時,就會完全重構一次子Actor,從而總是回到父Actor剛創建時候的樣子。
def child(size: Long): Behavior[String] =
Behaviors.receiveMessage(msg => child(size + msg.length))
def parent: Behavior[String] = {
Behaviors
.supervise[String] {
// setup被supervise包裹,意味着每次父Actor重啟,該setup必被重新執行
Behaviors.setup { ctx =>
val child1 = ctx.spawn(child(0), "child1")
val child2 = ctx.spawn(child(0), "child2")
Behaviors.receiveMessage[String] { msg =>
val parts = msg.split(" ")
child1 ! parts(0)
child2 ! parts(1)
Behaviors.same
}
}
}
.onFailure(SupervisorStrategy.restart)
}
⭕ 方式二:由setup包裹supervise
這種方式下,子Actor不會受到父Actor的重啟影響,它們既不會停止,更不會被重建。
def parent2: Behavior[String] = {
Behaviors.setup { ctx =>
// 此setup只會在父Actor創建時運行一次
val child1 = ctx.spawn(child(0), "child1")
val child2 = ctx.spawn(child(0), "child2")
Behaviors
.supervise {
// 在父Actor重啟時,只有這段receiveMessage工廠會被執行
Behaviors.receiveMessage[String] { msg =>
val parts = msg.split(" ")
child1 ! parts(0)
child2 ! parts(1)
Behaviors.same
}
}
// 參數false決定了父Actor重啟時不會停止子Actor
.onFailure(SupervisorStrategy.restart.withStopChildren(false))
}
}
PreRestart信號
第三個放置災備方案的地方是在PreRestart信號處理過程里。和之前提過的PostStop信號一樣,Actor因監測而重啟前,會收到一個信號PreRestart信號,方便Actor自身在重啟前完成清理掃尾工作。
💀 RMP-47的對傳統Akka的描述適用於Akka Typed嗎?
- PreStart:在Actor啟動前觸發
- PostStop:在Actor停止后觸發
- PreRestart:在重啟Actor前觸發,完成任務后會觸發PostStop
- PostRestart:在Actor重啟后觸發,完成任務后會觸發PreStart
把異常當水泡一樣順着遺傳樹往上傳遞
在傳統Akka里,子Actor觸發的異常將被上交給父Actor,由后者決定如何處置。而在Akka Typed里,提供了更豐富的手段處理這種情況。
方法就是由父Actor觀察(watch)子Actor,這樣當子Actor因失敗而停止時,父Actor將會收到附上原因的ChildFailed信號。特別地,ChildFailed信號派生自Terminated,所以如果業務上不需要刻意區分的話,處理Terminated信號即可。
在子Actor觸發異常后,如果它的祖先Actor(不僅僅是父親)沒有處理Terminated信號,那么將會觸發akka.actor.typed.DeathPactException異常。
📎 示例里用Boss -> MiddleManagement -> Work這樣的層級進行了演示。當Boss發出Fail消息后,MiddleManagement將消息轉發給Work,Work收到Fail消息后拋出異常。因MiddleManagement和Boss均未對Terminated信號進行處理,因此相繼停止。隨后Boss按預定策略重啟,並順次重建MiddleManagement和Work,從而確保測試腳本嘗試在等候200毫秒后重新發送消息Hello成功。
發現Actor
除了通過創建Actor獲得其引用外,還可以通過接線員Receptionist獲取Actor的引用。
Receptionist采用了注冊會員制,注冊過程仍是基於Akka Protocol。在Receptionist上注冊后的會員都持有key,方便集群上的其他Actor通過key找到它。當發出Find請求后,Receptionist會回復一個Listing,其中將包括一個由若干符合條件的Actor組成的集合。(⚠️ 同一個key可以對應多個Actor)
由Receptionist維護的注冊表是動態的,其中的Actor可能因其停止運行、手動從表中注銷或是節點從集群中刪除而從表中消失。如果需要關注這種動態變化,可以使用Receptionist.Subscribe(keyOfActor, replyTo)訂閱關注的Actor,Receptionist會在注冊表變化時將Listing消息發送給replyTo。
⚠️ 切記:上述操作均是基於異步消息的,所以操作不是即時產生結果的。可能發出注銷請求了,但Actor還在注冊表里。
要點:
- 用
ServiceKey[Message]("name")
創建Key - 用
context.system.receptionist ! Receptionist.Register(key, replyTo)
注冊Actor,用Deregister注銷 - 用
context.system.receptionist ! Receptionist.Subscribe(key, replyTo)
訂閱注冊表變動事件 - 用
context.system.receptionist ! Receptionist.Find(key, messageAdapter)
查找指定key對應的若干Actor
集群的接線員
在集群條件下,一個Actor注冊到本地節點的接線員后,其他節點上的接線員也會通過分布式數據廣播獲悉,從而保證所有節點都能通過ServiceKey找到相同的Actor們。
但需要注意集群條件下與本地環境之間的差別:一是在集群條件下進行的Subscription與Find將只能得到可達Actor的集合。如果需要獲得所有的已注冊Actor(包括不可達的Actor),則得通過Listing.allServiceInstances獲得。二是在集群內各節點之間傳遞的消息,都需要經過序列化。
接線員的可擴展性
接線員無法擴展到任意數量、也達不到異常高吞吐的接轉要求,它通常最多就支持數千至上萬的接轉量。所以,如果應用確實需要超過Akka框架所能提供的接轉服務水平的,就得自己去解決各節點Actor初始化連接的難題。
路由 Route
盡管Actor在任意時刻只能處理一條消息,但這不並妨礙同時有多個Actor處理同一條消息,這便是Akka的路由功能使然。
路由器本身也是一種Actor,但主要職責是轉發消息而不是處理消息。與傳統Akka一樣,Akka Typed的路由也分為兩種:池路由池與組路由。
⭕ 池路由
在池路由方式下,由Router負責構建並管理所有的Routee。當這些作為子actor的Routee終止時,Router將會把它從Router中移除。當所有的Routee都移除后,Router本身停止運行。
示例要點
- 使用
val pool = Routers.pool(poolSize = 4)(Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart))
定義池路由,其中監管策略應是必不可少的內容,被監管的Worker()即是Routee,poolSize則是池中最多能創建並管理的Routee數目。 - 接着用
val router = ctx.spawn(pool, "worker-pool")
創建路由器本身。 - 之后便可以向路由器router發送消息了。
- 最終,消息將被路由給所有的routee(此處將有4個Worker的實例負責處理消息)。
- Behaviors.monitor(monitor, behaviorOfMonitee):將被監測的Monitee收到新消息的同時,將該消息抄送給監測者Monitor
由於Router本身也是Actor,Routee是其子Actor,因此可以指定其消息分發器。(💀 Router中以with開頭的API還有不少,需要仔細參考API文檔。)
// 指定Routee使用默認的Blocking IO消息分發器
val blockingPool = pool.withRouteeProps(routeeProps = DispatcherSelector.blocking())
// 指定Router使用與其父Actor一致的消息分發器
val blockingRouter = ctx.spawn(blockingPool, "blocking-pool", DispatcherSelector.sameAsParent())
// 使用輪循策略分發消息,保證每個Routee都盡量獲得同樣數量的任務,這是池路由默認策略
// 示例將獲得a-b-a-b順序的日志
val alternativePool = pool.withPoolSize(2).withRoundRobinRouting()
📌 在學習Akka Typed的過程中,應引起重視和警醒的是,不能象傳統Akka一樣執着於定義Actor的Class或Object本身,而應該緊緊圍繞Behavior來思考、認識和設計系統。
在Akka Typed的世界里,包括Behaviors各式工廠在內的許多API均是以Behavior為核心進行設計的。而Behavior又與特定類型的Message綁定,這便意味着Behavior與Protocol進行了綁定,於是消息Message及處理消息的Behavior[Message]便構成了完整的Protocol。
⭕ 組路由
與池路由不同的是,組路由方式下的Routee均由外界其它Actor產生(自行創建、自行管理),Router只是負責將其編組在一起。
組路由基於ServiceKey和Receptionist,管理着屬於同一個key的若干個Routee。雖然這種方式下對Routee構建和監控將更靈活和便捷,但也意味着組路由將完全依賴Receptionist維護的注冊表才能工作。在Router啟動之初,當注冊表還是空白時,發來的消息將作為akka.actor.Dropped扔到事件流中。當注冊表中注冊有Routee后,若其可達,則消息將順利送達,否則該Routee將被標記為不可達。
路由策略
-
輪循策略 Round Robin
輪循策略將公平調度各Routee,平均分配任務,所以適合於Routee數目不會經常變化的場合,是池路由的默認策略。它有一個可選的參數
preferLocalRoutees
,為true時將強制只使用本地的Routee(默認值為false)。 -
隨機策略 Random
隨機策略將隨機選取Routee分配任務,適合Routee數目可能會變化的場合,是組路由的默認策略。它同樣有可靠參數
preferLocalRoutees
。 -
一致的散列策略 Consistent Hashing
散列策略將基於一張以傳入消息為鍵的映射表選擇Routee。
🔗 參考文獻:Consistent Hashing
💀 該文只展示了如何設計一個ConsistentHash[T]類,並提供add/remove/get等API函數,卻沒講怎么使用它,所以需要完整示例!
關於性能
如果把Routee看作CPU的核心,那自然是多多益善。但由於Router本身也是一個Actor,所以其Mailbox的承載能力反而會成為整個路由器的瓶頸,而Akka Typed並未就此提供額外方案,因此遇到需要更高吞吐量的場合則需要自己去解決。
暫存 Stash
Stash(暫存),是指Actor將當前Behavior暫時還不能處理的消息全部或部分緩存起來,等完成初始化等准備工作或是處理完上一條冪等消息后,再切換至匹配的Behavior,從緩沖區取出消息進行處理的過程。
示例要點
trait DB {
def save(id: String, value: String): Future[Done]
def load(id: String): Future[String]
}
object DataAccess {
sealed trait Command
final case class Save(value: String, replyTo: ActorRef[Done]) extends Command
final case class Get(replyTo: ActorRef[String]) extends Command
private final case class InitialState(value: String) extends Command
private case object SaveSuccess extends Command
private final case class DBError(cause: Throwable) extends Command
// 使用Behaviors.withStash(capacity)設置Stash容量
// 隨后切換到初始Behavior start()
def apply(id: String, db: DB): Behavior[Command] = {
Behaviors.withStash(100) { buffer =>
Behaviors.setup[Command] { context =>
new DataAccess(context, buffer, id, db).start()
}
}
}
}
// 大量使用context.pipeToSelf進行Future交互
class DataAccess(
context: ActorContext[DataAccess.Command],
buffer: StashBuffer[DataAccess.Command],
id: String,
db: DB) {
import DataAccess._
private def start(): Behavior[Command] = {
context.pipeToSelf(db.load(id)) {
case Success(value) => InitialState(value)
case Failure(cause) => DBError(cause)
}
Behaviors.receiveMessage {
case InitialState(value) =>
// 完成初始化,轉至Behavior active()開始處理消息
buffer.unstashAll(active(value))
case DBError(cause) =>
throw cause
case other =>
// 正在處理冪等消息,故暫存后續消息
buffer.stash(other)
Behaviors.same
}
}
// Behaviors.receiveMessagePartial():從部分消息處理程序構造一個Behavior
// 該行為將把未定義的消息視為未處理。
private def active(state: String): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case Get(replyTo) =>
replyTo ! state
Behaviors.same
// 處理冪等的Save消息
case Save(value, replyTo) =>
context.pipeToSelf(db.save(id, value)) {
case Success(_) => SaveSuccess
case Failure(cause) => DBError(cause)
}
// 轉至Behavior saving(),反饋冪等消息處理結果
saving(value, replyTo)
}
}
private def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = {
Behaviors.receiveMessage {
case SaveSuccess =>
replyTo ! Done
// 冪等消息處理結束並已反饋結果,轉至Behavior active()開始處理下一條消息
buffer.unstashAll(active(state))
case DBError(cause) =>
throw cause
case other =>
buffer.stash(other)
Behaviors.same
}
}
}
注意事項
- Stash所使用的緩沖區由Akka提供,其大小一定要在Behavior對象創建前進行設定,否則過多的消息被暫存將導致內存溢出,觸發
StashOverflowException
異常。所以在往緩沖區里暫存消息前,應當使用StashBuffer.isFull
提前進行檢測。 unstashAll()
將會停止Actor響應新的消息,直到當前暫存的所有消息被處理完畢,但這有可能因長時間占用消息處理線程而導致其他Actor陷入飢餓狀態。為此,可改用方法unstash(numberOfMessages)
,確保一次只處理有限數量的暫存消息。
Behavior是一台有限狀態機
有限狀態機:當前處於狀態S,發生E事件后,執行操作A,然后狀態將轉換為S’。
這部分內容對應傳統Akka的FSM:Finite State Machine,可參考RMP及下文
📎 參考示例:哲學家用餐問題,及其解析:🔗 Dining Hakkers
object Buncher {
// 把FSM里驅動狀態改變的事件,都用Message代替了
sealed trait Event
final case class SetTarget(ref: ActorRef[Batch]) extends Event
final case class Queue(obj: Any) extends Event
case object Flush extends Event
private case object Timeout extends Event
// 狀態
sealed trait Data
case object Uninitialized extends Data
final case class Todo(target: ActorRef[Batch], queue: immutable.Seq[Any]) extends Data
final case class Batch(obj: immutable.Seq[Any])
// 初始狀態為Uninitialized,對應初始的Behavior為idle()
def apply(): Behavior[Event] = idle(Uninitialized)
private def idle(data: Data): Behavior[Event] =
Behaviors.receiveMessage[Event] {
message: Event => (message, data) match {
case (SetTarget(ref), Uninitialized) =>
idle(Todo(ref, Vector.empty))
case (Queue(obj), t @ Todo(_, v)) =>
active(t.copy(queue = v :+ obj))
case _ =>
Behaviors.unhandled
}
}
// 處於激活狀態時,對應Behavior active()
private def active(data: Todo): Behavior[Event] =
Behaviors.withTimers[Event] { timers =>
// 設置超時條件
timers.startSingleTimer(Timeout, 1.second)
Behaviors.receiveMessagePartial {
case Flush | Timeout =>
data.target ! Batch(data.queue)
idle(data.copy(queue = Vector.empty))
case Queue(obj) =>
active(data.copy(queue = data.queue :+ obj))
}
}
}
在Akka Typed里,由於Protocol和Behavior的出現,簡化了傳統Akka中有限狀態機FSM的實現。不同的狀態下,對應不同的Behavior,響應不同的請求,成為Akka Typed的典型作法,這在此前的大量示例里已經有所展示。
協調關機 Coordinated Shutdown
CoordinatedShutdown是一個擴展,通過提前注冊好的任務Task,可以在系統關閉前完成一些清理掃尾工作,防止資源泄漏等問題產生。
關閉過程中,默認的各階段(Phase)都定義在下面這個akka.coordinated-shutdown.phases
里,各Task則后續再添加至相應的階段中。
在application.conf配置里,可以通過定義不同的depends-on來覆蓋缺省的設置。其中,before-service-unbind
、before-cluster-shutdown
和before-actor-system-terminate
是最常被覆蓋的。
各Phase原則上按照被依賴者先於依賴者的順序執行,從而構成一個有向無環圖(Directed Acyclic Graph,DAG),最終所有Phase按DAG的拓撲順序執行。
# CoordinatedShutdown is enabled by default and will run the tasks that
# are added to these phases by individual Akka modules and user logic.
#
# The phases are ordered as a DAG by defining the dependencies between the phases
# to make sure shutdown tasks are run in the right order.
#
# In general user tasks belong in the first few phases, but there may be use
# cases where you would want to hook in new phases or register tasks later in
# the DAG.
#
# Each phase is defined as a named config section with the
# following optional properties:
# - timeout=15s: Override the default-phase-timeout for this phase.
# - recover=off: If the phase fails the shutdown is aborted
# and depending phases will not be executed.
# - enabled=off: Skip all tasks registered in this phase. DO NOT use
# this to disable phases unless you are absolutely sure what the
# consequences are. Many of the built in tasks depend on other tasks
# having been executed in earlier phases and may break if those are disabled.
# depends-on=[]: Run the phase after the given phases
phases {
# The first pre-defined phase that applications can add tasks to.
# Note that more phases can be added in the application's
# configuration by overriding this phase with an additional
# depends-on.
before-service-unbind {
}
# Stop accepting new incoming connections.
# This is where you can register tasks that makes a server stop accepting new connections. Already
# established connections should be allowed to continue and complete if possible.
service-unbind {
depends-on = [before-service-unbind]
}
# Wait for requests that are in progress to be completed.
# This is where you register tasks that will wait for already established connections to complete, potentially
# also first telling them that it is time to close down.
service-requests-done {
depends-on = [service-unbind]
}
# Final shutdown of service endpoints.
# This is where you would add tasks that forcefully kill connections that are still around.
service-stop {
depends-on = [service-requests-done]
}
# Phase for custom application tasks that are to be run
# after service shutdown and before cluster shutdown.
before-cluster-shutdown {
depends-on = [service-stop]
}
# Graceful shutdown of the Cluster Sharding regions.
# This phase is not meant for users to add tasks to.
cluster-sharding-shutdown-region {
timeout = 10 s
depends-on = [before-cluster-shutdown]
}
# Emit the leave command for the node that is shutting down.
# This phase is not meant for users to add tasks to.
cluster-leave {
depends-on = [cluster-sharding-shutdown-region]
}
# Shutdown cluster singletons
# This is done as late as possible to allow the shard region shutdown triggered in
# the "cluster-sharding-shutdown-region" phase to complete before the shard coordinator is shut down.
# This phase is not meant for users to add tasks to.
cluster-exiting {
timeout = 10 s
depends-on = [cluster-leave]
}
# Wait until exiting has been completed
# This phase is not meant for users to add tasks to.
cluster-exiting-done {
depends-on = [cluster-exiting]
}
# Shutdown the cluster extension
# This phase is not meant for users to add tasks to.
cluster-shutdown {
depends-on = [cluster-exiting-done]
}
# Phase for custom application tasks that are to be run
# after cluster shutdown and before ActorSystem termination.
before-actor-system-terminate {
depends-on = [cluster-shutdown]
}
# Last phase. See terminate-actor-system and exit-jvm above.
# Don't add phases that depends on this phase because the
# dispatcher and scheduler of the ActorSystem have been shutdown.
# This phase is not meant for users to add tasks to.
actor-system-terminate {
timeout = 10 s
depends-on = [before-actor-system-terminate]
}
}
-
通常應在系統啟動后盡早注冊任務,否則添加得太晚的任務將不會被運行。
-
向同一個Phase添加的任務將並行執行,沒有先后之分。
-
下一個Phase會通常會等待上一個Phase里的Task都執行完畢或超時后才會啟動。可以為Phase配置
recover = off
,從而在Task失敗或超時后,中止整個系統的關機過程。 -
通常情況下,使用
CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { ... }
向Phase中添加Task,此處的名稱主要用作調試或者日志。 -
使用
CoordinatedShutdown(system).addCancellableTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "cleanup") { () => Future { ... } }
添加可取消的Task,之后可以用c.cancel()取消Task的執行。 -
通常情況下,不需要Actor回復Task已完成的消息,因為這會拖慢關機進程,直接讓Actor終止運行即可。如果要關注該Task何時完成,可以使用
CoordinatedShutdown(system).addActorTerminationTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName", someActor, Some("stop"))
添加任務,並且給這個someActor發送一條消息,隨后watch該Actor的終止便可知曉Task完成情況。 -
使用
ActorSystem.terminate()
或val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason)
可以啟動協調關機過程,且多次調用也只會執行一次。 -
ActorSystem會在最后一個Phase里的Task全部執行完畢后關閉,但JVM不一定會停止,除非所有守護進程均已停止運行。通過配置
akka.coordinated-shutdown.exit-jvm = on
,可以強制一並關閉JVM。 -
在集群條件下,當節點正在從集群中離開或退出時,將會自動觸發協調關機。而且系統會自動添加Cluster Singleton和Cluster Sharding等正常退出群集的任務。
-
默認情況下,當通過殺死SIGTERM信號(Ctrl-C對SIGINT不起作用)終止JVM進程時,CoordinatedShutdown也將運行,該默認行為可以通過配置
akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off
禁用之。 -
可以使用
CoordinatedShutdown(system).addJvmShutdownHook { ... }
添加JVM Hook任務,以保證其在Akka關機前得以執行。 -
在測試時,如果不希望啟用協調關機,可以采用以下配置禁用之:
# Don't terminate ActorSystem via CoordinatedShutdown in tests akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.run-by-actor-system-terminate = off akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off akka.cluster.run-coordinated-shutdown-when-down = off
消息分發器 Dispatchers
MessageDispatcher是Akka的心臟,是它驅動着整個ActorSystem的正常運轉,並且為所有的Actor提供了執行上下文ExecutionContext,方便在其中執行代碼、進行Future回調等等。
-
默認Dispatcher
每個ActorSystem都有一個默認的Dispatcher,可以在
akka.actor.default-dispatcher
配置中細調,其默認的執行器Executor類型為 “fork-join-executor”,這在絕大多數情況下都能提供優越的性能,也可以在akka.actor.default-dispatcher.executor
一節中進行設置。 -
內部專用Dispatcher
為保護Akka各模塊內部維護的Actor,有一個獨立的內部專用Dispatcher。它可以在
akka.actor.internal-dispatcher
配置中細調,也可以設置akka.actor.internal-dispatcher為其他Dispatcher名字(別名)來替換之。 -
查找指定的Dispatcher
Dispatcher均實現了ExecutionContext接口,所以象這樣
val executionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher"))
就可加載不同的Dispatcher。 -
選擇指定的Dispatcher
// 為新的Actor使用默認Dispatcher context.spawn(yourBehavior, "DefaultDispatcher") context.spawn(yourBehavior, "ExplicitDefaultDispatcher", DispatcherSelector.default()) // 為不支持Future的阻塞調用(比如訪問一些老式的數據庫),使用blocking Dispatcher context.spawn(yourBehavior, "BlockingDispatcher", DispatcherSelector.blocking()) // 使用和父Actor一樣的Dispatcher context.spawn(yourBehavior, "ParentDispatcher", DispatcherSelector.sameAsParent()) // 從配置加載指定的Dispatcher context.spawn(yourBehavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"))
your-dispatcher { type = Dispatcher executor = "thread-pool-executor" thread-pool-executor { fixed-pool-size = 32 } throughput = 1 }
Dispatcher的兩種類型
對比 | Dispatcher | PinnedDispatcher |
---|---|---|
線程池 | 事件驅動,一組Actor共用一個線程池。 | 每個Actor都擁有專屬的一個線程池,池中只有一個線程。 |
可否被共享 | 沒有限制 | 不可共享 |
郵箱 | 每個Actor擁有一個 | 每個Actor擁有一個 |
適用場景 | 是Akka默認的Dispatcher, 支持隔板 | 支持隔板 |
驅動 | 由java.util.concurrent.ExecutorService 驅動。使用fork-join-executor、thread-pool-executor或基於akka.dispatcher.ExecutorServiceConfigurator實現的完全限定類名,可指定其使用的executor。 |
由任意的akka.dispatch.ThreadPoolExecutorConfigurator 驅動,默認執行器為thread-pool-executor 。 |
一個Fork-Join執行器示例:
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
自定義Dispatcher以盡可能避免阻塞
📎 講解阻塞危害的參考視頻:
Managing Blocking in Akka video,及其示例代碼:https://github.com/raboof/akka-blocking-dispatcher
在使用默認Dispatcher的情況下,多個Actor共用一個線程池,所以當其中一些Actor因被阻塞而占用線程后,有可能導致可用線程耗盡,而使其他同組的Actor陷入線程飢餓狀態。
監測工具推薦:YourKit,VisualVM,Java Mission Control,Lightbend出品的Thread Starvation Detector等等。
示例使用了兩個Actor作對比,在(1 to 100)的循環里,新建的一個Actor在消息處理函數中sleep 5秒,導致同時新建的另一個Actor無法獲得線程處理消息而卡住。
針對上述情況,首先可能想到的象下面這樣,用Future來封裝這樣的長時調用,但這樣的想法實際上過於簡單。因為仍舊使用了由全體Actor共用的ExecutionContext作為Future的執行上下文,所以隨着應用程序的負載不斷增加,內存和線程都會飛快地被耗光。
object BlockingFutureActor {
def apply(): Behavior[Int] =
Behaviors.setup { context =>
implicit val executionContext: ExecutionContext = context.executionContext
Behaviors.receiveMessage { i =>
triggerFutureBlockingOperation(i)
Behaviors.same
}
}
def triggerFutureBlockingOperation(i: Int)(implicit ec: ExecutionContext): Future[Unit] = {
println(s"Calling blocking Future: $i")
Future {
Thread.sleep(5000) //block for 5 seconds
println(s"Blocking future finished $i")
}
}
}
正確的解決方案,是為所有的阻塞調用提供一個獨立的Dispatcher,這種技巧被稱作“隔板 bulk-heading”或者“隔離阻塞 isolating blocking”。
在application.conf里對Dispatcher進行如下配置,其中thread-pool-executor.fixed-pool-size
的數值可根據實際負載情況進行微調:
my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
}
throughput = 1
}
隨后,使用該配置替換掉前述代碼第4行加載的默認Dispatcher
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-blocking-dispatcher"))
以上便是處理響應性應用程序中阻塞問題的推薦方法。對有關Akka HTTP中阻塞調用的類似討論,請參閱🔗 Handling blocking operations in Akka HTTP。
其他一些建議:
- 在Future中進行阻塞調用,但務必要確保任意時刻此類調用的數量上限,否則大量的此類任務將耗盡您的內存或線程。
- 在Future中進行阻塞調用,為線程池提供一個線程數上限,該上限要匹配運行應用程序的硬件平台條件。
- 專門使用一個線程來管理一組阻塞資源,例如用一個NIO選擇器來管理多個通道,並在阻塞資源觸發特定事件時作為Actor消息進行分發調度。
- 使用路由器來管理進行阻塞調用的Actor,並確保相應配置足夠大小的線程池。這種方案特別適用於訪問傳統數據庫這樣的單線程資源,使每個Actor對應一個數據庫連接,由一個路由器進行集中管理。至於Actor的數量,則由數據庫部署平台的硬件條件來決定。
- 使用Akka的任務Task在application.conf中配置線程池,它,再通過ActorSystem進行實例化。
其他一些常見的Dispatcher配置
-
固定的線程池大小
blocking-io-dispatcher { type = Dispatcher executor = "thread-pool-executor" thread-pool-executor { fixed-pool-size = 32 } throughput = 1 }
-
根據CPU核心數設置線程池大小
my-thread-pool-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use executor = "thread-pool-executor" # Configuration for the thread pool thread-pool-executor { # minimum number of threads to cap factor-based core number to core-pool-size-min = 2 # No of core threads ... ceil(available processors * factor) core-pool-size-factor = 2.0 # maximum number of threads to cap factor-based number to core-pool-size-max = 10 } # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 100 }
-
PinnedDispatcher
my-pinned-dispatcher { executor = "thread-pool-executor" type = PinnedDispatcher }
由於Actor每次獲得的不一定都是同一個線程,所以當確有必要時,可以設置
thread-pool-executor.allow-core-timeout=off
,以確保始終使用同一線程。 -
設置線程關閉超時
無論是fork-join-executor還是thread-pool-executor,線程都將在無人使用時被關閉。如果想設置一個稍長點的時間,可進行如下調整。特別是當該Executor只是作為執行上下文使用(比如只進行Future調用),而沒有關聯Actor時更應如此,否則默認的1秒將會導致整個線程池過度頻繁地被關閉。my-dispatcher-with-timeouts { type = Dispatcher executor = "thread-pool-executor" thread-pool-executor { fixed-pool-size = 16 # Keep alive time for threads keep-alive-time = 60s # Allow core threads to time out allow-core-timeout = off } # How long time the dispatcher will wait for new actors until it shuts down shutdown-timeout = 60s }
郵箱 Mailbox
郵箱是Actor接收待處理消息的隊列,默認是沒有容量上限的。但當Actor的處理消息的速度低於消息送達的速度時,就有必要設置郵箱的容量上限了,這樣當有更多消息到達時,將被轉投至系統的DeadLetter。
選擇特定的郵箱
如果沒有特別指定,將使用默認的郵箱SingleConsumerOnlyUnboundedMailbox
。否則在context.spawn時指定,且配置可從配置文件中動態加載。
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100))
val props = MailboxSelector.fromConfig("my-app.my-special-mailbox")
context.spawn(childBehavior, "from-config-mailbox-child", props)
my-app {
my-special-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
}
Akka提供的郵箱
-
非阻塞類型的郵箱
郵箱 內部實現 有否上限 配置名稱 SingleConsumerOnlyUnboundedMailbox(默認) 一個多生產者-單消費者隊列,不能與BalancingDispatcher搭配 否 akka.dispatch.SingleConsumerOnlyUnboundedMailbox UnboundedMailbox 一個java.util.concurrent.ConcurrentLinkedQueue 否 unbounded 或 akka.dispatch.UnboundedMailbox NonBlockingBoundedMailbox 一個高效的多生產者-單消費者隊列 是 akka.dispatch.NonBlockingBoundedMailbox UnboundedControlAwareMailbox
akka.dispatch.ControlMessage派生的控制消息將被優先投遞兩個java.util.concurrent.ConcurrentLinkedQueue 否 akka.dispatch.UnboundedControlAwareMailbox UnboundedPriorityMailbox
不保證同優先級消息的投遞順序一個java.util.concurrent.PriorityBlockingQueue 否 akka.dispatch.UnboundedPriorityMailbox UnboundedStablePriorityMailbox
嚴格按FIFO順序投遞同優先級消息一個使用akka.util.PriorityQueueStabilizer包裝的java.util.concurrent.PriorityBlockingQueue 否 akka.dispatch.UnboundedStablePriorityMailbox -
阻塞類型的郵箱:若mailbox-push-timeout-time設置為非零時將阻塞,否則不阻塞
郵箱 內部實現 有否上限 配置名稱 BoundedMailbox 一個java.util.concurrent.LinkedBlockingQueue 是 bounded 或 akka.dispatch.BoundedMailbox BoundedPriorityMailbox
不保證同優先級消息的投遞順序一個使用akka.util.BoundedBlockingQueue包裝的java.util.PriorityQueue 是 akka.dispatch.BoundedPriorityMailbox BoundedStablePriorityMailbox
嚴格按FIFO順序投遞同優先級消息一個使用akka.util.PriorityQueueStabilizer和akka.util.BoundedBlockingQueue包裝的java.util.PriorityQueue 是 akka.dispatch.BoundedStablePriorityMailbox BoundedControlAwareMailbox
akka.dispatch.ControlMessage派生的控制消息將被優先投遞兩個java.util.concurrent.ConcurrentLinkedQueue,且當塞滿時將阻塞 是 akka.dispatch.BoundedControlAwareMailbox
自定義郵箱
如果要自己實現郵箱,則需要從MailboxType派生。該類的構造函數有2個重要參數:一個是ActorSystem.Settings對象,一個是Config的節。后者需要在Dispatcher或者Mailbox的配置中,修改mailbox-type
為自定義MailboxType的完全限定名。
💀 標記用trait的需求映射指的是什么?是必須的嗎?
// Marker trait used for mailbox requirements mapping
trait MyUnboundedMessageQueueSemantics
object MyUnboundedMailbox {
// This is the MessageQueue implementation
class MyMessageQueue extends MessageQueue with MyUnboundedMessageQueueSemantics {
private final val queue = new ConcurrentLinkedQueue[Envelope]()
// these should be implemented; queue used as example
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
queue.offer(handle)
def dequeue(): Envelope = queue.poll()
def numberOfMessages: Int = queue.size
def hasMessages: Boolean = !queue.isEmpty
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
while (hasMessages) {
deadLetters.enqueue(owner, dequeue())
}
}
}
}
// This is the Mailbox implementation
class MyUnboundedMailbox extends MailboxType with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
import MyUnboundedMailbox._
// This constructor signature must exist, it will be called by Akka
def this(settings: ActorSystem.Settings, config: Config) = {
// put your initialization code here
this()
}
// The create method is called to create the MessageQueue
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new MyMessageQueue()
}
測試
🏭
com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5
org.scalatest:scalatest_2.13:3.1.1
測試可以是在真實的ActorSystem上進行的異步測試,也可以是在BehaviorTestKit工具提供的測試專用線程上進行的同步測試。
異步測試
-
ScalaTest提供了ActorTestKit作為真實ActorSystem的替代品,通過混入
BeforeAndAfterAll
,覆寫其afterAll() = testKit.shutdownTestKit()
,可實現測試后關閉ActorSystem。通過使用一個固定的testKit實例,可以直接spawn/stop某個Actor(可以是匿名的Actor),並以這種方式創建臨時的Mock Actor,用以測試某個Actor的行為是否符合預期。
-
同時,ScalaTest提供TestProbe用於接受Actor的回復,並附上一組probe.expectXXX對Actor的活動進行斷言。
-
當然,更簡便的方式便是繼承ScalaTestWithActorTestKit並混入AnyFeatureSpecLike之類的trait,從而將注意力完全集中在測試用例本身,而不用關心ActorSystem如何關閉之類的細枝末節。
-
ScalaTest的配置從application-test.conf中加載,否則將會自動加載Akka庫自帶的reference.conf配置,而不是應用程序自定義的application.conf。同時,ScalaTest支持用ConfigFactory.load()加載自定義配置文件,或用parseString()直接解決配置字符串,若再附以withFallback()將實現一次性完成配置及其后備的加載。
ConfigFactory.parseString(""" akka.loglevel = DEBUG akka.log-config-on-start = on """).withFallback(ConfigFactory.load())
-
為測試與時間線關系密切的Actor活動,ScalaTest提供了手動的定時器ManualTime,可以象下面這樣測試指定時間點的活動:
class ManualTimerExampleSpec extends ScalaTestWithActorTestKit(ManualTime.config) with AnyWordSpecLike with LogCapturing { val manualTime: ManualTime = ManualTime() "A timer" must { "schedule non-repeated ticks" in { case object Tick case object Tock val probe = TestProbe[Tock.type]() val behavior = Behaviors.withTimers[Tick.type] { timer => // 10ms后才會調度消息 timer.startSingleTimer(Tick, 10.millis) Behaviors.receiveMessage { _ => probe.ref ! Tock Behaviors.same } } spawn(behavior) // 在9ms時還沒有任何消息 manualTime.expectNoMessageFor(9.millis, probe) // 再經過2ms后,收到Tock消息 manualTime.timePasses(2.millis) probe.expectMessage(Tock) // 在10ms之后再沒有消息傳來 manualTime.expectNoMessageFor(10.seconds, probe) } } }
-
為了驗證Actor是否發出了某些日志事件,ScalaTest提供了LoggingTestKit。
LoggingTestKit .error[IllegalArgumentException] .withMessageRegex(".*was rejected.*expecting ascii input.*") .withCustom { event => event.marker match { case Some(m) => m.getName == "validation" case None => false } } .withOccurrences(2) .expect { ref ! Message("hellö") ref ! Message("hejdå") }
-
為了集中有序輸出日志信息,ScalaTest提供了LogCapturing,把日志和控制台輸出信息整理在一起,在測試失敗的時候才一次性輸出,方便分析錯誤原因。具體示例參見交互模式一章。
同步測試
-
ScalaTest提供BehaviorTestKit用於Actor的同步測試。
val testKit = BehaviorTestKit(Hello()) // 創建子Actor testKit.run(Hello.CreateChild("child")) testKit.expectEffect(Spawned(childActor, "child")) // 創建匿名的子Actor testKit.run(Hello.CreateAnonymousChild) testKit.expectEffect(SpawnedAnonymous(childActor)) // 用一個InBox模擬Mailbox,方便測試收到的消息 val inbox = TestInbox[String]() testKit.run(Hello.SayHello(inbox.ref)) inbox.expectMessage("hello") // 測試子Actor的InBox testKit.run(Hello.SayHelloToChild("child")) val childInbox = testKit.childInbox[String]("child") childInbox.expectMessage("hello") // 測試匿名子Actor的InBox testKit.run(Hello.SayHelloToAnonymousChild) val child = testKit.expectEffectType[SpawnedAnonymous[String]] val childInbox = testKit.childInbox(child.ref) childInbox.expectMessage("hello stranger")
-
在以下一些情況下,不推薦使用BehaviorTestKit(未來可能會逐步改善):
- 涉及Future及類似的帶異步回調的場景
- 涉及定時器或消息定時調度的場景
- 涉及EventSourcedBehavior的場景
- 涉及必須實測的Stubbed Actor的場景
- 黑盒測試
-
除了Spawned和SpawnedAnonymous,BehaviorTestKit還支持以下一些Effect:
- SpawnedAdapter
- Stopped
- Watched
- WatchedWith
- Unwatched
- Scheduled
-
BehaviorTestKit也支持日志驗證
val testKit = BehaviorTestKit(Hello()) val inbox = TestInbox[String]("Inboxer") testKit.run(Hello.LogAndSayHello(inbox.ref)) testKit.logEntries() shouldBe Seq(CapturedLogEvent(Level.INFO, "Saying hello to Inboxer"))
Akka之Classic與Typed共存
現階段的Akka Typed的內部,實質還是由傳統Akka實現的,但未來將會有所改變。目前兩類Akka有以下一些共存的方式:
- Classic ActorSystem可以創建Typed Actor
- Typed Actor與Classic Actor可以互發消息
- Typed Actor與Classic Actor可以相互建立監管或觀察關系
- Classic Actor可以轉換為Typed Actor
在導入命名空間時使用別名,以示區別:
import akka.{ actor => classic }
⚠️ 在監管策略方面,由於Classic默認為重啟,而Typed為停止,所以Akka根據Child來決定實際策略。即如果被創建的Child是Classic,則默認采取重啟策略,否則采取停止策略。
⭕ 從Classic到Typed
// 導入Typed的Adapter幾乎必不可少
import akka.actor.typed.scaladsl.adapter._
val system = akka.actor.ActorSystem("ClassicToTypedSystem")
val typedSystem: ActorSystem[Nothing] = system.toTyped
val classicActor = system.actorOf(Classic.props())
class Classic extends classic.Actor with ActorLogging {
// context.spawn is an implicit extension method
val second: ActorRef[Typed.Command] = context.spawn(Typed(), "second")
// context.watch is an implicit extension method
context.watch(second)
// self can be used as the `replyTo` parameter here because
// there is an implicit conversion from akka.actor.ActorRef to
// akka.actor.typed.ActorRef
// An equal alternative would be `self.toTyped`
second ! Typed.Ping(self)
override def receive = {
case Typed.Pong =>
log.info(s"$self got Pong from ${sender()}")
// context.stop is an implicit extension method
context.stop(second)
case classic.Terminated(ref) =>
log.info(s"$self observed termination of $ref")
context.stop(self)
}
}
⭕ 從Typed到Classic
val system = classic.ActorSystem("TypedWatchingClassic")
val typed = system.spawn(Typed.behavior, "Typed")
object Typed {
final case class Ping(replyTo: akka.actor.typed.ActorRef[Pong.type])
sealed trait Command
case object Pong extends Command
val behavior: Behavior[Command] =
Behaviors.setup { context =>
// context.actorOf is an implicit extension method
val classic = context.actorOf(Classic.props(), "second")
// context.watch is an implicit extension method
context.watch(classic)
// illustrating how to pass sender, toClassic is an implicit extension method
classic.tell(Typed.Ping(context.self), context.self.toClassic)
Behaviors
.receivePartial[Command] {
case (context, Pong) =>
// it's not possible to get the sender, that must be sent in message
// context.stop is an implicit extension method
context.stop(classic)
Behaviors.same
}
.receiveSignal {
case (_, akka.actor.typed.Terminated(_)) =>
Behaviors.stopped
}
}
}
函數式與面向對象風格指南
區別 | 函數式編程風格 | 面向對象風格 |
---|---|---|
組成結構 | Singleton Object | Companion Object + AbstractBehavior[Message]派生類 |
工廠apply() | 在工廠方法里完成Behavior定義及其他所有工作 | 在Companion Object工廠方法里采取Behaviors.setup {context => new MyActor(context)} 這樣的方式構造初始化的Behavior,然后把context和其他必要參數注入給類的構造函數,完成Behavior的鏈接 |
Actor擴展類 | 沒有派生,所以只能用Behaviors.same | 從AbstractBehavior[Message]派生實例,所以可以使用this等同於Behaviors.same |
Behavior | 在Singleton Object里給Behaviors.receive這樣的工廠方法傳入一個函數(閉包)進行定義 | 覆寫派生類的onMessage函數 |
Context | Context與Message一起傳入給receive | 依賴Behaviors.setup等工廠方法傳遞給派生類,因此每實例對應一個context |
狀態 | 給工廠方法傳入參數(通常會把包括context在內的所有參數封裝成一個類似DTO的Class以適當解耦),返回帶新狀態的Behavior | 在AbstractBehavior實例對象的內部維護所有的可變狀態 |
推薦理由 |
|
|
推薦做法:
- 不要把消息定義為頂層Class,而應與Behavior一起定義在Companion Object里,這樣在使用時帶着對象名作為前綴才不會引起歧義。
- 如果某Protocol由幾個Actor共享,那么建議是在一個單獨的Object里定義完整的Protocol。
- 諸如定時器調度消息或者再包裝后的消息,通常作為Actor的私有消息用private修飾,但它們同樣要從trait Command派生。
- 另一種定義私有消息的方法,是所有消息均派生自trait Message,然后插入一個派生自Message的中間trait PrivateMessage,之后再從PrivateMessage派生所有的私有消息,使用這樣的層次結構區分公有和私有消息。
- 通常頂層的Message要定義為sealed,以避免case匹配時編譯器提示匹配項不完整的錯誤。
- 使用AskPattern從ActorSystem外部與Actor直接進行Request-Response方式的交互時,建議使用AskPattern.ask()而不是?的中綴語法,這樣可以最大程度保證類型安全(在Actor之間的屬於ActorContext.ask())。
- Behaviors.setup可以嵌套,用以加載不同類型的資源。習慣上也把setup放在最外層,不過要注意supervise對setup的影響。
從傳統Akka過渡
項目依賴的變化
Classic | Typed |
---|---|
akka-actor | akka-actor-typed |
akka-cluster | akka-cluster-typed |
akka-cluster-sharding | akka-cluster-sharding-typed |
akka-cluster-tools | akka-cluster-typed |
akka-distributed-data | akka-cluster-typed |
akka-persistence | akka-persistence-typed |
akka-stream | akka-stream-typed |
akka-testkit | akka-actor-testkit-typed |
import package的變化
Classic | Typed for Scala |
---|---|
akka.actor | akka.actor.typed.scaladsl |
akka.cluster | akka.cluster.typed |
akka.cluster.sharding | akka.cluster.sharding.typed.scaladsl |
akka.persistence | akka.persistence.typed.scaladsl |
➡️ Cluster
🏭 com.typesafe.akka:akka-cluster-typed_2.13:2.6.5
Member狀態圖
消息妥投 Reliable Delivery
import akka.actor.typed.delivery._
⚠️ 此模塊目前仍不成熟,不建議在生產環境使用。
確保消息至少投遞一次或恰好投遞一次,是此模塊的核心任務,但Akka框架沒法自主實現,因為確認收到消息並且處理之,是屬於業務邏輯的職責,所以必須在應用程序的配合下才能完全實現。而且,將消息妥投到目標郵箱還只是其中一個步驟(不丟失消息),確保目標Actor在消息到達前尚未崩潰(消息能被處理)也是其中重要的一環。
一個完整的消息妥投方案,包括發送消息、檢測丟包、重發消息、防止過載、冪等處理等細節,這些工作絕大部分要由消費消息的一方來承擔。比如消息重發,就要由消費者發現有丟包,然后向生產者提出,限流等其他一些工作亦是如此。Akka提供了以下三種模式(留意關於消息重發的細節):
⭕ 點對點模式 Point to Point
點對點模式適用於2個單一Actor之間的消息妥投。
- P:我准備舀了。
- C:我坐好了。
- P:舀好了,張嘴!
- C:我吃完了,再來一口!
- 運行時將檢查並確保Producer與ProducerController都必須是本地Actor,以保證高效率,Consumer一側亦如此。
- 由應用程序負責使用ProducerController.RegisterConsumer或ConsumerController.RegisterToProducerController消息,建立並維護兩個Controller之間的連接暢通。
- 在前一條消息被處理完並Confirmed之前,ConsumerController不會把下一條消息Delivery發給Consumer。
- 在兩個Controller之間的消息數量將由一個ConsumerController負責的流控制窗口(flow control window)進行管理。
- 無論是ProducerController亦或ConsumerController崩潰,所有未被Confirmed的消息都會被重新投遞(即使事實上Consumer已經處理過的消息),以確保至少投遞一次,否則消息將嚴格按Producer發出的順序投遞給Consumer。
⭕ 拉取模式 Worker Pulling
Worker Pulling,是若干個Worker根據自己的消費進度,主動從一個WorkManager處拉取任務的模式。
- P:我這有一堆活需要找人干。
- M:沒問題,我找人來做。
- W(C):我來應聘。
- M:你被錄用了!
- W1:給我點活干。
- W2:也給我點活干。
- M:這是今天的活,你們自己分吧!
- W1:我搶到了3份!
- W2:我搶到了4份!”
有新Worker加入時
- 由Receptionist負責登記所有的Worker,由WorkPullingProducerController負責從Receptionist的Listing里指定執行任務的Worker。
- 在WorkPullingProducerController與Worker之間建立聯系后,仍由ProducerController與ConsumerController負責具體的一對一投遞。
⭕ 分片模式 Sharding
🏭 com.typesafe.akka:akka-cluster-sharding-typed_2.13:2.6.5
Sharding,是在集群進行了分片后的消息妥投模式,將由Producer與Consumer兩端的ShardingController負責總協調,由ShardingController各自的小弟Controller負責點個端點的通信。
- P:喂喂,SPC,我有一批特定款式的鞋需要找工廠代工。
- SPC:好的,我在全世界找代工廠。
- SCC1:作為一家中國的鞋類加工連鎖企業,我OK。
- SCC2:我是加工襯衣的,Sorry。
- SPC:SCC1就你了,我的小弟PC稍后會直接和你聯系。
- PC:SCC1,訂單發給你了。
- SCC1:PC,我的小弟CC負責這批訂單,你們2個實際干活的直接聯系吧。
- CC:OK,我交給流水線C專門生產這款鞋。
- C:我這條線生產完了,貨交給你了CC。
- CC:PC,我按訂單交付地址把貨直接發給你了。
- PC:SPC,貨備妥了。
- SPC:P老板,貨備妥了你在哪?
- P:送過來吧。
發送消息到另一個Entity
從另一個節點上的Producer發送消息(圖中WorkPullingProducerController有誤,應為ShardingProducerController)
- 發送與接收方的任一端,均由本體(Producer或Consumer),Controller和ShardingController三個部件構成。其中,ShardingProducerController與ShardingConsumerController搭配,負責為ProducerController與ConsumerController牽線搭橋,但2個ShardingController之間不需要相互注冊,而是通過EntityId找到對方。
- 建立聯系通道后,消息從ShardingProducerController發出,經ProducerController發往ShardingConsumerController,由ShardingConsumerController找到相應的ConsumerController,將消發給最終的Consumer。Consumer在處理完消息后,直接回復給ConsumerController,再經其發還給ProducerController,最終由ShardingProducerController回復Producer。
- 消息RequestNext.entitiesWithDemand屬性將指向Consumer端若干同EntityId的Actor,所以這可以是一對多的關系。
⭕ 耐久的Producer
🏭 com.typesafe.akka:akka-persistence-typed_2.13:2.6.5
需要Producer支持消息重發,就意味着Producer得把發出去的消息保存一段時間,直到確信該消息已被處理后才刪除之,所以能暫存消息的即為耐用的Producer。Akka為此提供了一個DurableProducerQueue的具體實現EventSourcedProducerQueue。其中,每個Producer必須對應一個唯一的PersistenceId。
import akka.persistence.typed.delivery.EventSourcedProducerQueue
import akka.persistence.typed.PersistenceId
val durableQueue =
EventSourcedProducerQueue[ImageConverter.ConversionJob](PersistenceId.ofUniqueId("ImageWorkManager"))
val durableProducerController = context.spawn(
WorkPullingProducerController(
producerId = "workManager",
workerServiceKey = ImageConverter.serviceKey,
durableQueueBehavior = Some(durableQueue)),
"producerController")
⭕ 改用Ask模式
除了tell模式,Producer還可以改用ask模式發出消息,此時用askNext代替requestNext,回復將被包裝在MessageWithConfirmation里。
context.ask[MessageWithConfirmation[ImageConverter.ConversionJob], Done](
next.askNextTo,
askReplyTo => MessageWithConfirmation(ImageConverter.ConversionJob(resultId, from, to, image), askReplyTo)) {
case Success(done) => AskReply(resultId, originalReplyTo, timeout = false)
case Failure(_) => AskReply(resultId, originalReplyTo, timeout = true)
}
序列化 Serialization
對同處一個JVM上的不同Actor,消息將直接發送給對方,而對於跨JVM的消息,則需要序列化成一串二進制字節后傳出,再反序列化恢復成消息對象后接收。Akka推薦使用Jackson和Google Protocol Buffers,且使用后者用於其內部消息的序列化,但也允許使用自定義的序列化器。
使用序列化器
以配置方式使用序列化器
序列化的相關配置都保存在akka.actor.serializers
一節,其中指向各種akka.serialization.Serializer
的實現,並使用serialization-bindings
為特定對象實例時綁定序列化器。由於對象可能同時繼承了某個trait或者class,所以在判斷應使用哪一個序列化器時,通常是找其最特化的那一個。若二者之間沒有繼承關系,則會觸發警告。
akka {
actor {
serializers {
jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
myown = "docs.serialization.MyOwnSerializer"
}
serialization-bindings {
"docs.serialization.JsonSerializable" = jackson-json
"docs.serialization.CborSerializable" = jackson-cbor
"com.google.protobuf.Message" = proto
"docs.serialization.MyOwnSerializable" = myown
}
}
}
⚠️ 如果待序列化的消息包含在Scala對象中,則為了引用這些消息,需要使用標准Java類名稱。對於包含在名為Wrapper對象中名為Message的消息,正確的引用是Wrapper $ Message
,而不是Wrapper.Message
。
以編程方式使用序列化器
完整的序列化信息包括三個部分:二進制字節串形式的有效載荷payload,序列化器的SerializerId及其適用類的清單manifest,所以它是自描述的,得以跨JVM使用。
而在啟動ActorSystem時,序列化器由SerializationExtension負責初始化,因此序列化器本身不能從其構造函數訪問SerializationExtension,而只能在完成初始化之后遲一點才能訪問它。
import akka.actor._
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.Cluster
import akka.serialization._
val system = ActorSystem("example")
// Get the Serialization Extension
val serialization = SerializationExtension(system)
// Have something to serialize
val original = "woohoo"
// Turn it into bytes, and retrieve the serializerId and manifest, which are needed for deserialization
val bytes = serialization.serialize(original).get
val serializerId = serialization.findSerializerFor(original).identifier
val manifest = Serializers.manifestFor(serialization.findSerializerFor(original), original)
// Turn it back into an object
val back = serialization.deserialize(bytes, serializerId, manifest).get
自定義序列化器
創建序列化器
所有的序列化器均派生自akka.serialization.Serializer。
class MyOwnSerializer extends Serializer {
// If you need logging here, introduce a constructor that takes an ExtendedActorSystem.
// class MyOwnSerializer(actorSystem: ExtendedActorSystem) extends Serializer
// Get a logger using:
// private val logger = Logging(actorSystem, this)
// This is whether "fromBinary" requires a "clazz" or not
def includeManifest: Boolean = true
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
// 0 - 40 is reserved by Akka itself
def identifier = 1234567
// "toBinary" serializes the given object to an Array of Bytes
def toBinary(obj: AnyRef): Array[Byte] = {
// Put the code that serializes the object here
//#...
Array[Byte]()
//#...
}
// "fromBinary" deserializes the given array,
// using the type hint (if any, see "includeManifest" above)
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
// Put your code that deserializes here
//#...
null
//#...
}
}
SerializerId必須是全局唯一的,該Id可以編碼指定,也可以在配置中指定:
akka {
actor {
serialization-identifiers {
"docs.serialization.MyOwnSerializer" = 1234567
}
}
}
使用StringManifest指定適用類
默認情況下,序列化器使用Class指定其適用目標,但也可以使用字符串名稱指定,具體參見fromBinary的第2個參數:
class MyOwnSerializer2 extends SerializerWithStringManifest {
val CustomerManifest = "customer"
val UserManifest = "user"
val UTF_8 = StandardCharsets.UTF_8.name()
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
// 0 - 40 is reserved by Akka itself
def identifier = 1234567
// The manifest (type hint) that will be provided in the fromBinary method
// Use `""` if manifest is not needed.
def manifest(obj: AnyRef): String =
obj match {
case _: Customer => CustomerManifest
case _: User => UserManifest
}
// "toBinary" serializes the given object to an Array of Bytes
def toBinary(obj: AnyRef): Array[Byte] = {
// Put the real code that serializes the object here
obj match {
case Customer(name) => name.getBytes(UTF_8)
case User(name) => name.getBytes(UTF_8)
}
}
// "fromBinary" deserializes the given array,
// using the type hint
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
// Put the real code that deserializes here
manifest match {
case CustomerManifest =>
Customer(new String(bytes, UTF_8))
case UserManifest =>
User(new String(bytes, UTF_8))
}
}
}
序列化ActorRef
ActorRef均可以使用Jackson進行序列化,但也可以自定義實現。
其中,要以字符串形式表示ActorRef,應借助ActorRefResolver實現。它主要有2個方法,分別對應序列化和反序列化:
- def toSerializationFormat[T](ref: ActorRef[T]): String
- def resolveActorRef[T](serializedActorRef: String): ActorRef[T]
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
private val actorRefResolver = ActorRefResolver(system.toTyped)
private val PingManifest = "a"
private val PongManifest = "b"
override def identifier = 41
override def manifest(msg: AnyRef) = msg match {
case _: PingService.Ping => PingManifest
case PingService.Pong => PongManifest
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
}
override def toBinary(msg: AnyRef) = msg match {
case PingService.Ping(who) =>
actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
case PingService.Pong =>
Array.emptyByteArray
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String) = {
manifest match {
case PingManifest =>
val str = new String(bytes, StandardCharsets.UTF_8)
val ref = actorRefResolver.resolveActorRef[PingService.Pong.type](str)
PingService.Ping(ref)
case PongManifest =>
PingService.Pong
case _ =>
throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
}
}
}
滾動升級 Rolling Updates
一個消息被反序列為消息對象,其決定因素只有3個:payload、serializerId和manifest。Akka根據Id選擇Serializer,然后Serializer根據manifest匹配fromBinary,最后fromBinary使用payload解析出消息對象。在這個過程中,起關鍵作用的manifest並不等價於Serializer綁定的消息類型,所以一個Serializer可以應用於多個消息類型,這就給換用新的序列化器提供了機會。主要步驟包括兩步:
- 第一步:暫時只向akka.actor.serializers配置節中添加Serializer的定義,而不添加到akka.actor.serialization-bindings配置節中,然后執行一次滾動升級。這相當於注冊Serializer,為切換到新的Serializer作准備。
- 第二步:向akka.actor.serialization-bindings配置節中添加新的Serializer,然后再執行一次滾動升級。此時,舊的節點將繼續使用舊的Serializer序列化消息,而新節點將切換使用新的Serializer進行序列化,並且它也可以反序列化舊的序列化格式。
- 第三步(可選):完全刪除舊的Serializer,因為新的Serializer已經能同時承擔新舊兩種版本的序列化格式。
校驗
為了在本地測試時確認消息被正常地序列化與反序列化,可以采取如下配置啟用本地消息的序列化。如果要將某個消息排除出此列,則需要繼承trait akka.actor.NoSerializationVerificationNeeded
,或者在配置akka.actor.no-serialization-verification-needed-class-prefix
指定類名的前綴。
akka {
actor {
# 啟用本地消息序列化
serialize-messages = on
# 啟用Prop序列化
serialize-creators = on
}
}
使用Jackson進行序列化
🏭 com.typesafe.akka:akka-serialization-jackson_2.12:2.6.6
Jackson支持文本形式的JSON(jackson-json)和二進制形式的CBOR字節串(jackson-cbor)。
使用前准備
在使用Jackson進行序列化前,需要在Akka配置里加入序列化器聲明和綁定聲明,此處用的JSON格式。
akka.actor {
serialization-bindings {
"com.myservice.MySerializable" = jackson-json
}
}
而所有要用Jackson序列化的消息也得擴展其trait以作標識。
// 約定的名稱是CborSerializable或者JsonSerializable,此處用MySerializable是為了演示
trait MySerializable
final case class Message(name: String, nr: Int) extends MySerializable
安全要求
出於安全考慮,不能將Jackson序列化器應用到諸如java.lang.Object、java.io.Serializable、java.util.Comparable等開放類型。
注解 Annotations
適用於普通的多態類型 Polymorphic types
多態類型是指可能有多種不同實現的類型,這就導致在反序列化時將面對多種可能的子類型。所以在使用Jackson序列化前,需要用JsonTypeInfo和JsonSubTypes進行注解說明。
- @JsonTypeInfo用來開啟多態類型處理,它有以下幾個屬性:
- use:定義使用哪一種類型識別碼,其可選值包括:
- JsonTypeInfo.Id.CLASS:使用完全限定類名做識別
- JsonTypeInfo.Id.MINIMAL_CLASS:若基類和子類在同一包類,使用類名(忽略包名)作為識別碼
- JsonTypeInfo.Id.NAME:一個合乎邏輯的指定名稱
- JsonTypeInfo.Id.CUSTOM:自定義識別碼,與@JsonTypeIdResolver相對應
- JsonTypeInfo.Id.NONE:不使用識別碼
- include(可選):指定識別碼是如何被包含進去的,其可選值包括:
- JsonTypeInfo.As.PROPERTY:作為數據的兄弟屬性
- JsonTypeInfo.As.EXISTING_PROPERTY:作為POJO中已經存在的屬性
- JsonTypeInfo.As.EXTERNAL_PROPERTY:作為擴展屬性
- JsonTypeInfo.As.WRAPPER_OBJECT:作為一個包裝的對象
- JsonTypeInfo.As.WRAPPER_ARRAY:作為一個包裝的數組
- property(可選):制定識別碼的屬性名稱。此屬性只有當use為JsonTypeInfo.Id.CLASS(若不指定property則默認為@class)、JsonTypeInfo.Id.MINIMAL_CLASS(若不指定property則默認為@c)、JsonTypeInfo.Id.NAME(若不指定property默認為@type),include為JsonTypeInfo.As.PROPERTY、JsonTypeInfo.As.EXISTING_PROPERTY、JsonTypeInfo.As.EXTERNAL_PROPERTY時才有效。
- defaultImpl(可選):如果類型識別碼不存在或者無效,可以使用該屬性來制定反序列化時使用的默認類型。
- visible(可選):是否可見。該屬性定義了類型標識符的值是否會通過JSON流成為反序列化器的一部分,默認為false,即jackson會從JSON內容中處理和刪除類型標識符,再傳遞給JsonDeserializer。
- use:定義使用哪一種類型識別碼,其可選值包括:
- @JsonSubTypes用來列出給定類的子類,只有當子類類型無法被檢測到時才會使用它,一般是配合@JsonTypeInfo在基類上使用。它的的值是一個@JsonSubTypes.Type[]數組,里面枚舉了多態類型(value對應子類)和類型的標識符值(name對應@JsonTypeInfo中的property標識名稱的值。此為可選值,若未指定則需由@JsonTypeName在子類上指定)。
- @JsonTypeName作用於子類,用來為多態子類指定類型標識符的值。
⚠️ 切記不能使用@JsonTypeInfo(use = Id.CLASS)
或ObjectMapper.enableDefaultTyping
,這會給多態類型帶來安全隱患。
final case class Zoo(primaryAttraction: Animal) extends MySerializable
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
Array(
new JsonSubTypes.Type(value = classOf[Lion], name = "lion"),
new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant")))
sealed trait Animal
final case class Lion(name: String) extends Animal
final case class Elephant(name: String, age: Int) extends Animal
適用於trait和case object創建的ADT
由於上述注解只能用於class,所以case class可以直接使用,但case object就需要采取變通的方法,通過在case object繼承的trait上使用注解@JsonSerialize和@JsonDeserialize,再使用StdSerializer和StdDeserializer實現序列化操作即可。
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
import com.fasterxml.jackson.databind.ser.std.StdSerializer
@JsonSerialize(using = classOf[DirectionJsonSerializer])
@JsonDeserialize(using = classOf[DirectionJsonDeserializer])
sealed trait Direction
object Direction {
case object North extends Direction
case object East extends Direction
case object South extends Direction
case object West extends Direction
}
class DirectionJsonSerializer extends StdSerializer[Direction](classOf[Direction]) {
import Direction._
override def serialize(value: Direction, gen: JsonGenerator, provider: SerializerProvider): Unit = {
val strValue = value match {
case North => "N"
case East => "E"
case South => "S"
case West => "W"
}
gen.writeString(strValue)
}
}
class DirectionJsonDeserializer extends StdDeserializer[Direction](classOf[Direction]) {
import Direction._
override def deserialize(p: JsonParser, ctxt: DeserializationContext): Direction = {
p.getText match {
case "N" => North
case "E" => East
case "S" => South
case "W" => West
}
}
}
final case class Compass(currentDirection: Direction) extends MySerializable
適用於枚舉 Enumerations
Jackson默認會將Scala的枚舉類型中的Value序列化為一個JsonObject,該JsonObject包含一個“value”字段和一個“type”字段(其值是枚舉的完全限定類名FQCN)。為此,Jackson為每個字段提供了一個注解JsonScalaEnumeration,用於設定字段的類型,它將會把枚舉值序列化為JsonString。
trait TestMessage
object Planet extends Enumeration {
type Planet = Value
val Mercury, Venus, Earth, Mars, Krypton = Value
}
// Uses default Jackson serialization format for Scala Enumerations
final case class Alien(name: String, planet: Planet.Planet) extends TestMessage
// Serializes planet values as a JsonString
class PlanetType extends TypeReference[Planet.type] {}
// Specifies the type of planet with @JsonScalaEnumeration
final case class Superhero(name: String, @JsonScalaEnumeration(classOf[PlanetType]) planet: Planet.Planet) extends TestMessage
綱要演進 Schema Evolution
參見Event Sourced一節中的Schema Evolution。
刪除字段
Jackson會自動忽略class中不存在的屬性,所以不需要做額外工作。
添加字段
如果新增的字段是可選字段,那么該字段默認值是Option.None,不需要做額外工作。如果是必備字段,那么需要繼承JacksonMigration並設定其默認值。示例如下:
// Old Event
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int) extends MySerializable
// New Event: optional property discount and field note added.
// 為什么要區分property與field?
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: String)
extends MySerializable {
// alternative constructor because `note` should have default value "" when not defined in json
@JsonCreator
def this(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: Option[String]) =
this(shoppingCartId, productId, quantity, discount, note.getOrElse(""))
}
// New Event: mandatory field discount added.
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Double) extends MySerializable
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.DoubleNode
import com.fasterxml.jackson.databind.node.ObjectNode
import akka.serialization.jackson.JacksonMigration
class ItemAddedMigration extends JacksonMigration {
// 注明這是第幾個版本,之后還可以有更新的版本
override def currentVersion: Int = 2
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
val root = json.asInstanceOf[ObjectNode]
if (fromVersion <= 1) {
root.set("discount", DoubleNode.valueOf(0.0))
}
root
}
}
ItemAddedMigration與ItemAdded的聯系,需要在配置里設定,下同:
akka.serialization.jackson.migrations {
"com.myservice.event.ItemAdded" = "com.myservice.event.ItemAddedMigration"
}
重命名字段
// 將productId重命名為itemId
case class ItemAdded(shoppingCartId: String, itemId: String, quantity: Int) extends MySerializable
import akka.serialization.jackson.JacksonMigration
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
class ItemAddedMigration extends JacksonMigration {
override def currentVersion: Int = 2
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
val root = json.asInstanceOf[ObjectNode]
if (fromVersion <= 1) {
root.set("itemId", root.get("productId"))
root.remove("productId")
}
root
}
}
重定義類結構
// Old class
case class Customer(name: String, street: String, city: String, zipCode: String, country: String) extends MySerializable
// New class
case class Customer(name: String, shippingAddress: Address, billingAddress: Option[Address]) extends MySerializable
//Address class
case class Address(street: String, city: String, zipCode: String, country: String) extends MySerializable
import akka.serialization.jackson.JacksonMigration
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
class CustomerMigration extends JacksonMigration {
override def currentVersion: Int = 2
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
val root = json.asInstanceOf[ObjectNode]
if (fromVersion <= 1) {
val shippingAddress = root.`with`("shippingAddress")
shippingAddress.set("street", root.get("street"))
shippingAddress.set("city", root.get("city"))
shippingAddress.set("zipCode", root.get("zipCode"))
shippingAddress.set("country", root.get("country"))
root.remove("street")
root.remove("city")
root.remove("zipCode")
root.remove("country")
}
root
}
}
重命名類
// Old class
case class OrderAdded(shoppingCartId: String) extends MySerializable
// New class
case class OrderPlaced(shoppingCartId: String) extends MySerializable
class OrderPlacedMigration extends JacksonMigration {
override def currentVersion: Int = 2
override def transformClassName(fromVersion: Int, className: String): String = classOf[OrderPlaced].getName
override def transform(fromVersion: Int, json: JsonNode): JsonNode = json
}
刪除特定的序列化綁定
當某個類不再需要序列化,而只需要反序列化時,應將其加入序列化的白名單,名單是一組類名或其前綴:
akka.serialization.jackson.whitelist-class-prefix =
["com.myservice.event.OrderAdded", "com.myservice.command"]
Jackson模塊
Akka默認啟用了以下Jackson模塊:
akka.serialization.jackson {
# The Jackson JSON serializer will register these modules.
jackson-modules += "akka.serialization.jackson.AkkaJacksonModule"
# AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath
jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule"
// FIXME how does that optional loading work??
# AkkaStreamsModule optionally included if akka-streams is in classpath
jackson-modules += "akka.serialization.jackson.AkkaStreamJacksonModule"
jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"
jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module"
jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule"
jackson-modules += "com.fasterxml.jackson.module.scala.DefaultScalaModule"
}
JSON壓縮
默認的JSON壓縮策略如下:
# Compression settings for the jackson-json binding
akka.serialization.jackson.jackson-json.compression {
# Compression algorithm.
# - off : no compression (it will decompress payloads even it's off)
# - gzip : using common java gzip (it's slower than lz4 generally)
# - lz4 : using lz4-java
algorithm = gzip
# If compression is enabled with the `algorithm` setting the payload is compressed
# when it's larger than this value.
compress-larger-than = 32 KiB
}
每個綁定關系單獨配置
# 共有配置
akka.serialization.jackson.jackson-json {
serialization-features {
WRITE_DATES_AS_TIMESTAMPS = off
}
}
akka.serialization.jackson.jackson-cbor {
serialization-features {
WRITE_DATES_AS_TIMESTAMPS = on
}
}
akka.actor {
serializers {
jackson-json-message = "akka.serialization.jackson.JacksonJsonSerializer"
jackson-json-event = "akka.serialization.jackson.JacksonJsonSerializer"
}
serialization-identifiers {
jackson-json-message = 9001
jackson-json-event = 9002
}
serialization-bindings {
"com.myservice.MyMessage" = jackson-json-message
"com.myservice.MyEvent" = jackson-json-event
}
}
# 為每個綁定關系單獨配置
akka.serialization.jackson {
jackson-json-message {
serialization-features {
WRITE_DATES_AS_TIMESTAMPS = on
}
}
jackson-json-event {
serialization-features {
WRITE_DATES_AS_TIMESTAMPS = off
}
}
}
使用Manifest無關的序列化
默認情況下,Jackson使用manifest里的完全限定類名進行序列化,但這比較耗費磁盤空間和IO資源,為此可以用type-in-manifest關閉之,使類名不再出現在manifest里,然后再使用deserialization-type指定即可,否則Jackson會在綁定關系里去查找匹配的類型。
Akka Remoting已經實現了manifest的壓縮,所以這部分內容對它沒有什么實際效果。
akka.actor {
serializers {
jackson-json-event = "akka.serialization.jackson.JacksonJsonSerializer"
}
serialization-identifiers {
jackson-json-event = 9001
}
serialization-bindings {
"com.myservice.MyEvent" = jackson-json-event
}
}
# 由於manifest無關的序列化通常只適用於一個類型,所以通常采取每綁定關系單獨配置的方式
akka.serialization.jackson {
jackson-json-event {
type-in-manifest = off
# Since there is exactly one serialization binding declared for this
# serializer above, this is optional, but if there were none or many,
# this would be mandatory.
deserialization-type = "com.myservice.MyEvent"
}
}
日期與時間格式
WRITE_DATES_AS_TIMESTAMPS
和WRITE_DURATIONS_AS_TIMESTAMPS
默認情況下是被禁用的,這意味着日期與時間字段將按ISO-8601(rfc3339)標准的yyyy-MM-dd'T'HH:mm:ss.SSSZZ
格式,而不是數字數組進行序列化。雖然這樣的互操作性更好,但速度較慢。所以如果不需要ISO格式即可與外部系統進行互操作,那么可以作如下配置,以擁有更佳的性能(反序列化不受此設置影響)。
akka.serialization.jackson.serialization-features {
WRITE_DATES_AS_TIMESTAMPS = on
WRITE_DURATIONS_AS_TIMESTAMPS = on
}
其他可用配置
akka.serialization.jackson {
# Configuration of the ObjectMapper serialization features.
# See com.fasterxml.jackson.databind.SerializationFeature
# Enum values corresponding to the SerializationFeature and their boolean value.
serialization-features {
# Date/time in ISO-8601 (rfc3339) yyyy-MM-dd'T'HH:mm:ss.SSSZ format
# as defined by com.fasterxml.jackson.databind.util.StdDateFormat
# For interoperability it's better to use the ISO format, i.e. WRITE_DATES_AS_TIMESTAMPS=off,
# but WRITE_DATES_AS_TIMESTAMPS=on has better performance.
WRITE_DATES_AS_TIMESTAMPS = off
WRITE_DURATIONS_AS_TIMESTAMPS = off
}
# Configuration of the ObjectMapper deserialization features.
# See com.fasterxml.jackson.databind.DeserializationFeature
# Enum values corresponding to the DeserializationFeature and their boolean value.
deserialization-features {
FAIL_ON_UNKNOWN_PROPERTIES = off
}
# Configuration of the ObjectMapper mapper features.
# See com.fasterxml.jackson.databind.MapperFeature
# Enum values corresponding to the MapperFeature and their
# boolean values, for example:
#
# mapper-features {
# SORT_PROPERTIES_ALPHABETICALLY = on
# }
mapper-features {}
# Configuration of the ObjectMapper JsonParser features.
# See com.fasterxml.jackson.core.JsonParser.Feature
# Enum values corresponding to the JsonParser.Feature and their
# boolean value, for example:
#
# json-parser-features {
# ALLOW_SINGLE_QUOTES = on
# }
json-parser-features {}
# Configuration of the ObjectMapper JsonParser features.
# See com.fasterxml.jackson.core.JsonGenerator.Feature
# Enum values corresponding to the JsonGenerator.Feature and
# their boolean value, for example:
#
# json-generator-features {
# WRITE_NUMBERS_AS_STRINGS = on
# }
json-generator-features {}
# Configuration of the JsonFactory StreamReadFeature.
# See com.fasterxml.jackson.core.StreamReadFeature
# Enum values corresponding to the StreamReadFeatures and
# their boolean value, for example:
#
# stream-read-features {
# STRICT_DUPLICATE_DETECTION = on
# }
stream-read-features {}
# Configuration of the JsonFactory StreamWriteFeature.
# See com.fasterxml.jackson.core.StreamWriteFeature
# Enum values corresponding to the StreamWriteFeatures and
# their boolean value, for example:
#
# stream-write-features {
# WRITE_BIGDECIMAL_AS_PLAIN = on
# }
stream-write-features {}
# Configuration of the JsonFactory JsonReadFeature.
# See com.fasterxml.jackson.core.json.JsonReadFeature
# Enum values corresponding to the JsonReadFeatures and
# their boolean value, for example:
#
# json-read-features {
# ALLOW_SINGLE_QUOTES = on
# }
json-read-features {}
# Configuration of the JsonFactory JsonWriteFeature.
# See com.fasterxml.jackson.core.json.JsonWriteFeature
# Enum values corresponding to the JsonWriteFeatures and
# their boolean value, for example:
#
# json-write-features {
# WRITE_NUMBERS_AS_STRINGS = on
# }
json-write-features {}
# Additional classes that are allowed even if they are not defined in `serialization-bindings`.
# This is useful when a class is not used for serialization any more and therefore removed
# from `serialization-bindings`, but should still be possible to deserialize.
whitelist-class-prefix = []
# settings for compression of the payload
compression {
# Compression algorithm.
# - off : no compression
# - gzip : using common java gzip
algorithm = off
# If compression is enabled with the `algorithm` setting the payload is compressed
# when it's larger than this value.
compress-larger-than = 0 KiB
}
# Whether the type should be written to the manifest.
# If this is off, then either deserialization-type must be defined, or there must be exactly
# one serialization binding declared for this serializer, and the type in that binding will be
# used as the deserialization type. This feature will only work if that type either is a
# concrete class, or if it is a supertype that uses Jackson polymorphism (ie, the
# @JsonTypeInfo annotation) to store type information in the JSON itself. The intention behind
# disabling this is to remove extraneous type information (ie, fully qualified class names) when
# serialized objects are persisted in Akka persistence or replicated using Akka distributed
# data. Note that Akka remoting already has manifest compression optimizations that address this,
# so for types that just get sent over remoting, this offers no optimization.
type-in-manifest = on
# The type to use for deserialization.
# This is only used if type-in-manifest is disabled. If set, this type will be used to
# deserialize all messages. This is useful if the binding configuration you want to use when
# disabling type in manifest cannot be expressed as a single type. Examples of when you might
# use this include when changing serializers, so you don't want this serializer used for
# serialization and you haven't declared any bindings for it, but you still want to be able to
# deserialize messages that were serialized with this serializer, as well as situations where
# you only want some sub types of a given Jackson polymorphic type to be serialized using this
# serializer.
deserialization-type = ""
# Specific settings for jackson-json binding can be defined in this section to
# override the settings in 'akka.serialization.jackson'
jackson-json {}
# Specific settings for jackson-cbor binding can be defined in this section to
# override the settings in 'akka.serialization.jackson'
jackson-cbor {}
# Issue #28918 for compatibility with data serialized with JacksonCborSerializer in
# Akka 2.6.4 or earlier, which was plain JSON format.
jackson-cbor-264 = ${akka.serialization.jackson.jackson-cbor}
}
➡️ Persistence
Event Sourcing
🏭 com.typesafe.akka:akka-persistence-typed_2.13:2.6.5
Akka Persistence為帶狀態的Actor提供了持久化其狀態以備崩潰后恢復的支持,其本質是持久化Actor相關的事件Event,從而在恢復時利用全部事件或階段性快照重塑(Reconstruct/Replay/Rebuild)Actor。ES在現實生活中最典型的一個例子是會計使用的復式記賬法。
📎 參考書目
-
MSDN上的 CQRS Journey。
該書以一個用C#編寫的Conference預約售票系統為例,由淺入深地展示了實現CQRS的各個環節需要關注的重點。書中的配圖和討論非常精彩,而其中提到的Process Manager也是當下實現Saga的流行方式之一。
-
Randy Shoup所著 Events as First-Class Citizens。
文中的Stitch Fix是一家智能零售商,它通過整合零售、技術、倉儲、數據分析等資源,使用數據分析軟件和機器學習來匹配顧客的服裝定制需求,為其挑選符合其個人風格、尺寸和偏好的服飾和配飾,提供了良好的消費體驗。
顧客按需訂購服裝或申請每月、每兩個月或每季度交貨。每個盒子有五件貨物。如果顧客喜歡配送貨物,可以選擇以標簽價購買,全部購買享受75%的折扣;如果不喜歡,則免費退貨。如果顧客沒有購買任何貨物,則需支付20美元的設計費。Stitch Fix的平均商品單價約65美元,公司期望在每個盒子中,用戶能夠保存2件商品。造型師是兼職,薪水為每小時15美元。每小時,造型師會完成4個盒子,這樣能產生較高的毛利率,以覆蓋巨大的開銷及庫存成本。
⚠️ 通用數據保護條例(General Data Protection Regulation,GDPR)要求,必須能根據用戶的要求刪除其個人信息。然而,在一個以Event Sourcing為基礎的應用里,要徹底刪除或修改帶有個人信息的所有事件是非常困難的,所以改用“數據粉碎”的技術來實現。其原理是給每個人分配一個唯一的ID,然后以該ID作為密鑰,對其相關的所有個人數據進行加密。當需要徹底刪除該用戶的信息時,直接刪除該ID,即可保證其個人數據無法被解密,從而達到保護目的。Lightbend為Akka Persistence提供了相應的工具,以幫助構建具有GDPR功能的系統。
Akka Persistence提供了event sourced actor(又稱為 persistent actor)作為實現。這類Actor在收到Command時會先進行檢驗Validate。如果Command各項條件通過了檢驗,則使之作用於當前實體,並產生相應的事件Event,待這些Event被持久化后,以更新實體的狀態結束;否則,實體將直接拒絕Reject該Command。(💀 不該是先更新狀態,然后才持久化事件嗎?貌似先持久化再更新會更靠譜。)
而在重塑Actor時,所有的事件將被加載,並無需再校驗地直接用於更新Actor的狀態,直到恢復到最新狀態。
一個典型的EventSourcedBehavior包括ID、初始State,CommandHandler與EventHandler四個組成部分,如果需要傳入ActorContext,則在外層用Behaviors.setup傳入即可:
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.PersistenceId
object MyPersistentBehavior {
sealed trait Command
sealed trait Event
final case class State()
def apply(): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
// 1. 該Actor的唯一Id
persistenceId = PersistenceId.ofUniqueId("abc"),
// 2. 初始狀態
emptyState = State(),
// 3. Command Handler
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
// 4. Event Handler
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
}
PersistenceId
PersistenceId是Event Sourced Actor在其生命周期內唯一的身份標識(想想聚合Id)。因為Akka Cluster提供的EntityId可能為多個不同類型的Actor共享,所以一般配合EntityTypeKey一起組成唯一的PersistenceId。所以,PersistenceId.apply()用默認的分隔符|
將entityType.name與entityId兩個字符串連接成所需的Id。當然,也可以使用PersistenceId.ofUniqueId生成自定義分隔符的Id。
即使在集群條件下,持同一PersistanceId的Actor在任何時候只能存在一個,否則就世界大亂了。當然,因為有Recovery,這個Actor可以被分片甚至遷移到任何一個片及其節點上。
🔗 摘選自 https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#persistence-example
sharding.init(Entity(typeKey = HelloWorld.TypeKey) { entityContext =>
HelloWorld(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
})
Command Handler
一個CommandHandler有2個參數:當前的State、收到的Command,然后返回Effect。Effect由其工廠創建,創建動作包括:
- persist:原子性地保存處理完Command后產生的若干Event,若保存其中一個Event時失敗則所有Event都將失敗。但是在底層的事件存儲不支持一次寫入多個事件的情況下,CommandHandler為拒絕一次性持久化多個事件,可以拋出EventRejectedException(通常帶有UnsupportedOperationException),從而由父Actor進行監管處理。
- none:什么也不做,比如一個只包括讀操作的Query Command。
- unhandled:表明該命令不適用於當前狀態。
- stop:停止該Actor。
- stash:暫存當前命令。
- unstashAll:處理所有被Effect.stash暫存起來的命令。
- reply:向發來命令的Actor發送一條回復。
在返回Effect的同時,還可以在該Effect后接副作用SideEffect,比如Effect.persist(...).thenRun(...)。具體包括:
- thenRun:運行某個副作用函數。
- thenStop:停止該Actor。
- thenUnstashAll:處理所有被Effect.stash暫存起來的命令。
- thenReply:向發來命令的Actor發送一條回復。
任何SideEffect都最多只能執行一次。如果持久化失敗,或者Actor直接重啟、停止后再啟動,都不會執行任何副作用。所以通常是響應RecoveryCompleted信號,在其中去執行需要被確認的副作用,這種情況下,則可能會出現同一個副作用多次執行的情況。
副作用都是按注冊的順序同步執行,但也不能避免因為發送消息等而導致操作的並發執行。副作用也可能在事件被持久化之前就被執行,這樣的話,即使持久化失敗導致事件未被保存,副作用也生效了。
💀 關於翻譯:Akka用“日記”——Journal指代的Event Store,並與“日志”Log相區別。雖然我更喜歡用“事件簿”這樣的稱謂,但一來請教了師姐說“日記”更准確,二來電影《Joker》里做心理咨詢的社工在問Frank時也用的Journal這個詞,於是就此作罷。
Event Handler
一個EventHandler有2個參數:當前State,觸發的Event,然后返回新的State。
⚡ CommandHandler觸發並持久化事件,EventHandler處理事件並更新狀態,所以Actor的狀態實際是在EventHandler里才真正被改變的!
當事件Event被持久化后,EventHandler將使用它去修改作為參數傳入的當前狀態State,從而產生新的State。至於State的具體實現,可以是FP風格的不可變量,也可以是OO風格的可變量,但通常都會封裝在諸如Class這樣的一個容器里。
不同於Command Handler的是,Event Handler不會產生副作用,所以它將直接用於Actor的重塑Recovery操作上。如果需要在Recovery之后做點什么,那么恰當的楔入點包括:CommandHandler最后創建的Effect附加的thenRun(),或者是RecoveryCompleted事件的處理函數里。
改變Actor的行為
因為不同的消息將觸發Actor不同的行為,所以行為也是Actor狀態的一部分。所以在Recovery時除了恢復數據,還要小心恢復其相應的行為。盡管行為是函數,而函數是一等公民,所以行為理應可以象數據一樣保存,但困難的地方在於怎么保存編碼,因此Akka Persistence不提供Behavior的持久化。
面對這個棘手的問題,最容易想到的辦法是根據State定義不同的CommandHandler,並隨State變化而切換,從而使Actor成為一台有限狀態機。於是,由此得到的便是由State與Command兩級匹配構成的邏輯,利用繼承定義State的不同實現,然后先case State、再case Command,最后根據匹配結果將消息分發至相應的處理函數(處理函數亦相對獨立,以凸顯不同的邏輯分支)。而在代碼實現的結構上,就是在一個CommandHandler里,定義若干個協助完成消息處理的private function。這些處理函數的參數由Handler在case分支里賦與,返回類型則統一為與CommandHandler相同的Effect[Event, State]。最后,只需要將這個CommandHandler連殼帶肉交給EventSourcedBehavior工廠即可。
📎 更規范的方式是把Handler定義在State里,具體參見后續的Handler設計指南。
強制回復
Request-Response是最常見的通信模式之一。為了保證Persistent Actor一定會回復,EventSourcedBehavior推出了ReplyEffect,從而保證CommandHandler一定會發回Reply。它與Effect的唯一區別是必須用工廠Effect.reply
、Effect.noReply
、Effect.thenReply
或者Effect.thenNoReply
之一創建的結果作為返回值,而不再是Effect,否則編譯器會提示類型不匹配的錯誤。
為此,在定義Command時必須包含一個replyTo屬性,同時得用EventSourcedBehavior.withEnforcedReplies(id, state, cmdHandler, evtHandler)
來創建Behavior。
序列化
常見的序列化方案和工具也適用於Akka,推薦使用🔗 Jackson
在序列化時,必須考慮不同版本事件之間的向下兼容性,參考綱要演進 Schema Evolution(💀 統一個中文名真難。architecture 架構,pattern 模式,structure 結構,style 風格/樣式,template 模板,boilerplate 樣板,schema 綱要)
重塑
相比Recovery,我更喜歡Replay或者Reconstruct,使用“重塑實體”和“事件重播”在語義上也更生動。
Akka Persistence在Actor啟動或重啟時,將自動地直接使用EventHandler進行Actor的重塑。要注意的是,不要在EventHandler中執行副作用,而應該在重塑完成后,在receiveSignal里響應RecoveryCompleted信號,在響應程序里執行副作用。在RecoveryCompleted信號里帶有重塑后的當前狀態。而即使對於一個新的、還沒有任何已記錄事件的Actor,在執行Recovery之后也會觸發RecoveryCompleted信號。
由於在重塑完成前,所有新消息將會被Stash,所以為防止失去響應,Akka提供了最大並發的重塑數,可以按akka.persistence.max-concurrent-recoveries = 50
的方式進行配置。
重塑過濾 Replay Filter
在某些情況下,事件流可能會損壞,而此時多個寫入者(即多個Persistent Actor實例)准備寫入具有相同序列號的不同消息,則會引發不一致的沖突。為此,Akka Persistence提供了Replay Filter,通過消息序列號和寫入者的UUID來檢測並解決消息之間的沖突。具體配置需要寫入配置文件中的如下區段(leveldb視具體插件而不同):
💀 理解不能:為什么會有多個Actor實例要寫入有相同序列號的消息?PersistenceId不該是唯一的嗎?消息序列號是什么鬼?
🔈 Akka Persistence使用單一寫入者原則,即任一時刻,對於任何一個特定的PersistenceId,只有一個EventSourcedBehavior能持久化事件。
akka.persistence.journal.leveldb.replay-filter {
mode = repair-by-discard-old
}
包括4種策略:
- repair-by-discard-old:拋棄舊寫入者的事件,並且在Log里記下這次警告Warning。而在任何情況下,最高序列號的事件總會被重播,因此不用擔心新的事件會打亂你已有的事件日志。
- fail:讓重塑直接失敗,並在Log里記下這次錯誤Error。
- warn:繼續發出消息,並在Log里記下這次警告Warning。
- off:禁用此功能。
完全禁用重塑和快照功能
使用withRecovery()可以修改重塑的策略,包括禁用自動重塑功能。當然,快照功能可以單獨禁用,或者只選擇自己需要的那一類快照。
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withRecovery(Recovery.disabled)
Tag標簽
在不使用EventAdapter的情況下,可以直接使用withTagger為EventSourcedBehavior中的事件打上標簽(准確說是標簽集),方便將Event根據Tag實現分組,比如屬於不同Actor實例但屬相同類型的所有Event,然后在Persistence Query中使用。
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withTagger(_ => Set("tag1", "tag2"))
適配Event
通過繼承EventAdapter[T, Wrapper]
並安裝到EventSourcedBehavior,可以自動將事件T轉換為Wrapper,然后持久化。
case class Wrapper[T](event: T)
class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] {
override def toJournal(e: T): Wrapper[T] = Wrapper(e)
override def fromJournal(p: Wrapper[T], manifest: String): EventSeq[T] = EventSeq.single(p.event)
override def manifest(event: T): String = ""
}
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.eventAdapter(new WrapperEventAdapter[Event])
處理日記失敗
若Journal存取失敗,則EventSourcedBehavior將停止。該默認行為可以通過使用覆寫后的回退策略BackoffSupervisorStrategy
進行改變。普通的Supervisor在此處並不適用,因為事件已經被持久化,單純地重啟Actor並不能一並撤銷日記發生的改變。如果Journal存取失敗發生在重塑Actor的過程中,則會觸發RecoveryFailed信號,同時Actor將停止或在回退后重新啟動。
但若是Journal在持久化事件時發現錯誤,比如事件無法被序列化,那么它會主動拒絕持久化事件。此時該事件必定不會被Journal持久化,而會觸發一個EventRejectedException異常傳遞給EventSourcedBehavior,然后按Supervisor設定的策略進行處理。
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.onPersistFailure(
SupervisorStrategy.restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1))
暫存消息
在執行Effect.persist或persistAll,上一個unstashAll或者創建快照期間,所有新到來的消息將自動地被暫存,直到所有事件被持久化且所有的副作用執行完畢。同理,在重塑過程中,新消息也將被暫存,直到重塑過程結束。除了自動暫存之外,在需要的時候,比如需要等候其他條件一並成立時,也可以用Effect.stash手動開始暫存,待條件全部齊備后再thenUnstashAll。
設置Stash的消息數量請配置:akka.persistence.typed.stash-capacity = 10000
⚠️ 由於Stash的消息都暫存在內存里,所以在以下情況發生時,這些消息將丟失:
- 當Actor被Cluster Sharding鈍化或重新分配時
- 當Actor因處理命令或執行副作用時拋出異常而被停止或重啟時
- 當Actor在持久化事件過程中觸發異常時(若定義了onPersistFailure回退策略,則暫存的命令會被保留並在稍后再處理)
💀 Akka Persistence為什么沒有為Mailbox提供一個持久化方案?或者,這應該是ConsumerController的責任?
🔈 參見:耐久的Producer
CQRS
Akka Persistence使用EventSourcedBehavior,配合Persistence Query的EventsByTag,實現CQRS模式。
Handler設計指南
與Handler單獨放置的常見方案不同,Akka Persistence推薦將CommandHandler與EventHandler都設計在State里。這樣State便可當作包括了業務邏輯和數據的完整領域對象。而在State剛創建時,除了專門定義一個初始化的State類外,也可以用Option[State]來代替,這樣Option.None即代表了初始狀態,而Option.Some則是更新后的狀態,然后用case匹配即可。(💀 注意示例代碼里State用的Option[Account])
完整示例如下:
/**
* Bank account example illustrating:
* - Option[State] that is starting with None as the initial state
* - event handlers in the state classes
* - command handlers in the state classes
* - replies of various types, using withEnforcedReplies
*/
object AccountExampleWithOptionState {
//#account-entity
object AccountEntity {
// Command
sealed trait Command extends CborSerializable
final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command
// Reply
sealed trait CommandReply extends CborSerializable
sealed trait OperationResult extends CommandReply
case object Confirmed extends OperationResult
final case class Rejected(reason: String) extends OperationResult
final case class CurrentBalance(balance: BigDecimal) extends CommandReply
// Event
sealed trait Event extends CborSerializable
case object AccountCreated extends Event
case class Deposited(amount: BigDecimal) extends Event
case class Withdrawn(amount: BigDecimal) extends Event
case object AccountClosed extends Event
val Zero = BigDecimal(0)
// type alias to reduce boilerplate
type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[Event, Option[Account]]
// State
sealed trait Account extends CborSerializable {
def applyCommand(cmd: Command): ReplyEffect
def applyEvent(event: Event): Account
}
// State: OpenedAccount
case class OpenedAccount(balance: BigDecimal) extends Account {
require(balance >= Zero, "Account balance can't be negative")
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case Deposit(amount, replyTo) =>
Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => Confirmed)
case Withdraw(amount, replyTo) =>
if (canWithdraw(amount))
Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => Confirmed)
else
Effect.reply(replyTo)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(balance))
case CloseAccount(replyTo) =>
if (balance == Zero)
Effect.persist(AccountClosed).thenReply(replyTo)(_ => Confirmed)
else
Effect.reply(replyTo)(Rejected("Can't close account with non-zero balance"))
case CreateAccount(replyTo) =>
Effect.reply(replyTo)(Rejected("Account is already created"))
}
override def applyEvent(event: Event): Account =
event match {
case Deposited(amount) => copy(balance = balance + amount)
case Withdrawn(amount) => copy(balance = balance - amount)
case AccountClosed => ClosedAccount
case AccountCreated => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
}
def canWithdraw(amount: BigDecimal): Boolean = {
balance - amount >= Zero
}
}
// State: ClosedAccount
case object ClosedAccount extends Account {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case c: Deposit =>
replyClosed(c.replyTo)
case c: Withdraw =>
replyClosed(c.replyTo)
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(Zero))
case CloseAccount(replyTo) =>
replyClosed(replyTo)
case CreateAccount(replyTo) =>
replyClosed(replyTo)
}
private def replyClosed(replyTo: ActorRef[AccountEntity.OperationResult]): ReplyEffect =
Effect.reply(replyTo)(Rejected(s"Account is closed"))
override def applyEvent(event: Event): Account =
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
}
// when used with sharding, this TypeKey can be used in `sharding.init` and `sharding.entityRefFor`:
val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("Account")
def apply(persistenceId: PersistenceId): Behavior[Command] = {
// type of State is Option[Account]
EventSourcedBehavior.withEnforcedReplies[Command, Event, Option[Account]](
persistenceId,
None,
// use result of case match for the parameter handler.
(state, cmd) =>
state match {
case None => onFirstCommand(cmd)
case Some(account) => account.applyCommand(cmd)
},
// match type Option[Account] declared in withEnforcedReplies.
(state, event) =>
state match {
case None => Some(onFirstEvent(event))
case Some(account) => Some(account.applyEvent(event))
})
}
def onFirstCommand(cmd: Command): ReplyEffect = {
cmd match {
case CreateAccount(replyTo) =>
Effect.persist(AccountCreated).thenReply(replyTo)(_ => Confirmed)
case _ =>
// CreateAccount before handling any other commands
Effect.unhandled.thenNoReply()
}
}
def onFirstEvent(event: Event): Account = {
event match {
case AccountCreated => OpenedAccount(Zero)
case _ => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
}
}
}
}
快照 Snapshot
快照的初衷,是為了提高Recovery的效率,所以相應可以在構造EventSourcedBehavior時使用.snapshotWhen()定義創建快照的兩種情況:一是每N條事件時創建一份快照;二是當滿足特定條件時創建一份快照。
💀 snapshotWhen里的case匹配什么鬼?
🔈 參見Akka API
snapshotWhen:在指定狀態和序列號的條件下,當指定事件被持久化后,即創建一份快照。當有多條事件時,則要等所有事件完成持久化后才會創建快照。
def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): EventSourcedBehavior[Command, Event, State]
withRetention:指定保留或刪除快照的策略。默認情況下,快照不會自動保存和刪除。
def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State]
以下示例即指定在觸發BookingCompleted事件后創建一份快照:
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => state)
.snapshotWhen {
case (state, BookingCompleted(_), sequenceNumber) => true
case (state, event, sequenceNumber) => false
}
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
在重塑Actor時,默認會使用SnapshotSelectionCriteria.Latest
來選擇最新的(最年輕)的快照版本,除非使用withRecovery里的Recovery參數指定其他策略(比如徹底禁用快照):
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withRecovery(Recovery.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none))
除了默認提供的snapshot-store
插件(akka.persistence.snapshot-store.plugin,需要配置),可以使用EventSourcedBehavior.withSnapshotPluginId
指定其他的替代插件。
保存快照可能會失敗,但它不會導致Actor的停止或重啟,只會觸發信號SnapshotCompleted或者SnapshotFailed,並記入日志Log。
刪除快照
每當有新的快照成功創建時,舊的快照都將根據RetentionCriteria里設置的條件自動刪除。在上面的例子里,將在每100條事件時(numberOfEvents = 100)創建一份快照,然后每份序列號小於已保存快照的序列號減去keepNSnapshots * numberOfEvents的快照會被自動刪除(每隔200號刪除之前的快照)。
⚠️ 根據Akka API的說明,如果將EventSourcedBehavior.withRetention和RetentionCriteria.snapshotEvery一起使用,則符合snapshotWhen定義條件而觸發的快照將不會導致舊快照被刪除。此類刪除僅當單獨使用withRetention,且匹配RetentionCriteria中的numberOfEvents設定值時才會觸發。
在刪除快照時,將會觸發DeleteSnapshotsCompleted或DeleteSnapshotsFailed信號,可借此進行調試。
刪除事件
💀 除非頭鐵,在一個以Event Sourced為基礎實現模式的系統里,誰會沒事刪除事件?!相反,即使是因為應用版本升級而對原有的事件進行改造,那么在CQRS Journey里提出的事件版本遷移才理應是更恰當的選擇。而在Akka Persistence里,這被稱為綱要演進Schema Evolution。
刪除事件與刪除快照的策略,都是在withRetention里用RetentionCriteria.withDeleteEventsOnSnapshot
指定的,且同期的事件會先於快照被刪除,而只保留最新版本的快照(💀這便與非EventSourced的應用有何區別?)。但這只是Akka Persistence認為的刪除,至於底層的Event Store是否真的從數據庫中刪除該事件,則由EventStore的具體實現決定。
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2).withDeleteEventsOnSnapshot)
.receiveSignal { // optionally respond to signals
case (state, _: SnapshotFailed) => // react to failure
case (state, _: DeleteSnapshotsFailed) => // react to failure
case (state, _: DeleteEventsFailed) => // react to failure
}
測試 Persistent Actor
🏭 (看到此處時候更新到2.6.6,要注意這部分和Akka Persistence一樣在未來版本會有大的變化)
com.typesafe.akka:akka-persistence-typed_2.12:2.6.6
com.typesafe.akka:akka-persistence-testkit_2.12:2.6.6
單元測試
Akka Persistence提供了EventSourcedBehaviorTestKit幫助進行測試,它按照一次一條命令的方式同步執行並返回結果,方便你斷言其行為。
使用時,通過加載EventSourcedBehaviorTestKit.config來啟動在內存中模擬的事件存儲和快照功能。
Command、Event以及State的序列化校驗會自動完成,相關的設置可以在創建EventSourcedBehaviorTestKit時,使用SerializationSettings進行自定義。默認情況下,它只負責序列化是否可用而不檢查結果是否一致,所以要檢查一致性就要啟用verifyEquality,並且用case class之類的方法實現Command、Event和State的equals。
要測試重塑功能,可以使用EventSourcedBehaviorTestKit.restart
。完整示例如下:
class AccountExampleDocSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
with AnyWordSpecLike
with BeforeAndAfterEach
with LogCapturing {
private val eventSourcedTestKit =
EventSourcedBehaviorTestKit[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account](
system,
AccountEntity("1", PersistenceId("Account", "1")))
override protected def beforeEach(): Unit = {
super.beforeEach()
eventSourcedTestKit.clear()
}
"Account" must {
"be created with zero balance" in {
val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
result.reply shouldBe AccountEntity.Confirmed
result.event shouldBe AccountEntity.AccountCreated
result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0
}
"handle Withdraw" in {
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
val result1 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
result1.reply shouldBe AccountEntity.Confirmed
result1.event shouldBe AccountEntity.Deposited(100)
result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100
val result2 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(10, _))
result2.reply shouldBe AccountEntity.Confirmed
result2.event shouldBe AccountEntity.Withdrawn(10)
result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90
}
"reject Withdraw overdraft" in {
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(110, _))
result.replyOfType[AccountEntity.Rejected]
result.hasNoEvents shouldBe true
}
"handle GetBalance" in {
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_))
result.reply.balance shouldBe 100
result.hasNoEvents shouldBe true
}
}
}
持久化測試
🏭 com.typesafe.akka:akka-persistence-testkit_2.12:2.6.6
要測試事件是否被成功持久化,則要使用PersistenceTestKit。它提供了不同的工具集:
- 類PersistenceTestKit用於事件,對應的PersistenceTestKitPlugin用於模擬事件存儲。
- 類SnapshotTestKit用於快照,對應的PersistenceTestKitSnapshotPlugin用於模擬快照存儲。
使用前,需要在用於初始化TestKit的ActorSystem中進行配置:
object TestKitTypedConf {
val yourConfiguration = ConfigFactory.defaultApplication()
val system = ActorSystem(
??? /*some behavior*/,
"test-system",
PersistenceTestKitPlugin.config.withFallback(yourConfiguration))
val testKit = PersistenceTestKit(system)
}
object SnapshotTypedConf {
val yourConfiguration = ConfigFactory.defaultApplication()
val system = ActorSystem(
??? /*some behavior*/,
"test-system",
PersistenceTestKitSnapshotPlugin.config.withFallback(yourConfiguration))
val testKit = SnapshotTestKit(system)
}
使用PersistenceTestKit,可以實施以下測試行為:
- 檢查某個關注事件是否將要被持久化的那一個。
- 檢查某個關注事件是否已被持久化。
- 讀取一組事件序列,方便逐個檢視。
- 清空所有已持久化的事件。
- 讀取所有已持久化的事件。
- 拒絕某個事件被持久化(快照不能被拒絕)。
- 把事件放入存儲,以測試重塑功能。
- 當要持久化、讀取或刪除某個事件時拋出異常。
- 自定義底層存儲設施的存取策略。
自定義存儲策略
通過為事件存儲實現ProcessingPolicy[EventStorage.JournalOperation]
或者為快照存儲實現ProcessingPolicy[SnapshotStorage.SnapshotOperation]
,然后使用withPolicy()加載,可以自定義存儲的存取策略,實現更細粒度的控制。
其中,較為關鍵的是ProcessingPolicy.tryProcess(persistenceId, storageOperation)方法。storageOperation方法包括:
- Event Storage
- ReadEvents
- WriteEvents
- DeleteEvents
- ReadSeqNum
- Snapshot Storage
- ReadSnapshot
- WriteSnapshot
- DeleteSnapshotByCriteria
- DeleteSnapshotByMeta:由SequenceNumber和TimeStamp構成的Meta
而tryProcess的結果則是下列情形之一:
- ProcessingSuccess:所有事件都被成功存取或刪除。
- StorageFailure:模擬觸發異常。
- Reject:模擬拒絕存取。
object PersistenceTestKitSampleSpec {
final case class Cmd(data: String) extends CborSerializable
final case class Evt(data: String) extends CborSerializable
object State {
val empty: State = new State
}
final class State extends CborSerializable {
def updated(event: Evt): State = this
}
}
class PersistenceTestKitSampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication()))
with AnyWordSpecLike
with BeforeAndAfterEach {
val persistenceTestKit = PersistenceTestKit(system)
override def beforeEach(): Unit = {
persistenceTestKit.clearAll()
}
"Persistent actor" should {
"persist all events" in {
val persistenceId = PersistenceId.ofUniqueId("your-persistence-id")
val persistentActor = spawn(
EventSourcedBehavior[Cmd, Evt, State](
persistenceId,
emptyState = State.empty,
commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)),
eventHandler = (state, evt) => state.updated(evt)))
val cmd = Cmd("data")
persistentActor ! cmd
val expectedPersistedEvent = Evt(cmd.data)
persistenceTestKit.expectNextPersisted(persistenceId.id, expectedPersistedEvent)
}
}
}
class SampleEventStoragePolicy extends EventStorage.JournalPolicies.PolicyType {
//you can use internal state, it does not need to be thread safe
var count = 1
override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult =
if (count < 10) {
count += 1
//check the type of operation and react with success or with reject or with failure.
//if you return ProcessingSuccess the operation will be performed, otherwise not.
processingUnit match {
case ReadEvents(batch) if batch.nonEmpty => ProcessingSuccess
case WriteEvents(batch) if batch.size > 1 =>
ProcessingSuccess
case ReadSeqNum => StorageFailure()
case DeleteEvents(_) => Reject()
case _ => StorageFailure()
}
} else {
ProcessingSuccess
}
}
class SampleSnapshotStoragePolicy extends SnapshotStorage.SnapshotPolicies.PolicyType {
//you can use internal state, it does not need to be thread safe
var count = 1
override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult =
if (count < 10) {
count += 1
//check the type of operation and react with success or with reject or with failure.
//if you return ProcessingSuccess the operation will be performed, otherwise not.
processingUnit match {
case ReadSnapshot(_, payload) if payload.nonEmpty =>
ProcessingSuccess
case WriteSnapshot(meta, payload) if meta.sequenceNr > 10 =>
ProcessingSuccess
case DeleteSnapshotsByCriteria(_) => StorageFailure()
case DeleteSnapshotByMeta(meta) if meta.sequenceNr < 10 =>
ProcessingSuccess
case _ => StorageFailure()
}
} else {
ProcessingSuccess
}
}
class PersistenceTestKitSampleSpecWithPolicy
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication()))
with AnyWordSpecLike
with BeforeAndAfterEach {
val persistenceTestKit = PersistenceTestKit(system)
override def beforeEach(): Unit = {
persistenceTestKit.clearAll()
persistenceTestKit.resetPolicy()
}
"Testkit policy" should {
"fail all operations with custom exception" in {
val policy = new EventStorage.JournalPolicies.PolicyType {
class CustomFailure extends RuntimeException
override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult =
processingUnit match {
case WriteEvents(_) => StorageFailure(new CustomFailure)
case _ => ProcessingSuccess
}
}
persistenceTestKit.withPolicy(policy)
val persistenceId = PersistenceId.ofUniqueId("your-persistence-id")
val persistentActor = spawn(
EventSourcedBehavior[Cmd, Evt, State](
persistenceId,
emptyState = State.empty,
commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)),
eventHandler = (state, evt) => state.updated(evt)))
persistentActor ! Cmd("data")
persistenceTestKit.expectNothingPersisted(persistenceId.id)
}
}
}
集成測試
PersistenceTestKit可以配合ActorTestKit一起使用,但有幾點需要注意。一是對集群條件下涉及多個節點的測試,得使用單獨的事件和快照存儲。盡管可以使用Persistence Plugin Proxy,但使用真實的數據庫通常會更好、更現實。二是某些Persistence插件會自動創建數據庫的表,但在多個ActorSystem並發要求建表時就有一定的局限性了。所以為協調數據庫的初始化工作,就得使用PersistenceInit工具。
val timeout = 5.seconds
val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout)
Await.result(done, timeout)
綱要演進
💀 這就是前面提到的事件版本遷移了!Schema,比如XML Schema,是描述特定結構的一種方式,翻譯為“綱要”貌似妥帖一些。
這一章的重點,是介紹不同的綱要演進策略,以及如何根據領域模型的實際情況,在不同策略之間作出抉擇。當然,這些策略並不是全部或者唯一的選擇,而只是Akka給出的有限方案。其本質,都是為了保證舊系統下舊綱要規格格式的事件,在遷移到新系統后也能保持一致性,且不會為了處理這些不同版本的同一類型事件,而給業務邏輯帶來額外負擔。所以,綱要演進要實現的目標包括:
- 保證系統繼續正常運行,而無需進行大規模的事件版本遷移。
- 保證新舊版本事件兼容,即使是舊版本的事件也能以新面貌統一呈現。
- 在重塑或查詢過程中,將舊版本事件透明地升級為最新版本,從而使業務邏輯無需考慮事件的多個版本的兼容性問題。
其中,綱要演進的誘因包括,相應的解決方案也多是利用EventAdapter實現過濾:
- 向事件中增加一個字段。
- 原有字段被刪除或更名。
- 事件從Protocol中刪除。
- 將一個事件划分為幾個更小粒度的事件。
選擇適當的序列化格式
選擇適當的序列化格式非常重要,這不僅關乎序列化的性能,還關乎綱要演進的方案確定和細節的實現。選擇不當的話,序列化的擴展將非常困難,系統中將不得不保留多個不同版本的序列化代碼。Akka Persistence推薦的序列化方案主要包括:
- Jackson:這是Akka強烈推薦的方案。
- Google Protocol Buffers:能獲得更精細的控制,但需要處理更多的序列化與領域模型之間的映射細節。
- Apache的Thrift與Avro:主要提供二進制格式的序列化支持。
🔗 參考文獻:Martin Kleppmann 所著Schema evolution in Avro, Protocol Buffers and Thrift
默認情況下,Akka Persistence使用Akka Serialization模塊里基於Google Protocol Buffers實現的一個序列化器。如果Journal插件希望使用其他類型的序列化器,則需要根據不同的數據庫進行挑選。但無論如何,Akka Persistence只是提供一個序列化器的可插拔接口,它不會自動處理消息的序列化。
所有的消息都會被序列化成如下封裝結構:最底層也是最內層的是用黃色標注的有效載荷,它就是消息對象的實例被序列化后的結果,然后序列化器會附加它自己的SerializerId等信息一起組成中間層的PersistentPayload,之后才是Akka Persistence附加的SequenceNumber、PersistenceId等其他一些信息包裹組成的最外層。(💀 想像一下對方收到這封信時又是怎么一層層剝開的就好理解了。外面2層都是框架直接包攬了,只有核心那層需要自己操心。)所以序列化的要點,就在於最內層的消息對象要序列化成什么樣子。對此,Java內置的序列化器用於調試還算勉強(想想多層屬性嵌套的情形),生產環境下最好還是另尋他途。所以,了解序列化器的優勢和局限性很重要,這樣才能在進行項目時迅速行動,並且無懼重構模型。
以下是在Akka Serialization里自定義有效載荷序列化器的示例:
/**
* Usually a serializer like this would use a library like:
* protobuf, kryo, avro, cap'n proto, flatbuffers, SBE or some other dedicated serializer backend
* to perform the actual to/from bytes marshalling.
*/
final case class Person(name: String, surname: String)
class SimplestPossiblePersonSerializer extends SerializerWithStringManifest {
val Utf8 = Charset.forName("UTF-8")
val PersonManifest = classOf[Person].getName
// unique identifier of the serializer
// 在反序列化時,這個SerializerId將用於加載同一類型的序列化器,以保證完全對稱
def identifier = 1234567
// extract manifest to be stored together with serialized object
override def manifest(o: AnyRef): String = o.getClass.getName
// serialize the object
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case p: Person => s"""${p.name}|${p.surname}""".getBytes(Utf8)
case _ => throw new IllegalArgumentException(s"Unable to serialize to bytes, class was: ${obj.getClass}!")
}
// deserialize the object, using the manifest to indicate which logic to apply
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
manifest match {
case PersonManifest =>
val nameAndSurname = new String(bytes, Utf8)
val Array(name, surname) = nameAndSurname.split("[|]")
Person(name, surname)
case _ =>
throw new NotSerializableException(
s"Unable to deserialize from bytes, manifest was: $manifest! Bytes length: " + bytes.length)
}
}
相應在application.conf里的配置:
akka {
actor {
serializers {
person = "docs.persistence.SimplestPossiblePersonSerializer"
}
serialization-bindings {
"docs.persistence.Person" = person
}
}
}
Akka Serialization提供了相應的Jackson示例
⭕ 情形一:增加字段
適用場景:向已經存在的事件類型里添加一個新的字段。
解決方案:添加字段是最常見的演進事由之一,只要添加的字段是二進制兼容的(💀 Jackson就是文本兼容的,不是二進制兼容的嗎?),就能很容易在序列化里實現演進。此處用ProtoBuf示范,為值機選座增加了一個靠窗或過道的字段seatType,然后給它一個默認值(此處用的SeatType.Unknown),或者可以用Option[T]包裝,最后用ProtoBuf提供的方法hasSeatType區分新舊事件,再使用SeatType.fromString從字符串析取值。
class ProtobufReadOptional {
sealed abstract class SeatType { def code: String }
object SeatType {
def fromString(s: String) = s match {
case Window.code => Window
case Aisle.code => Aisle
case Other.code => Other
case _ => Unknown
}
case object Window extends SeatType { override val code = "W" }
case object Aisle extends SeatType { override val code = "A" }
case object Other extends SeatType { override val code = "O" }
case object Unknown extends SeatType { override val code = "" }
}
case class SeatReserved(letter: String, row: Int, seatType: SeatType)
/**
* Example serializer impl which uses protocol buffers generated classes (proto.*)
* to perform the to/from binary marshalling.
*/
class AddedFieldsSerializerWithProtobuf extends SerializerWithStringManifest {
override def identifier = 67876
final val SeatReservedManifest = classOf[SeatReserved].getName
override def manifest(o: AnyRef): String = o.getClass.getName
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
manifest match {
case SeatReservedManifest =>
// use generated protobuf serializer
seatReserved(FlightAppModels.SeatReserved.parseFrom(bytes))
case _ =>
throw new NotSerializableException("Unable to handle manifest: " + manifest)
}
override def toBinary(o: AnyRef): Array[Byte] = o match {
case s: SeatReserved =>
FlightAppModels.SeatReserved.newBuilder
.setRow(s.row)
.setLetter(s.letter)
.setSeatType(s.seatType.code)
.build()
.toByteArray
}
// -- fromBinary helpers --
private def seatReserved(p: FlightAppModels.SeatReserved): SeatReserved =
SeatReserved(p.getLetter, p.getRow, seatType(p))
// handle missing field by assigning "Unknown" value
private def seatType(p: FlightAppModels.SeatReserved): SeatType =
if (p.hasSeatType) SeatType.fromString(p.getSeatType) else SeatType.Unknown
}
}
相應的ProtoBuf配置FlightAppModels.proto,其中新增加的seatType是optional
。ProtoBuf會根據配置生成一個具體負責Marshall的工具類,optional字段將會賦與一hasXXX的方法:
option java_package = "docs.persistence.proto";
option optimize_for = SPEED;
message SeatReserved {
required string letter = 1;
required uint32 row = 2;
optional string seatType = 3; // the new field
}
⭕ 情形二:重命名字段
適用場景:解決設計之初不恰當的字段命名,使其更符合業務需求。此處舉例用了SeatReserved中原來的code,現在的seatNr。
解決方案一:使用符合IDL規范(Interface Description Language)的序列化器。這是最簡單有效的方案,也是ProtoBuf和Thrift采取的方案,比如上面的.proto即是用IDL描述的映射結構,然后ProtoBuf再據此描述自動生成工具類。要改名時,只需要維護IDL映射結構,保持字段的ID不變,修改映射的名稱即可。
// protobuf message definition, BEFORE:
message SeatReserved {
required string code = 1;
}
// protobuf message definition, AFTER:
message SeatReserved {
required string seatNr = 1; // field renamed, id remains the same
}
解決方案二:手動處理事件版本的遷移。在沒辦法使用IDL方式,比如使用Jackson格式進行序列化時,就只有手動進行轉換,給事件附加一個版本號字段,然后用手寫的EventAdapter進行反序列化的轉換(該EventAdapter在EventSourcedBehavior創建時加載)。在使用Jackson進行增加字段、改變事件結構等情況下,這樣的方法也是適用的。
class JsonRenamedFieldAdapter extends EventAdapter {
import spray.json.JsObject
val marshaller = new ExampleJsonMarshaller
val V1 = "v1"
val V2 = "v2"
// this could be done independently for each event type
override def manifest(event: Any): String = V2
override def toJournal(event: Any): JsObject =
marshaller.toJson(event)
override def fromJournal(event: Any, manifest: String): EventSeq = event match {
case json: JsObject =>
EventSeq(marshaller.fromJson(manifest match {
case V1 => rename(json, "code", "seatNr")
case V2 => json // pass-through
case unknown => throw new IllegalArgumentException(s"Unknown manifest: $unknown")
}))
case _ =>
val c = event.getClass
throw new IllegalArgumentException("Can only work with JSON, was: %s".format(c))
}
def rename(json: JsObject, from: String, to: String): JsObject = {
val value = json.fields(from)
val withoutOld = json.fields - from
JsObject(withoutOld + (to -> value))
}
}
⭕ 情形三:刪除事件並忽略之
適用場景:某個事件被認為是多余的、毫無價值甚至影響效率的,但是在重塑時卻沒法跳過該事件。本例中是乘客按燈呼叫服務的事件CustomerBlinked。
最簡單的方案:由於事件並不能真正從Journal中徹底刪除,所以通常是在重塑時通過忽略特定的事件達到刪除的效果。最簡單的方案,就是在EventAdapter中截留該事件而返回一個空的EventSeq,同時放過其他類型的事件。該方案的弊端,在於從Storage中讀取事件時,仍需要反序列化這個事件,從而導致效率的損失。
更成熟的方案:在上述方案基礎上,增加了在序列化器端的過濾,使特定事件不再被反序列化。被忽略的事件被稱為墓碑Tombstone。
final case class CustomerBlinked(customerId: Long)
case object EventDeserializationSkipped
class RemovedEventsAwareSerializer extends SerializerWithStringManifest {
val utf8 = Charset.forName("UTF-8")
override def identifier: Int = 8337
val SkipEventManifestsEvents = Set("docs.persistence.CustomerBlinked"
// 其他被忽略的事件...
)
override def manifest(o: AnyRef): String = o.getClass.getName
override def toBinary(o: AnyRef): Array[Byte] = o match {
case _ => o.toString.getBytes(utf8) // example serialization
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
manifest match {
case m if SkipEventManifestsEvents.contains(m) =>
EventDeserializationSkipped
case other => new String(bytes, utf8)
}
}
class SkippedEventsAwareAdapter extends EventAdapter {
override def manifest(event: Any) = ""
override def toJournal(event: Any) = event
override def fromJournal(event: Any, manifest: String) = event match {
case EventDeserializationSkipped => EventSeq.empty
case _ => EventSeq(event)
}
}
⭕ 情形四:從數據模型中分離出域模型
適用場景:這主要是從持久化無關(Persistence Ignorance)的角度,堅持采用POJO(Plain Ordinary Java Object)這樣的case class實現領域模型,並盡量避免數據模型及數據庫、序列化器等底層細節和框架對領域模型的入侵。
解決方案:創建一個EventAdapter,實現case class與數據庫存取class之間一對一的映射。
/** Domain model - highly optimised for domain language and maybe "fluent" usage */
object DomainModel {
final case class Customer(name: String)
final case class Seat(code: String) {
def bookFor(customer: Customer): SeatBooked = SeatBooked(code, customer)
}
final case class SeatBooked(code: String, customer: Customer)
}
/** Data model - highly optimised for schema evolution and persistence */
object DataModel {
final case class SeatBooked(code: String, customerName: String)
}
class DetachedModelsAdapter extends EventAdapter {
override def manifest(event: Any): String = ""
override def toJournal(event: Any): Any = event match {
case DomainModel.SeatBooked(code, customer) =>
DataModel.SeatBooked(code, customer.name)
}
override def fromJournal(event: Any, manifest: String): EventSeq = event match {
case DataModel.SeatBooked(code, customerName) =>
EventSeq(DomainModel.SeatBooked(code, DomainModel.Customer(customerName)))
}
}
⭕ 情形五:以可讀樣式存儲事件
適用場景:希望以JSON等更可讀的樣式,而不是一個二進制流的方式來保存事件。這在最近的一些諸如MongoDB、PostgreSQL的NoSQL類型數據庫中應用較為普遍。在做這樣的決定前,必須要確定是存儲格式便是要可讀樣式的,還只是想窺探事件存儲而需要可讀樣式的。如果是后者,Persistence Query也能達到同樣目的,且不會影響存儲效率。
解決方案:創建EventAdapter,將事件轉化為JSON后交給Journal直接保存。前提是必須有適配Akka Persistence的Journal插件支持,這樣數據庫才能直接識別EventAdapter轉化來的JSon對象,然后存儲它。
// act as-if JSON library
class ExampleJsonMarshaller {
def toJson(any: Any): JsObject = JsObject()
def fromJson(json: JsObject): Any = new Object
}
class JsonDataModelAdapter extends EventAdapter {
override def manifest(event: Any): String = ""
val marshaller = new ExampleJsonMarshaller
override def toJournal(event: Any): JsObject =
marshaller.toJson(event)
override def fromJournal(event: Any, manifest: String): EventSeq = event match {
case json: JsObject =>
EventSeq(marshaller.fromJson(json))
case _ =>
throw new IllegalArgumentException("Unable to fromJournal a non-JSON object! Was: " + event.getClass)
}
}
替代方案:如果找不到能支持上述方案的Journal插件,使用akka.persistence.journal.AsyncWriteJournal
來自己手動實現一個JSON格式的序列化器,再配合一個EventAdapter實現toJournal與fromJournal也是可行的。
⭕ 情形六:把事件切分為更小的粒度
適用場景:隨着領域分析的深入,需要將原有粗粒度的一個事件切分為更小粒度的若干事件。此處以“用戶信息改變”為例,將其切分為更小粒度的“用戶名改變”“地址改變”等等。
解決方案:依舊是借助EventAdapter,將Journal里保存的一個大事件,切分為若干個小事件,反之亦然。
trait Version1
trait Version2
// V1 event:
final case class UserDetailsChanged(name: String, address: String) extends Version1
// corresponding V2 events:
final case class UserNameChanged(name: String) extends Version2
final case class UserAddressChanged(address: String) extends Version2
// event splitting adapter:
class UserEventsAdapter extends EventAdapter {
override def manifest(event: Any): String = ""
override def fromJournal(event: Any, manifest: String): EventSeq = event match {
case UserDetailsChanged(null, address) => EventSeq(UserAddressChanged(address))
case UserDetailsChanged(name, null) => EventSeq(UserNameChanged(name))
case UserDetailsChanged(name, address) =>
EventSeq(UserNameChanged(name), UserAddressChanged(address))
case event: Version2 => EventSeq(event)
}
override def toJournal(event: Any): Any = event
}
Persistence Query
🏭 com.typesafe.akka:akka-persistence-query_2.12:2.6.6
Akka Persistence公開了一個基於異步流的查詢接口,使CQRS的讀端(Read-Side)可以利用該接口讀取Journal里保存的事件,從而執行更新UI等任務。但是,Persistence Query不能完全勝任讀端的要求,它通常只能協助應用將數據從寫端遷移到讀端,這也是為了更好的執行效率和可擴展性,而建議讀端與寫端分別使用不同類型數據庫的原因。
💀 寫端通常都是以追加方式寫入帶Id的Event,所以傳統的關系型數據庫MySQL、Oracle或者象LevelDB、Redis這類Key-Value類型的NoSQL數據庫通常會有較好的性能。而在讀端,類似MongoDB這樣文檔類型的NoSQL數據庫會有更大的市場。
考慮到要盡可能保持接口的通用性,Akka Persistence沒有過多干涉Persistence Query的API定義,只要求:每個讀日記(Read Journal)都必須明確表明其支持的查詢類型,並按最常見的場景預定義了一些查詢類型,而由Journal的插件自己去選擇其中的一部分並實現之。
讀日記
ReadJournal都屬於Akka社區插件(🔗 Community Plugins),由用戶自己開發並維護。每個ReadJournal對應一個存儲方案(可以是數據庫,甚至文本文件),且都有一個固定的Id。該Id可以使用類似readJournalFor[NoopJournal](NoopJournal.identifier)
的方式獲取它,但這不是強制的,只是推薦做法。
要使用ReadJournal進行查詢,就需要先獲取它的一個實例(此處的Id即為akka.persistence.query.my-read-journal
):
// obtain read journal by plugin id
val readJournal =
PersistenceQuery(system).readJournalFor[MyScaladslReadJournal]("akka.persistence.query.my-read-journal")
// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue)
// materialize stream, consuming events
source.runForeach { event =>
println("Event: " + event)
}
Akka Persistence的Read Journal API主要包括以下內容:
-
persistenceIds():查詢系統中所有活動的Live PersistenceId,每當有新的Id被創建時也將被加入流當中。
-
currentPersistenceIds():查詢系統中當前的PersistenceId,在執行查詢之后新創建的Id不會被加入流中。
-
eventsByPersistenceId(id, fromSequenceNr = 0L, toSequenceNr = Long.MaxValue):查詢PersistenceId對應Persistent Actor的事件,這有點類似於重塑過程中依次獲取事件的活動,但該查詢流也是活動的,所以多數Journal會采用輪詢的方式確保結果是最新的。此處的SequenceNr用於從指定位置開始查詢事件,這也變相地為“斷點續注”提供了支持。
-
eventTag(tag, offset):查詢指定Tag的所有事件。事件可能來源於多個PersistenceId,而且不能保證其先后順序,除非Journal在反饋結果時提前排好序。
val NumberOfEntityGroups = 10 def tagEvent(entityId: String, event: Event): Set[String] = { val entityGroup = s"group-${math.abs(entityId.hashCode % NumberOfEntityGroups)}" event match { // OrderCompleted類型的事件會額外多一個標簽 case _: OrderCompleted => Set(entityGroup, "order-completed") case _ => Set(entityGroup) } } def apply(entityId: String): Behavior[Command] = { EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("ShoppingCart", entityId), emptyState = State(), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) .withTagger(event => tagEvent(entityId, event)) } // assuming journal is able to work with numeric offsets we can: val completedOrders: Source[EventEnvelope, NotUsed] = readJournal.eventsByTag("order-completed", Offset.noOffset) // find first 10 completed orders: val firstCompleted: Future[Vector[OrderCompleted]] = completedOrders .map(_.event) .collectType[OrderCompleted] .take(10) // cancels the query stream after pulling 10 elements .runFold(Vector.empty[OrderCompleted])(_ :+ _) // start another query, from the known offset val furtherOrders = readJournal.eventsByTag("order-completed", offset = Sequence(10))
-
查詢附屬信息:Persistence Query還支持查詢屬於流的附屬信息,不過具體得由Journal實現。
final case class RichEvent(tags: Set[String], payload: Any) // a plugin can provide: order & infinite case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean) // Journal提供的查詢附屬信息API def byTagsWithMeta(tags: Set[String]): Source[RichEvent, QueryMetadata] = ??? // 使用上述API查詢Materialized values val query: Source[RichEvent, QueryMetadata] = readJournal.byTagsWithMeta(Set("red", "blue")) query .mapMaterializedValue { meta => println( s"The query is: " + s"ordered deterministically: ${meta.deterministicOrder}, " + s"infinite: ${meta.infinite}") } .map { event => println(s"Event payload: ${event.payload}") } .runWith(Sink.ignore)
性能與非范式化
在CQRS模式下,讀寫端只要能保證最終的一致性,可以分別擁有不同形式的存儲方案。據此,可以在讀端采取類似數據庫視圖的方式,建立固定結構的物化視圖Materialized Views反復使用,從而提高查詢的效率。這樣的視圖無需嚴格遵守數據庫的范式規則,怎么方便怎么來,比如用文檔類型的NoSQL就不錯。
-
借助兼容🔗 JDK9中Reactive Streams接口的數據庫創建物化視圖:這需要讀端數據存儲支持Reactive Streams接口,這樣Journal直接將寫端數據注入讀端即可。
implicit val system = ActorSystem() val readJournal = PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](JournalId) val dbBatchWriter: Subscriber[immutable.Seq[Any]] = ReactiveStreamsCompatibleDBDriver.batchWriter // Using an example (Reactive Streams) Database driver readJournal .eventsByPersistenceId("user-1337", fromSequenceNr = 0L, toSequenceNr = Long.MaxValue) .map(envelope => envelope.event) .map(convertToReadSideTypes) // convert to datatype .grouped(20) // batch inserts into groups of 20 .runWith(Sink.fromSubscriber(dbBatchWriter)) // write batches to read-side database
-
借助mapAsync創建物化視圖:在沒有Reactive Streams支持的情況下,自己動手實現從寫端事件數據庫到讀端數據庫的轉換過程。
// 模擬的讀端數據庫 trait ExampleStore { def save(event: Any): Future[Unit] } val store: ExampleStore = ??? readJournal .eventsByTag("bid", NoOffset) .mapAsync(parallelism = 1) { e => store.save(e) } .runWith(Sink.ignore)
- 實現可恢復的注入:保存好Offset或者SequenceNumber,即可實現“斷點續注”。
def runQuery(writer: ActorRef[TheOneWhoWritesToQueryJournal.Command])(implicit system: ActorSystem[_]): Unit = { val readJournal = PersistenceQuery(system.toClassic).readJournalFor[MyScaladslReadJournal](JournalId) import system.executionContext implicit val timeout = Timeout(3.seconds) val bidProjection = new MyResumableProjection("bid") bidProjection.latestOffset.foreach { startFromOffset => readJournal .eventsByTag("bid", Sequence(startFromOffset)) .mapAsync(8) { envelope => writer .ask((replyTo: ActorRef[Done]) => TheOneWhoWritesToQueryJournal.Update(envelope.event, replyTo)) .map(_ => envelope.offset) } .mapAsync(1) { offset => bidProjection.saveProgress(offset) } .runWith(Sink.ignore) } } // 用一個Actor來實際執行注入任務 object TheOneWhoWritesToQueryJournal { sealed trait Command final case class Update(payload: Any, replyTo: ActorRef[Done]) extends Command def apply(id: String, store: ExampleStore): Behavior[Command] = { updated(ComplexState(), store) } private def updated(state: ComplexState, store: ExampleStore): Behavior[Command] = { Behaviors.receiveMessage { case command: Update => val newState = updateState(state, command) if (state.readyToSave) store.save(Record(state)) updated(newState, store) } } private def updateState(state: ComplexState, command: Command): ComplexState = { // some complicated aggregation logic here ... state } }
查詢插件
無論使用何種數據庫,只要是實現了ReadJournal的插件,都屬於查詢插件Query Plugins,都要公開查詢適用的場景和語義。
讀日記插件
所有的ReadJournal插件都必須實現akka.persistence.query.ReadJournalProvider
,且該Provider必須能同時支持創建Scala與Java版本的ReadJournal實例(akka.persistence.query.scaladsl/javadsl.ReadJournal),其構造子主要有以下4種形式:
- (ExtendedActorSystem, com.typesafe.config.Config, pathOfConfig):Config里是ActorSystem中有關插件的配置,path里是插件自身的配置
- (ExtendedActorSystem, com.typesafe.config.Config)
- (ExtendedActorSystem)
- ()
如果數據庫只支持查詢當前結果集,那么對於這一類無限的事件流,ReadJournal就必須采取輪詢方式反復嘗試讀取更新的事件,此時建議在配置中使用refresh-interval
定義輪詢間隔。
class MyReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider {
override val scaladslReadJournal: MyScaladslReadJournal =
new MyScaladslReadJournal(system, config)
override val javadslReadJournal: MyJavadslReadJournal =
new MyJavadslReadJournal(scaladslReadJournal)
}
class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config)
extends akka.persistence.query.scaladsl.ReadJournal
with akka.persistence.query.scaladsl.EventsByTagQuery
with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
with akka.persistence.query.scaladsl.PersistenceIdsQuery
with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery {
private val refreshInterval: FiniteDuration =
config.getDuration("refresh-interval", MILLISECONDS).millis
/**
* You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all
* events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for
* the specific tag. Note that the corresponding offset of each event is provided in the
* [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*/
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = offset match {
case Sequence(offsetValue) =>
Source.fromGraph(new MyEventsByTagSource(tag, offsetValue, refreshInterval))
case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive
case _ =>
throw new IllegalArgumentException("MyJournal does not support " + offset.getClass.getName + " offsets")
}
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
// implement in a similar way as eventsByTag
???
}
override def persistenceIds(): Source[String, NotUsed] = {
// implement in a similar way as eventsByTag
???
}
override def currentPersistenceIds(): Source[String, NotUsed] = {
// implement in a similar way as eventsByTag
???
}
// possibility to add more plugin specific queries
def byTagsWithMeta(tags: Set[String]): Source[RichEvent, QueryMetadata] = {
// implement in a similar way as eventsByTag
???
}
}
class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal)
extends akka.persistence.query.javadsl.ReadJournal
with akka.persistence.query.javadsl.EventsByTagQuery
with akka.persistence.query.javadsl.EventsByPersistenceIdQuery
with akka.persistence.query.javadsl.PersistenceIdsQuery
with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope, NotUsed] =
scaladslReadJournal.eventsByTag(tag, offset).asJava
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long = 0L,
toSequenceNr: Long = Long.MaxValue): javadsl.Source[EventEnvelope, NotUsed] =
scaladslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava
override def persistenceIds(): javadsl.Source[String, NotUsed] =
scaladslReadJournal.persistenceIds().asJava
override def currentPersistenceIds(): javadsl.Source[String, NotUsed] =
scaladslReadJournal.currentPersistenceIds().asJava
// possibility to add more plugin specific queries
def byTagsWithMeta(tags: java.util.Set[String]): javadsl.Source[RichEvent, QueryMetadata] = {
import akka.util.ccompat.JavaConverters._
scaladslReadJournal.byTagsWithMeta(tags.asScala.toSet).asJava
}
}
class MyEventsByTagSource(tag: String, offset: Long, refreshInterval: FiniteDuration)
extends GraphStage[SourceShape[EventEnvelope]] {
private case object Continue
val out: Outlet[EventEnvelope] = Outlet("MyEventByTagSource.out")
override def shape: SourceShape[EventEnvelope] = SourceShape(out)
override protected def initialAttributes: Attributes = Attributes(ActorAttributes.IODispatcher)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with OutHandler {
lazy val system = materializer.system
private val Limit = 1000
private val connection: java.sql.Connection = ???
private var currentOffset = offset
private var buf = Vector.empty[EventEnvelope]
private val serialization = SerializationExtension(system)
override def preStart(): Unit = {
scheduleWithFixedDelay(Continue, refreshInterval, refreshInterval)
}
override def onPull(): Unit = {
query()
tryPush()
}
override def onDownstreamFinish(): Unit = {
// close connection if responsible for doing so
}
private def query(): Unit = {
if (buf.isEmpty) {
try {
buf = Select.run(tag, currentOffset, Limit)
} catch {
case NonFatal(e) =>
failStage(e)
}
}
}
private def tryPush(): Unit = {
if (buf.nonEmpty && isAvailable(out)) {
push(out, buf.head)
buf = buf.tail
}
}
override protected def onTimer(timerKey: Any): Unit = timerKey match {
case Continue =>
query()
tryPush()
}
object Select {
private def statement() =
connection.prepareStatement("""
SELECT id, persistence_id, seq_nr, serializer_id, serializer_manifest, payload
FROM journal WHERE tag = ? AND id > ?
ORDER BY id LIMIT ?
""")
def run(tag: String, from: Long, limit: Int): Vector[EventEnvelope] = {
val s = statement()
try {
s.setString(1, tag)
s.setLong(2, from)
s.setLong(3, limit)
val rs = s.executeQuery()
val b = Vector.newBuilder[EventEnvelope]
while (rs.next()) {
val deserialized = serialization
.deserialize(rs.getBytes("payload"), rs.getInt("serializer_id"), rs.getString("serializer_manifest"))
.get
currentOffset = rs.getLong("id")
b += EventEnvelope(
Offset.sequence(currentOffset),
rs.getString("persistence_id"),
rs.getLong("seq_nr"),
deserialized)
}
b.result()
} finally s.close()
}
}
}
}
擴展
關於這部分,請參考Lagom框架。Lagom是Lightbend開發的一個微服務框架,里面涉及了ES和CQRS的大量實現,里面已經有成熟的方案,可以借助Cluster Sharding實現有效擴展。
Lagom: The opinionated microservices framework for moving away from the monolith.
Lagom helps you decompose your legacy monolith and build, test, and deploy entire systems of Reactive microservices.
LevelDB實現Persistence Query的示例
📌 LevelDB是Google推出的一款Key-Value類型的NoSQL本地數據庫(暫不支持網絡)。本示例用LevelDB演示了如何用綠黑藍作Tag,然后進行Persistence Query。
LevelDB is a fast key-value storage library written at Google that provides an ordered mapping from string keys to string values.
import akka.NotUsed
import akka.testkit.AkkaSpec
import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.scaladsl.Source
object LeveldbPersistenceQueryDocSpec {
//#tagger
import akka.persistence.journal.WriteEventAdapter
import akka.persistence.journal.Tagged
class MyTaggingEventAdapter extends WriteEventAdapter {
val colors = Set("green", "black", "blue")
override def toJournal(event: Any): Any = event match {
case s: String =>
var tags = colors.foldLeft(Set.empty[String]) { (acc, c) =>
if (s.contains(c)) acc + c else acc
}
if (tags.isEmpty) event
else Tagged(event, tags)
case _ => event
}
override def manifest(event: Any): String = ""
}
//#tagger
}
class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) {
def this() = this("")
"LeveldbPersistentQuery" must {
"demonstrate how get ReadJournal" in {
//#get-read-journal
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
//#get-read-journal
}
"demonstrate EventsByPersistenceId" in {
//#EventsByPersistenceId
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] =
queries.eventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
//#EventsByPersistenceId
}
"demonstrate AllPersistenceIds" in {
//#AllPersistenceIds
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
val src: Source[String, NotUsed] = queries.persistenceIds()
//#AllPersistenceIds
}
"demonstrate EventsByTag" in {
//#EventsByTag
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] =
queries.eventsByTag(tag = "green", offset = Sequence(0L))
//#EventsByTag
}
}
}
相應的配置如下:
# Configuration for the LeveldbReadJournal
akka.persistence.query.journal.leveldb {
# Implementation class of the LevelDB ReadJournalProvider
class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
# Absolute path to the write journal plugin configuration entry that this
# query journal will connect to. That must be a LeveldbJournal or SharedLeveldbJournal.
# If undefined (or "") it will connect to the default journal as specified by the
# akka.persistence.journal.plugin property.
write-plugin = ""
# The LevelDB write journal is notifying the query side as soon as things
# are persisted, but for efficiency reasons the query side retrieves the events
# in batches that sometimes can be delayed up to the configured `refresh-interval`.
refresh-interval = 3s
# How many events to fetch in one query (replay) and keep buffered until they
# are delivered downstreams.
max-buffer-size = 100
}
持久化插件 Persistence Plugins
持久化插件Persistence Plugins為數據庫存儲事件和快照提供支持,要注意與查詢插件Query Plugins區別。由Akka團隊負責維護的持久化插件包括:
- akka-persistence-cassandra
- akka-persistence-couchbase
- akka-persistence-jdbc
在Persistent Actor沒有覆寫journalPluginId和snapshotPluginId的情況下,Akka將使用在reference.conf中akka.persistence.journal.plugin
和akka.persistence.snapshot-store.plugin
中配置的默認日記和快照存儲插件。若配置留空,則需要在application.conf中明確指定。
如果需要遲早加載持久化插件,則需按如下配置:
akka {
extensions = [akka.persistence.Persistence]
persistence {
journal {
plugin = "akka.persistence.journal.leveldb"
auto-start-journals = ["akka.persistence.journal.leveldb"]
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.local"
auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"]
}
}
}
LevelDB Plugin使用示例
配置文件指定啟用LevelDB Plugin作為Persistence Plugin,並指定數據庫文件存放位置(默認是當前工作目錄下的journal文件夾,快照是snapshots文件夾):
# Path to the journal plugin to be used
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/journal"
# Path to the snapshot store plugin to be used
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots"
Gradle包管理加入LevelDB Plugin
dependencies {
implementation org.fusesource.leveldbjni:leveldbjni-all:1.8
}
設定LevelDB的持久化參數,此處主要是設定每到哪個id,就通過刪除事件而保留快照的方式(並非真正刪除,而是給事件打上邏輯標志,使之成為“墓碑”),實現壓縮數據庫的功能:
# Number of deleted messages per persistence id that will trigger journal compaction
akka.persistence.journal.leveldb.compaction-intervals {
persistence-id-1 = 100
persistence-id-2 = 200
# ...
persistence-id-N = 1000
# use wildcards to match unspecified persistence ids, if any
"*" = 250
}
僅供測試的共享LevelDB
Akka內置了用於測試的可共享的LevelDB實例,啟用配置如下:
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared.store.dir = "target/shared"
在使用前,還必須用SharedLeveldbJournal.setStore注入一個Actor完成初始化:
import akka.persistence.journal.leveldb.SharedLeveldbStore
trait SharedStoreUsage extends Actor {
override def preStart(): Unit = {
context.actorSelection("akka://example@127.0.0.1:2552/user/store") ! Identify(1)
}
def receive = {
case ActorIdentity(1, Some(store)) =>
SharedLeveldbJournal.setStore(store, context.system)
}
}
// 然后就能正常使用了
val store = system.actorOf(Props[SharedLeveldbStore], "store")
僅供測試的持久化插件代理
Akka還內置了用於測試的Persistence Plugin Proxy,它通過定向轉發整合若干個Journal,從而讓多個Actor共享底層的持久化支持,在一個Journal節點崩潰后,也能通過其災備節點繼續為Actor提供事件存儲和快照支持。代理的啟動,則可以通過實例化PersistencePluginProxyExtension擴展或調用PersistencePluginProxy.start方法來完成。
# 配置信息需放入相應配置塊
akka.persistence.journal.proxy { ... }
akka.persistence.snapshot-store.proxy { ... }
# 指定底層的Journal
target-journal-plugin = akka.persistence.journal.leveldb
target-snapshot-store-plugin = ...
# 指定用於初始化代理的ActorSystem
start-target-journal = xxx.xxx.myActor
start-target-snapshot-store = ...
## 指定Actor的位置(也可以在初始化代碼中調用PersistencePluginProxy.setTargetLocation指定)
target-journal-address =
target-snapshot-store-address =
建造持久化后端
為自定義持久化事件的后端數據庫支持,Akka Persistence公開了一組API。
🏭
import akka.persistence._
import akka.persistence.journal._
import akka.persistence.snapshot._
Journal插件API:AsyncWriteJournal
🔗 AsyncWriteJournal本質也是一個Actor,公開的方法只有以下3個:
/**
* Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the
* journal.
*
* The batch is only for performance reasons, i.e. all messages don't have to be written
* atomically. Higher throughput can typically be achieved by using batch inserts of many
* records compared to inserting records one-by-one, but this aspect depends on the
* underlying data store and a journal implementation can implement it as efficient as
* possible. Journals should aim to persist events in-order for a given `persistenceId`
* as otherwise in case of a failure, the persistent state may be end up being inconsistent.
*
* Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to
* the event that was passed to the `persist` method of the `PersistentActor`, or it
* contains several `PersistentRepr` that corresponds to the events that were passed
* to the `persistAll` method of the `PersistentActor`. All `PersistentRepr` of the
* `AtomicWrite` must be written to the data store atomically, i.e. all or none must
* be stored. If the journal (data store) cannot support atomic writes of multiple
* events it should reject such writes with a `Try` `Failure` with an
* `UnsupportedOperationException` describing the issue. This limitation should
* also be documented by the journal plugin.
*
* If there are failures when storing any of the messages in the batch the returned
* `Future` must be completed with failure. The `Future` must only be completed with
* success when all messages in the batch have been confirmed to be stored successfully,
* i.e. they will be readable, and visible, in a subsequent replay. If there is
* uncertainty about if the messages were stored or not the `Future` must be completed
* with failure.
*
* Data store connection problems must be signaled by completing the `Future` with
* failure.
*
* The journal can also signal that it rejects individual messages (`AtomicWrite`) by
* the returned `immutable.Seq[Try[Unit]]`. It is possible but not mandatory to reduce
* number of allocations by returning `Future.successful(Nil)` for the happy path,
* i.e. when no messages are rejected. Otherwise the returned `Seq` must have as many elements
* as the input `messages` `Seq`. Each `Try` element signals if the corresponding
* `AtomicWrite` is rejected or not, with an exception describing the problem. Rejecting
* a message means it was not stored, i.e. it must not be included in a later replay.
* Rejecting a message is typically done before attempting to store it, e.g. because of
* serialization error.
*
* Data store connection problems must not be signaled as rejections.
*
* It is possible but not mandatory to reduce number of allocations by returning
* `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected.
*
* Calls to this method are serialized by the enclosing journal actor. If you spawn
* work in asynchronous tasks it is alright that they complete the futures in any order,
* but the actual writes for a specific persistenceId should be serialized to avoid
* issues such as events of a later write are visible to consumers (query side, or replay)
* before the events of an earlier write are visible.
* A PersistentActor will not send a new WriteMessages request before the previous one
* has been completed.
*
* Please note that the `sender` field of the contained PersistentRepr objects has been
* nulled out (i.e. set to `ActorRef.noSender`) in order to not use space in the journal
* for a sender reference that will likely be obsolete during replay.
*
* Please also note that requests for the highest sequence number may be made concurrently
* to this call executing for the same `persistenceId`, in particular it is possible that
* a restarting actor tries to recover before its outstanding writes have completed. In
* the latter case it is highly desirable to defer reading the highest sequence number
* until all outstanding writes have completed, otherwise the PersistentActor may reuse
* sequence numbers.
*
* This call is protected with a circuit-breaker.
*/
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
/**
* Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
* (inclusive).
*
* This call is protected with a circuit-breaker.
* Message deletion doesn't affect the highest sequence number of messages,
* journal must maintain the highest sequence number and never decrease it.
*/
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
/**
* Plugin API
*
* Allows plugin implementers to use `f pipeTo self` and
* handle additional messages for implementing advanced features
*
*/
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
如果想讓Journal只支持同步寫入,那么按如下方式阻塞掉異步的寫入即可:
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
Future.fromTry(Try {
// blocking call here
???
})
Journal還必須實現AsyncRecovery中定義的用於重塑和序列號恢復的方法:
/**
* Plugin API: asynchronously replays persistent messages. Implementations replay
* a message by calling `replayCallback`. The returned future must be completed
* when all messages (matching the sequence number bounds) have been replayed.
* The future must be completed with a failure if any of the persistent messages
* could not be replayed.
*
* The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` method must return
* `true`.
*
* The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]]
* and what the user specified as recovery [[akka.persistence.Recovery]] parameter.
* This does imply that this call is always preceded by reading the highest sequence
* number for the given `persistenceId`.
*
* This call is NOT protected with a circuit-breaker because it may take long time
* to replay all events. The plugin implementation itself must protect against
* an unresponsive backend store and make sure that the returned Future is
* completed with success or failure within reasonable time. It is not allowed
* to ignore completing the future.
*
* @param persistenceId persistent actor id.
* @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param max maximum number of messages to be replayed.
* @param recoveryCallback called to replay a single message. Can be called from any
* thread.
*
* @see [[AsyncWriteJournal]]
*/
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
recoveryCallback: PersistentRepr => Unit): Future[Unit]
/**
* Plugin API: asynchronously reads the highest stored sequence number for the
* given `persistenceId`. The persistent actor will use the highest sequence
* number after recovery as the starting point when persisting new events.
* This sequence number is also used as `toSequenceNr` in subsequent call
* to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`.
* Journal must maintain the highest sequence number and never decrease it.
*
* This call is protected with a circuit-breaker.
*
* Please also note that requests for the highest sequence number may be made concurrently
* to writes executing for the same `persistenceId`, in particular it is possible that
* a restarting actor tries to recover before its outstanding writes have completed.
*
* @param persistenceId persistent actor id.
* @param fromSequenceNr hint where to start searching for the highest sequence
* number. When a persistent actor is recovering this
* `fromSequenceNr` will be the sequence number of the used
* snapshot or `0L` if no snapshot is used.
*/
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
編碼完成后,通過配置即可啟用Journal,但切記不能在默認Dispatcher上執行Journal的任務或者Future,否則會造成其他Actor陷入飢餓:
# Path to the journal plugin to be used
akka.persistence.journal.plugin = "my-journal"
# My custom journal plugin
my-journal {
# Class name of the plugin.
class = "docs.persistence.MyJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
}
Snapshot插件API:SnapshotStore
🔗 SnapshotStore也是一個Actor:
/**
* Plugin API: asynchronously loads a snapshot.
*
* If the future `Option` is `None` then all events will be replayed,
* i.e. there was no snapshot. If snapshot could not be loaded the `Future`
* should be completed with failure. That is important because events may
* have been deleted and just replaying the events might not result in a valid
* state.
*
* This call is protected with a circuit-breaker.
*
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for loading.
*/
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
/**
* Plugin API: asynchronously saves a snapshot.
*
* This call is protected with a circuit-breaker.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
/**
* Plugin API: deletes the snapshot identified by `metadata`.
*
* This call is protected with a circuit-breaker.
*
* @param metadata snapshot metadata.
*/
def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
/**
* Plugin API: deletes all snapshots matching `criteria`.
*
* This call is protected with a circuit-breaker.
*
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for deleting.
*/
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
/**
* Plugin API
* Allows plugin implementers to use `f pipeTo self` and
* handle additional messages for implementing advanced features
*/
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
類似Journal,編碼完成后通過配置即可啟用Snapshot插件:
# Path to the snapshot store plugin to be used
akka.persistence.snapshot-store.plugin = "my-snapshot-store"
# My custom snapshot store plugin
my-snapshot-store {
# Class name of the plugin.
class = "docs.persistence.MySnapshotStore"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
}
插件開發輔助工具 TCK
Akka開發了TCK(Technology Compatibility Kit),用於插件的測試。
🏭 com.typesafe.akka:akka-persistence-tck_2.12:2.6.6
以下分別是Journal與Snapshot必備的測試。如果插件需要一些額外的設置,比如啟動模擬數據庫、刪除臨時文件等,那么可以覆寫beforeAll和afterAll方法:
class MyJournalSpec
extends JournalSpec(
config = ConfigFactory.parseString("""akka.persistence.journal.plugin = "my.journal.plugin"""")) {
override def supportsRejectingNonSerializableObjects: CapabilityFlag =
false // or CapabilityFlag.off
override def supportsSerialization: CapabilityFlag =
true // or CapabilityFlag.on
}
class MySnapshotStoreSpec
extends SnapshotStoreSpec(
config = ConfigFactory.parseString("""
akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin"
""")) {
override def supportsSerialization: CapabilityFlag =
true // or CapabilityFlag.on
}
損壞的事件日志
如果無法阻止用戶同時運行具有相同persistenceId的Actor,則事件日志Log可能會因具有相同序列號的事件而被破壞。建議Journal在重塑過程中仍繼續傳遞這些事件,同時使用reply-filter來決定如何處理。
➡️ 其他
打包
使用Gradle打包,主要借助其Java插件的Jar任務來完成。為了保證多個reference.conf正確合並,推薦使用Gradle插件🔗 Shadow plugin,然后在build.gradle里這樣寫:
import com.github.jengelman.gradle.plugins.shadow.transformers.AppendingTransformer
plugins {
id 'java'
id "com.github.johnrengelman.shadow" version "5.0.0"
}
shadowJar {
transform(AppendingTransformer) {
resource = 'reference.conf'
}
with jar
}
以Docker包的形式發布
在Docker容器中,可以同時使用Akka Remoting和Akka Cluster,但要注意配置好網絡(🔗 Akka behind NAT or in a Docker container),並適當調整可用的CPU、內存等資源。
書籍與視頻
🔗 https://doc.akka.io/docs/akka/current/additional/books.html
Akka API
🔗 https://doc.akka.io/api/akka/2.6/index.html