akka框架——異步非阻塞高並發處理框架


akka actor, akka cluster

akka是一系列框架,包括akka-actor, akka-remote, akka-cluster, akka-stream等,分別具有高並發處理模型——actor模型,遠程通信,集群管理,流處理等功能。

akka支持scala和java等JVM編程語言。

akka actor

akka actor是一個actor模型框架。actor模型是一種將行為定義到actor,actor間通過消息通信,消息發送異步進行,消息處理(在actor內)同步有序進行的一種高並發、非阻塞式編程模型。

Actor模型優:

  • event-driven model 事件驅動
  • strong isolation principles - 強隔離原則。 actor只應該處理消息,不該有其他方法接口,不保存狀態、不共享狀態
  • location transparency acto的物理位置對用戶不可見,用戶看到actor視圖邏輯上是一致的,盡管物理位置不同
  • lightweight 輕量級

actor的層級結構;actor名字與路徑、地址;actor的消息收件箱;發送消息的異步性;actor消息處理的有序性;actor按序挨個處理消息(而非並發);

case class MsgA(data:Type)
case class MsgB(x:X)

class SomeActor extends Actor {
    def receive()={
        case MsgA(d)=>
        case MsgB(x)=>
    }
}

val system=ActorSystem("sysname")
val act:ActorRef=system.actorOf(p:Props[], "act-name")
act ! MsgA("data")
act ! MsgB(xxx)

.tell() :Fire-Forget

.ask() : Send-And-Receive-Future

阻塞程序等待消息返回結果:

import akka.pattern._
import scala.concurrent._
implicit val akkaAskTimeout:Timeout = Timeout(5 seconds)
val awaitTimeout= 10 seconds
val res=Await.result(actor ? MessageXxx, awaitTimeout)
println(res)

akka programming general rules: (一般編碼規則,該建議來自官網)

  • messages with good names, rich semantic, domain specific
  • imuutable messages
  • put actor's associated messages in its companion object
  • a .props() : Props[?] method in actor companion object to construct the actor

.actorOf()創建Actor,返回ActorRef。
.actorSelect()查找actor,返ActorRef。

akka-remote

配置鍵akka.remote.netty.tcp.hostname定義remoting模塊通信的網口,akka.remote.netty.tcp.port定義通信端口。通信網口在未配置或配置為空串時(不能配置為null)默認監聽局域網網口(不是回環網口127.0.0.1)。通信端口在未配置默認監聽2552端口,在配置為0時會監聽一個隨機端口。網口名與ActorSystem地址中的主機名嚴格對應,不能試圖以回環地址連接本機上監聽局域網網口的actor。比如,本機上運行的單節點集群,集群即只有自身,其集群種子節點配置為akka.cluster.seed-nodes=["akka.tcp://xx@127.0.0.1:2551"],則其remoting通信網口配置akka.remote.netty.tcp.hostname只能是127.0.0.1,不能是局域網網口。

……

actor的創建和部署不單只是在本地,還有可能涉及遠程部署(如集群)。涉及遠程部署時牽涉到序列化問題。

如果能確信actor的創建只會涉及本地,則可通過配置關閉actor創建器的序列化行為akka.actor.serialize-creators=off(默認關閉)。

akka-cluster

akka cluster是……集群,作用……,特點……TODO

akka cluster由多個ActorSystem(節點)構成,actorsystem根據配置指定的種子節點組建集群。

論及集群時,所謂節點不一定指一台物理機,一般指ActorSystem實例,不同的ActorSystem實例對應不同的<主機+端口>。一台物理機可運行多個ActorSystem實例。論及集群成員時,這里的成員指的是節點,不是集群內的actor。

集群種子節點配置akka.cluster.seed-nodes中的節點不必都啟動,但節點列表中第一個必須啟動,否則其他節點(不論是否在seed-nodes中)不能加入集群。也就說,如果不啟動列表第一個節點,啟動多個其他節點,不能組成集群(也就不能產生/接收到集群事件),直到啟動列表第一個節點才將已啟動的多個節點組成集群。種子列表第一個節點可組建一個只有自己的集群。
任意節點均可加入集群,並非一定得是種子節點。節點加入的只能是集群(不能是另一個節點,即聯系的其他節點必須處於集群中),也就說有個節點得先組建一個僅包含自己的集群,以使得其他節點有集群可加入。
集群內節點ActorSystem的名字要求一致,由組建過程可知,即種子列表第一個節點的名字。

如果集群含超過2個節點,那么列表第一個節點可以不存活。如果種子節點全都同時不存活,那么以相同配置再次啟動節點將組建不同於以前的新集群,即不能進入以前的集群。

通過接口Cluster.joinSeedNodes(List[Address])可動態添加種子節點,節點可通過接口cluster.join(Address)加入集群。

配置項akka.cluster.seed-node-timeout指啟動中的節點試圖聯系集群(種子節點)的超時時間,如果超時,將在akka.cluster.retry-unsuccessful-join-after指定的時間后再次重試聯系,默認無限次重試聯系直到聯系上,通過配置akka.cluster.shutdown-after-unsuccessful-join-seed-nodes指定一個超時時間使得本節點在聯系不上集群種子節點超過該時間后不再繼續聯系,終止聯系后執行擴展程序CoordinatedShutdown以停掉本節點actor相關行為(即關停ActorSystem),如果設置akka.coordinated-shutdown.terminate-actor-system = on(默認開啟)將導致擴展程序關停ActorSystem后退出JVM。

配置項akka.cluster.min-nr-of-members指定集群要求的最小成員個數。

集群成員狀態

集群成員在加入集群、存在於集群、到退出集群整個生命周期中的變化有對應狀態,狀態包括joining, up, leaving, exiting, remoed, down, unreachable。。TODO

集群成員狀態轉移圖:
集群成員狀態轉移圖
圖片來源:akka官網

圖中的框表示成員狀態,邊表示驅動狀態轉換的相關操作。"(leader action)"表示該操作由集群首領驅使完成。“(fd*)”表示該操作由系統失敗檢測器(failure detector)驅使完成,失敗檢測器是一個監測集群成員通信狀態的后台程序。

網絡中的節點隨時可能無法通信(通信不可達),針對這個問題,集群系統設有失敗檢測器(Failure Detector),發現成員異常不可達時將廣播UnreachableMember事件。需要在程序中顯式調用接口Cluster.down(Address)來改變成員狀態為Down,集群之后會廣播MemberDowned事件。節點正常退出時不會轉入unreachable狀態,而是進入leaving(事件MemberLeft)。

自動down:通過配置可使成員自動從unreachable轉入down,通過配置akka.cluster.auto-down-unreachable-after開啟並指定成員自動從狀態unreachable轉入down的時長,官方提供此功能僅為測試,並告知不要在生產環境使用。

集群事件

集群可訂閱的事件(cluster.subscribe(,Class[_]*)方法)要求事件類型實參是ClusterDomainEvent類型的(ClusterDomainEvent或其子類),ClusterDomainEvent是一個標記型接口。事件相關的類型定義在akka.cluster.ClusterEvent中,ClusterEvent只是object,沒有伴生對象。

伴隨集群成員生命周期的事件:

  • MemberJoined - 新節點加入,其狀態被改為了 Joining。
  • MemberUp - 新節點成功加入,成為了集群成員,其狀態被改為了 Up。
  • MemberExited - 集群成員正要離開集群,其狀態被改為了 Exiting。注意當其他節點收到此事時,事件主體節點可能已經關停。
  • MemberDowned - 集群成員狀態為down。
  • MemberRemoved - 節點已從集群移除。
  • UnreachableMember - 成員被認為不可達(失敗檢測器詢問完其他所有成員均認為該節點不可達)。
  • ReachableMember - 之前的不可達成員重新變得可達。事件的發生要求所有之前認為該節點不可達的成員現在都可達該節點。

WeaklyUp事件:當部分節點不可達時gossip不收斂,沒有集群首領(leader),首領行為無法實施,但此時我們仍希望新節點可加入集群,這時需要WeaklyUp狀態特性。特性使得,如果集群不能達到gossip收斂,Joining成員將被提升為WeaklyUp,成為集群的一部分,當gossip收斂,WeaklyUp成員轉為Up,可通過配置akka.cluster.allow-weakly-up-members = off關閉這種特性。

CurrentClusterState:新節點加入集群后,在其收到任何集群成員事件前,集群會向其推送一個CurrentClusterState消息(此class並非ClusterDomainEvent子類,即不能成為cluster.subscribe()中事件類型的參數),表示新成員加入時(訂閱集群事件時)集群中的成員狀態快照信息,特別地,MemberUp事件伴隨的集群狀態快照可能沒有任何集群成員,避免此情況下CurrentClusterState事件被發送的方法是在cluster.registerOnMemberUp(...)參數中提交集群事件訂閱行為。接收CurrentClusterState消息是基於訂閱時參數cluster.subscribe(,initialStateMode=InitialStateAsSnapshot,)的條件下,CurrentClusterState將成為actor接收到的第一條集群消息,可不接收快照,而是將現有成員的存在視為相關事件,這時應使用訂閱參數(initialStateMode=InitialStateAsEvents)。

val cluster = Cluster(context.system)
cluster.registerOnMemberUp{
  cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
}

Terminated消息是被監控(.watch())節點在被下線(down)或移除(remove)后發送的消息,類繼承結構上不是集群事件(ClusterDomainEvent)。

actor需訂閱事件才能接收到(cluster.subscribe(self,,Class[EvenetType]*)),接收到的消息包括訂閱時提供的事件類型的子類。

actor接收到的事件有時是基於自己看到的集群狀態的,不是所有actor收到的事件都相同,尤其注意,當節點自己停機時,會收到集群內所有節點(包括自己以及其他沒停機節點)的MemberRemoved事件,也就相當於在actor自己看來這個集群停機了。而其他存活的節點只會收到停機節點的MemberRemoved事件。有時自己退群又只收到自己的MemeberLeft事件,而且沒有MemberRemoved事件,沒有其他成員的離群事件。(意外出現過,需重現以及分析原因)

節點獲取自身地址的方式:

val cluster=Cluster(actorSystem)
cluster.selfAddress     // 節點自身地址,信息包括協議+system名+主機+端口
cluster.selfUniqueAdress    //節點自身唯一性地址,信息包括自身地址+自身UID

節點角色 node roles

集群節點可標記若干自定義角色類型,通過配置項akka.cluster.roles指定。

每種角色群有一個首領節點,以執行角色相關操作,一般無需感知。

配置項akka.cluster.role.<role-name>.min-nr-of-members定義集群中角色為 的節點的最小個數。

集群內單例 Cluster Singleton:保證某種Actor類在集群內或集群內某種角色群中只有一個實例。活躍的單例是集群的成員,是成員即可被移出集群,被移出的單例其類型在被管理器創建補充之前活躍單例前存在一小段實例缺失時間,這期間單例的集群成員不可達狀態會被失敗檢測器檢測到。

單例模式需要注意的問題:

  • 性能瓶頸
  • 單點故障
  • 不能假設單例是不間斷可用的,因為在活躍單例失效后,新的活躍單例工作前的一小段時間內單例不可用。

單例工具依賴庫“com.typesafe.akka:akka-cluster-tools”。
創建單例依賴類ClusterSingletonManager,是一個actor,需在集群內所有節點上(盡早)啟動,訪問單例依賴類ClusterSingletonProxy

給單例actor發送消息時,查找的ActorRef應是單例代理,而不是單例管理器。

集群節點分身 Cluster Sharding:分發actor到多個節點,邏輯標識符是同一個,不用關心其實際位置。

分布式訂閱發布 Distributed Publish Subscribe:僅通過邏輯路徑實現集群內actor間的訂閱發布通信、點對點通信,不必關心物理位置。

集群客戶端 Cluster Client:與集群通信的外部系統稱為集群的客戶端。

借助akka工具包com.typesafe.akka:akka-cluster-tools,可實現集群客戶端與集群間的通信。在集群中的節點上調用ClusterClientReceptionist(actorSystem).registerService(actorRef)將actorRef實例
注冊為負責連接外部系統通信請求的接待員,在外部系統中利用 val c = system.actorOf(ClusterClient.props(ClusterClientSettings(system).withInitialContacts(<集群接待員地址列表,可配到配置文件>)))獲得能與集群接待員通信的ActorRef,再通過c ! ClusterClient.Send("/user/some-actor-path", msg)發送消息到集群。

集群客戶端配置集群接待員通過指定akka.cluster.client.initial-contacts數組完成,一個接待員地址包含集群成員地址(集群中任意一個可通信成員)和接待員在集群成員ActorSystem中的路徑(系統生成的接待員的服務地址在/system/receptionist),如["akka.tcp://my-cluster@127.0.0.1:2552/system/receptionist"]

集群節點協議不能是akka,需是akka.tcp,配置項akka.remote下需配置屬性netty.tcp,而不是artery(對應akka協議),否則集群客戶端連接時會因Connection reset by peer而失敗(當然,集群節點akka.actor.provider顯然還是cluster)。

集群節點路由 Cluster Aware Routers:允許路由器對集群內的節點進行路由,自動管理路由對象表(routees),涉及相關成員的進群和退群行為。

兩種路對象(routee)管理策略的路由類型,Group和Pool:

  • Goup 群組 共用集群成員節點作為路由對象(routee)。
    router轉發策略有多種,對於消息一致性轉發的router來說轉的消息必須可一致性散列(ConsistentHashable),或者用ConsitentHashEnvolope包裝消息使其變得可一致散列,否則routee收不到消息,而定義routee的接收方法receive時,得到的數據對象是拆包了的,也就說如果路由器轉發的消息經過一致性散列信封包裝,routee得到的消息已被自動去掉信封。

  • Pool 池 每個router各自自動創建、管理自己的的routees。
    池中的routee actor消亡不會引發路由器自動創建一個來替補,路由器將在所有routee actor消亡會隨即消亡,動態路由器,如使用了數量調整策略的路由器,會改變這種行為,動態調整routee。

    akka.actor.deployment.<router-path>.cluster.max-nr-of-instances-per-node定義每個節點上routee的個數上限(默認1個),由於全部routee在節點啟動時即被啟動(而不是按需延遲啟動),因此該上限同時定義了節點上的routee個數。
    router在集群中可以有多個,同一邏輯路徑下也允許有多個router(即非單例router,就像actor)。單節點routee個數配置max-nr-of-instances-per-node是對每個router而言,也就說,如果集群router個數是m,其類型、配置相同,單節點routee個數為n,集群節點數為c,集群中該類型router管理的routee類型實例數為m*n*c

集群管理 Management:通過HTTP、JMX查看管理集群。





集群相關例子(包含集群、集群事件、集群客戶端、集群節點路由、集群內單例、角色):

/*集群有一個單例Master,有路由功能,管理多個WorkerActor,負責接收集群外部消息以及轉發給worker。集群外部系統(集群客戶端)給集群內master發送消息。
*/
//對應.conf配置文件內容在代碼后面
//↓↓↓↓↓↓↓↓↓↓集群節點程序代碼↓↓↓↓↓↓↓↓↓(對應配置文件node.conf)
//程序入口、創建actor system、創建&啟動節點worker、創建
object ClusterNodeMain {
  def main(args: Array[String]): Unit = {
    val conf=ConfigFactory.load("node")
    val system=ActorSystem("mycluster", conf)
    val log = Logging.getLogger(system, this)
    val manager = system.actorOf(
      ClusterSingletonManager.props(Props[Master],
        PoisonPill,
        ClusterSingletonManagerSettings(system).withRole("compute")),
      "masterManager")  //這里的名字要和配置中部署路徑中的保持一致
    log.info(s"created singleton manager, at path: {}", manager.path)
    val proxy = system.actorOf(ClusterSingletonProxy.props(singletonManagerPath = "/user/masterManager",
      settings = ClusterSingletonProxySettings(system).withRole("compute")),
      name = "masterProxy")  //單例代理類,准備讓單例接收的消息都應發到代理
    ClusterClientReceptionist(system).registerService(proxy)
    log.info("created singleton proxy, at path: {}", proxy.path)
  }
}
class Master extends Actor with ActorLogging{
  //下述FromConf.xxx從配置文件中讀取部署配置,本Master actor我們以單例模式創建,其路徑為/user/masterManager/singleton,因此部署的配置鍵為"/masterManager/singleton/workerRouter"(配置鍵需省掉/user)
  private val router = context.actorOf(FromConfig.props(Props[WorkerActor]), name = "workerRouter")
  override def receive: Receive = {
    case x =>
      log.info(s"got message on master, msg: $x, actor me: ${self.path}")
      router forward ConsistentHashableEnvelope(x, x) //轉發給路由器,路由器將選擇一個worker讓其接收消息
  }
}
class WorkerActor extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)
  override def preStart(): Unit = {
//    actor成員需向集群訂閱事件才能接收到集群事件消息
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[ReachabilityEvent])
  }
  override def postStop(): Unit = cluster.unsubscribe(self)
  def receive:Receive = {
    case MemberWeaklyUp(member)=>
      log.info(s"Member is WeaklyUp: $member, actor me: $self")
    case MemberUp(member) =>  //集群成員狀態剛設為了Up
      log.info(s"Member is Up: $member, actor me: $self")
    case MemberJoined(member)=>  //集群成員狀態剛設為了Joining
      log.info(s"Member is Joined: $member, actor me: $self")
    case MemberLeft(member)=>  // 狀態剛設為了leaving
      log.info(s"Member is Left: $member, actor me: $self")
    case MemberExited(member)=>  // 成員自己正常退出
      log.info(s"Member is Exited: $member, actor me: $self")
    // 狀態成了Down,down狀態一般由unreachable狀態之后轉移過來,
    // 由編程者自己顯式設置cluster.down(member)
    // (auto-down特性可自動轉移unreachable成員到down,官方不建議在生成環境中啟用)
    case MemberDowned(member)=>
      log.info(s"Member is Downed: $member, actor me: $self")
    case MemberRemoved(member, previousStatus) => //成員被移出集群
      log.info(s"Member is Removed: $member after $previousStatus, actor me: $self")
    case UnreachableMember(member) => //failure-detector檢測器發現了一個通信不可達的成員
      log.info(s"Member detected as unreachable: $member, actor me: $self")
      cluster.down(member.address)
//      context.actorSelection(RootActorPath(member.address) / "user" / "otherActor") ! SomeMessage
    //共7種MemberEvent
    case ReachableMember(member) => //不可達成員重新變得可達
      log.info(s"Member is reachable again: $member, actor me: $self")

    case n:Int =>
      log.info(s"got an int: $n, actor me: ${self.path}")
    case s:String =>
      log.info(s"sender said: $s, actor me: ${self.path}")
    case any=>
      log.info(s"what is it? :$any, actor me: ${self.path}")
  }
}
//↑↑↑↑↑↑↑↑↑↑↑集群節點程序代碼↑↑↑↑↑↑↑↑↑↑在將部署的節點機器上運行(注意修改對應hostname&port配置)

//↓↓↓↓↓↓↓↓↓↓集群客戶端點程序代碼(另一個項目)↓↓↓↓↓↓↓↓↓↓(對應配置文件client.conf)
object ClusterClientMain {
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load("client")
    val system = ActorSystem("myclient", conf) //actor system名字隨意,和集群名無關
    val clusterClient = system.actorOf(ClusterClient.props(ClusterClientSettings(system)), "clusterClient")

    // 向集群某個路徑下的actor發送消息,對應路徑的actor需在集群端提前向集群接待員注冊好服務
    clusterClient ! ClusterClient.Send("/user/masterProxy", 100, localAffinity = true)
    // 注意,不是簡單的:   clusterClient ! 100
    clusterClient ! ClusterClient.Send("/user/masterProxy", 1, localAffinity = true)
    clusterClient ! ClusterClient.Send("/user/masterProxy", "Hi", localAffinity = true)

    println("main done")
  }
}

節點程序和客戶端程序使用的配置文件分別如下:

//↓↓↓↓↓↓↓↓↓↓  node.conf
akka {
  actor {
    provider = "cluster"  //有3種provider:local, remote, cluster,集群成員actor用cluster
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {   //使用artery即可組建集群系統(對應協議akka://),但只有使用netty.tcp(對應協議akka.tcp://)才能和集群外部通信
      hostname = "127.0.0.1"  //節點主機
      port = 2551             //節點actor system的端口。主機和端口根據部部署的機器及想要對外開放的端口而變
    }
  }
  cluster {
    seed-nodes = [  //種子節點必須啟動第一個節點,其他種子節點不要求一定啟動,加入集群的節點不限於必須在種子列表中(但必須設置正確的種子節點以便能連接進集群)
    //節點部署完全不必在同個主機
      "akka.tcp://mycluster@127.0.0.1:2551",
    // "akka.tcp://mycluster@127.0.0.1:2552"
    ]
    roles=["compute"]  //為集群中這一類actor打上一種自定義標簽
    // 官方不建議生產環境中啟用auto-down特性。
    //auto downing is NOT safe for production deployments.
    // you may want to use it during development, read more about it in the docs.
    // auto-down-unreachable-after = 10s
  }
}

akka.extensions=[
  "akka.cluster.client.ClusterClientReceptionist"  //集群客戶端接待員擴展
]

akka.actor.deployment {  //想要部署的actor的路徑作為配置鍵
  "/masterManager/singleton/workerRouter" {  //路由器actor路徑作為配置鍵,配置路由器相關屬性
    router = consistent-hashing-pool
    cluster {
      enabled = on
      max-nr-of-instances-per-node= 2
      allow-local-routees = on
      use-role = "compute"
    }
  }
}

//↓↓↓↓↓↓↓↓↓↓  client.conf
akka {
  actor {
    provider = remote // 我們的客戶端是獨自成普通actor,沒有建新集群,設為remote以提供對外遠程通信
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      //      hostname = ""  // 集群客戶端部署的主機地址(默認本機局域網地址)
      //      port = 0    //集群客戶端actor system的端口,可以不配置(akka系統自動分配端口)
    }
  }
}
akka.cluster.client {
    initial-contacts = [  //接待員地址。 集群成員中創建了集群客戶端接待員的任意節點,地址中actor system名字是集群名,后面的/system/receptionist是固定的(系統自動創建的接待員)
    "akka.tcp://mycluster@127.0.0.1:2551/system/receptionist", #不要求非得是第一個成員或接待員注冊的ActorRef服務所在的節點
    //    "akka.tcp://mycluster@127.0.0.1:2552/system/receptionist"
  ]
}

akka extension

An extension is a singleton instance created per actor system.

集成spring/spring-boot

被spring容器管理的Actor類上必須標注@Scope("prototype")。(@Scope("prototype")表示spring容器中每次需要時生成一個實例,是否影響集群內單例? <== 不影響)

cluster中涉及actor遠程部署,可能會因SpringApplicationContext不能序列化而失敗,可將其設為靜態變量,在spring啟動后手動初始化,以提供spring管理器。

示例代碼:TODO


常見問題

  1. 運行時程序異常終止,提示akka.version屬性沒有配置問題
    異發生在初始化ActorSystem過程中,初始化時會通過加載reference.conf配置文件讀取該屬性,在運行時沒能讀取到該屬性導致異常。akka-actor jar中有reference.conf、version.conf文件,reference.conf在文件首通過“incluce version"語法導入version.conf,version.conf中定義了akka.version屬性,ActorSystem本可讀到,但由於akka系列其他jar包中也有reference.conf,內容不同akka-actor中的,也沒有導入version.conf,而打包時akka-actor的reference.conf(有可能)被其他jar包中的reference.conf覆蓋,從而導致reference.conf中沒有akka.version屬性,進而導致程序異終止。

    解決方案:定義打包時資源文件處理行為,不覆蓋reference.conf,合並所有jar包中reference.conf文件內容到一個文件。如下定義pom.xml中shade插件的行為:

<plugin>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
                <transformers>
                    <!--合並資源文件,而不是默認的覆蓋-->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>reference.conf</resource>
                    </transformer>
  1. Spring集成akka-cluster,部署多個節點時主節點打印序列化相關ERROR日志。
ERROR   akka.remote.EndpointWriter               : Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer]. Transient association error (association remains live)

akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer].
	at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:62) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at akka.remote.EndpointWriter.$anonfun$serializeMessage$1(Endpoint.scala:906) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) ~[scala-library-2.12.8.jar!/:na]
	at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:906) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at akka.remote.EndpointWriter.writeSend(Endpoint.scala:793) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:768) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.actor.ActorCell.invoke(ActorCell.scala:557) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
Caused by: java.io.NotSerializableException: No configured serialization-bindings for class [org.springframework.context.annotation.AnnotationConfigApplicationContext]
	at akka.serialization.Serialization.serializerFor(Serialization.scala:320) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.serialization.Serialization.findSerializerFor(Serialization.scala:295) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
	at akka.remote.serialization.DaemonMsgCreateSerializer.serialize(DaemonMsgCreateSerializer.scala:184) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at akka.remote.serialization.DaemonMsgCreateSerializer.$anonfun$toBinary$1(DaemonMsgCreateSerializer.scala:76) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at scala.collection.immutable.List.foreach(List.scala:392) ~[scala-library-2.12.8.jar!/:na]
	at akka.remote.serialization.DaemonMsgCreateSerializer.propsProto$1(DaemonMsgCreateSerializer.scala:75) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at akka.remote.serialization.DaemonMsgCreateSerializer.toBinary(DaemonMsgCreateSerializer.scala:86) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
	at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:52) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]

集成spring方式參考https://www.baeldung.com/akka-with-spring,序列化ERROR日志問題參考https://github.com/akka/akka/issues/15938,其中“patriknw”提出設置akka.actor.serialize-creators=off並設置相關.props(...).withDeploy(Deploy.local),本例在類SpringExt中定義方法def props(beanClass: Class[_ <: Actor]): Props = Props.create(classOf[SpringActorProducer], beanClass)//.withDeploy(Deploy.local),也失敗了,仍然報同樣ERROR。

用靜態變量保存ApplicationContext的方式可暫時解決該問題(因為壓根兒不涉及到ApplicationContext的序列化了)。


(以上知識基於版本akka 2.5.19)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM