Akka-Cluster can be set up and deployed on a single physical machine or on a group of network-connected servers. The very same build of a distributed program developed with Akka can run in any of these hardware environments, which is why we can settle on Akka distributed programming as a standard approach.
In the previous two posts we introduced Akka-Remoting. Akka-Remoting is essentially a point-to-point, actor-to-actor communication protocol between ActorSystems: it lets an Actor in one ActorSystem talk to an Actor in another ActorSystem. After Remoting, Akka went on to develop the Cluster feature. Akka-Cluster is a new generation of distributed computing environment built on top of Akka-Remoting, so Remoting has become an internal supporting facility of Akka-Cluster, and distributed computing in production should use Akka-Cluster wherever possible. Of course, Akka-Remoting can still be used in learning and test environments to understand the mechanics and principles of Akka's distributed computing. One obvious difference between Remoting and Cluster is that Cluster truly realizes actor location transparency, letting programmers write distributed code more naturally and with less effort. More importantly, compared with Akka-Remoting, Akka-Cluster provides a complete, safer and more efficient distributed computing environment.
Simply put, Akka-Cluster connects multiple JVMs into one whole, making message addresses transparent and uniformly managed, and integrating them into a single message-driven system. The ultimate goal is to split a large program into multiple sub-programs and deploy them across many JVMs so the program runs distributed and in parallel. Even more important: building the cluster is decoupled from Actor programming, so once Cluster has assembled multiple ActorSystems into one unified system, we can write distributed programs in the same habitual way we program against a single ActorSystem. Since several nodes can be configured on one machine to form a cluster, the distributed programs we develop can run on a single machine or on a group of machines; the only difference is how the cluster environment is deployed and configured.
Let's first go over some basic Akka-Cluster concepts:
Node: a cluster node, which can also be thought of as an independent ActorSystem, identified by hostname:port. Several cluster nodes can be built on one physical machine, in which case they share the same hostname but use different ports, while nodes on different machines can use different hostnames with the same port.
Cluster: the whole formed by multiple Node members joined together through a cluster membership protocol.
Leader: one of the member nodes in the cluster. It is selected automatically by Akka among the cluster members and is responsible for carrying out the actual transitions in the life-cycle states of cluster members.
Seed-Node: one or more nodes of the cluster. Before joining, a node first contacts all the user-specified seed nodes and then sends a join request to the seed node that answers first. The main purpose of seed nodes is to give a joining node a concrete contact address, since the joining node needs a concrete address to send its join request to; in that sense, a seed node can be any node of the cluster whose address is known. For illustration only, a seed-node list in the configuration could look like the sketch below (the second port is a placeholder for this sketch, not a value from the demo further down):
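akka.cluster.seed-nodes = [
  "akka.tcp://clusterSystem@127.0.0.1:2551",
  "akka.tcp://clusterSystem@127.0.0.1:2552"]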
Node-Lifecycle-State: a node goes through the following state transitions during its life cycle:
Joining -> Up, Leaving -> Exiting, Exiting -> Removed, Unreachable -> Up, Unreachable -> Down, Down -> Removed
In addition, Akka-Cluster exchanges heart-beat signals to detect whether any node has become Unreachable.
Membership: the cluster membership, formed by organizing multiple nodes into one cluster through the Gossip protocol.
Membership-State: the cluster state, a data structure shared by all nodes in the cluster that holds the state of every node. The cluster state is a CRDT, which provides safe and convenient merge operations and makes incremental updates easy to merge.
Gossip-Protocol: the protocol nodes use to communicate with each other. Neighbouring nodes in the cluster exchange cluster-state updates via gossip; the exchange gradually spreads until it covers every node and a complete, unified cluster state is formed.
Gossip-Convergence: the unified cluster state. When gossip has reached every node in the cluster, i.e. all nodes share the same view of the cluster state, convergence has been achieved.
Failure-Detector (fd): every node can exchange heartbeat signals. A given node may be monitored via heartbeats by several other nodes to determine whether it is Reachable or Unreachable. If any node in the cluster is Unreachable, the cluster as a whole cannot reach Convergence. The failure detector behind these heartbeats can be tuned in configuration; a minimal sketch of the usual settings under akka.cluster.failure-detector follows (the numbers are illustrative, not recommendations):
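akka.cluster.failure-detector {
  heartbeat-interval = 1 s            # how often a heartbeat is sent
  acceptable-heartbeat-pause = 3 s    # pause tolerated before suspicion grows
  threshold = 8.0                     # phi value above which a node is marked Unreachable
}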
Leader-Actions: once the cluster reaches Convergence, the system automatically selects a Leader node to carry out the node state transitions described above. If any node in the cluster is Unreachable, convergence cannot be reached and no node state-transition request can be honoured.
In Akka-Cluster a node joins the cluster automatically as long as a list of seed nodes is set in the configuration file; otherwise it has to join programmatically with the Cluster.join or Cluster.joinSeedNodes method:
/**
 * Try to join this cluster node with the node specified by 'address'.
 * A 'Join(selfAddress)' command is sent to the node to join.
 *
 * An actor system can only join a cluster once. Additional attempts will be ignored.
 * When it has successfully joined it must be restarted to be able to join another
 * cluster or to join the same cluster again.
 *
 * The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
 * cluster.
 */
def join(address: Address): Unit =
  clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))

/**
 * Join the specified seed nodes without defining them in config.
 * Especially useful from tests when Addresses are unknown before startup time.
 *
 * An actor system can only join a cluster once. Additional attempts will be ignored.
 * When it has successfully joined it must be restarted to be able to join another
 * cluster or to join the same cluster again.
 */
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
  clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector.map(fillLocal))
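As a usage sketch, joining programmatically (instead of via the seed-nodes setting) might look like this; the system name and addresses below are assumed for the example:

import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster

// assumes akka.actor.provider = "cluster" is already configured
val system = ActorSystem("clusterSystem")
val cluster = Cluster(system)

// supply the seed-node addresses at runtime instead of in the config file
cluster.joinSeedNodes(List(
  Address("akka.tcp", "clusterSystem", "127.0.0.1", 2551),
  Address("akka.tcp", "clusterSystem", "127.0.0.1", 2552)))

// or join one known node directly
// cluster.join(Address("akka.tcp", "clusterSystem", "127.0.0.1", 2551))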
The leave and down operations on cluster nodes are implemented as follows:
/**
 * Send command to issue state transition to LEAVING for the node specified by 'address'.
 * The member will go through the status changes [[MemberStatus]] `Leaving` (not published to
 * subscribers) followed by [[MemberStatus]] `Exiting` and finally [[MemberStatus]] `Removed`.
 *
 * Note that this command can be issued to any member in the cluster, not necessarily the
 * one that is leaving. The cluster extension, but not the actor system or JVM, of the
 * leaving member will be shutdown after the leader has changed status of the member to
 * Exiting. Thereafter the member will be removed from the cluster. Normally this is
 * handled automatically, but in case of network failures during this process it might
 * still be necessary to set the node's status to Down in order to complete the removal.
 */
def leave(address: Address): Unit =
  clusterCore ! ClusterUserAction.Leave(fillLocal(address))

/**
 * Send command to DOWN the node specified by 'address'.
 *
 * When a member is considered by the failure detector to be unreachable the leader is not
 * allowed to perform its duties, such as changing status of new joining members to 'Up'.
 * The status of the unreachable member must be changed to 'Down', which can be done with
 * this method.
 */
def down(address: Address): Unit =
  clusterCore ! ClusterUserAction.Down(fillLocal(address))
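A short sketch of how these are typically called, assuming cluster is the Cluster extension of a running ActorSystem:

// graceful departure: the local node goes through Leaving -> Exiting -> Removed
cluster.leave(cluster.selfAddress)

// if a node stays Unreachable and will not come back, mark it Down so the leader can remove it;
// unreachableAddress is a hypothetical akka.actor.Address of that node
// cluster.down(unreachableAddress)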
Cluster node state transitions are published as events on Akka's EventBus:
/**
 * Marker interface for membership events.
 * Published when the state change is first seen on a node.
 * The state change was performed by the leader when there was
 * convergence on the leader node, i.e. all members had seen previous
 * state.
 */
sealed trait MemberEvent extends ClusterDomainEvent {
  def member: Member
}

/**
 * Member status changed to Joining.
 */
final case class MemberJoined(member: Member) extends MemberEvent {
  if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member)
}

/**
 * Member status changed to WeaklyUp.
 * A joining member can be moved to `WeaklyUp` if convergence
 * cannot be reached, i.e. there are unreachable nodes.
 * It will be moved to `Up` when convergence is reached.
 */
final case class MemberWeaklyUp(member: Member) extends MemberEvent {
  if (member.status != WeaklyUp) throw new IllegalArgumentException("Expected WeaklyUp status, got: " + member)
}

/**
 * Member status changed to Up.
 */
final case class MemberUp(member: Member) extends MemberEvent {
  if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
}

/**
 * Member status changed to Leaving.
 */
final case class MemberLeft(member: Member) extends MemberEvent {
  if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
}

/**
 * Member status changed to `MemberStatus.Exiting` and will be removed
 * when all members have seen the `Exiting` status.
 */
final case class MemberExited(member: Member) extends MemberEvent {
  if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
}

/**
 * Member completely removed from the cluster.
 * When `previousStatus` is `MemberStatus.Down` the node was removed
 * after being detected as unreachable and downed.
 * When `previousStatus` is `MemberStatus.Exiting` the node was removed
 * after graceful leaving and exiting.
 */
final case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent {
  if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
}

/**
 * Marker interface to facilitate subscription of
 * both [[UnreachableMember]] and [[ReachableMember]].
 */
sealed trait ReachabilityEvent extends ClusterDomainEvent {
  def member: Member
}

/**
 * A member is considered as unreachable by the failure detector.
 */
final case class UnreachableMember(member: Member) extends ReachabilityEvent

/**
 * A member is considered as reachable by the failure detector
 * after having been unreachable.
 * @see [[UnreachableMember]]
 */
final case class ReachableMember(member: Member) extends ReachabilityEvent
The current state of the cluster is held in the CurrentClusterState structure below:
/**
 * Current snapshot state of the cluster. Sent to new subscriber.
 */
final case class CurrentClusterState(
  members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
  unreachable: Set[Member] = Set.empty,
  seenBy: Set[Address] = Set.empty,
  leader: Option[Address] = None,
  roleLeaderMap: Map[String, Option[Address]] = Map.empty) {

  /**
   * Java API: get current member list.
   */
  def getMembers: java.lang.Iterable[Member] = {
    import scala.collection.JavaConverters._
    members.asJava
  }

  /**
   * Java API: get current unreachable set.
   */
  def getUnreachable: java.util.Set[Member] =
    scala.collection.JavaConverters.setAsJavaSetConverter(unreachable).asJava

  /**
   * Java API: get current "seen-by" set.
   */
  def getSeenBy: java.util.Set[Address] =
    scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava

  /**
   * Java API: get address of current leader, or null if none
   */
  def getLeader: Address = leader orNull

  /**
   * All node roles in the cluster
   */
  def allRoles: Set[String] = roleLeaderMap.keySet

  /**
   * Java API: All node roles in the cluster
   */
  def getAllRoles: java.util.Set[String] =
    scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava

  /**
   * get address of current leader, if any, within the role set
   */
  def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)

  /**
   * Java API: get address of current leader within the role set,
   * or null if no node with that role
   */
  def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
}
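Besides subscribing to events, a snapshot of this structure can also be read directly from the Cluster extension. A small sketch, assuming clusterSystem is a running, cluster-enabled ActorSystem:

import akka.cluster.Cluster

val cluster = Cluster(clusterSystem)
// read a point-in-time snapshot of the cluster state
val state = cluster.state
println(s"members:     ${state.members.mkString(", ")}")
println(s"unreachable: ${state.unreachable.mkString(", ")}")
println(s"leader:      ${state.leader.getOrElse("none")}")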
Users can subscribe to these events:
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
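Alternatively, a subscriber can ask for the initial state as a single CurrentClusterState message rather than as replayed events. Below is a hedged sketch of such an Actor (SnapshotListener is a hypothetical name, not part of the demo):

import akka.actor.{Actor, ActorLogging}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class SnapshotListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // deliver the current state once as CurrentClusterState, then incremental MemberEvents
  override def preStart(): Unit =
    cluster.subscribe(self, initialStateMode = InitialStateAsSnapshot, classOf[MemberEvent])

  override def postStop(): Unit = cluster.unsubscribe(self)

  override def receive: Receive = {
    case state: CurrentClusterState =>
      log.info("Initial members: {}", state.members.mkString(", "))
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case _: MemberEvent => // ignore other transitions in this sketch
  }
}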
Besides that, we can register callbacks that run some code around these state transitions, for preparation beforehand and cleanup afterwards:
/**
 * The supplied thunk will be run, once, when current cluster member is `Up`.
 * Typically used together with configuration option `akka.cluster.min-nr-of-members`
 * to defer some action, such as starting actors, until the cluster has reached
 * a certain size.
 */
def registerOnMemberUp[T](code: ⇒ T): Unit =
  registerOnMemberUp(new Runnable { def run() = code })

/**
 * Java API: The supplied callback will be run, once, when current cluster member is `Up`.
 * Typically used together with configuration option `akka.cluster.min-nr-of-members`
 * to defer some action, such as starting actors, until the cluster has reached
 * a certain size.
 */
def registerOnMemberUp(callback: Runnable): Unit =
  clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)

/**
 * The supplied thunk will be run, once, when current cluster member is `Removed`.
 * If the cluster has already been shutdown the thunk will run on the caller thread immediately.
 * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
 */
def registerOnMemberRemoved[T](code: ⇒ T): Unit =
  registerOnMemberRemoved(new Runnable { override def run(): Unit = code })

/**
 * Java API: The supplied thunk will be run, once, when current cluster member is `Removed`.
 * If the cluster has already been shutdown the thunk will run on the caller thread immediately.
 * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
 */
def registerOnMemberRemoved(callback: Runnable): Unit = {
  if (_isTerminated.get()) callback.run()
  else clusterDaemons ! InternalClusterAction.AddOnMemberRemovedListener(callback)
}
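A common pattern combines these hooks with a graceful exit: start application actors only once the node is Up, and terminate the ActorSystem only after the node has been Removed. A minimal sketch, assuming clusterSystem is the running ActorSystem:

import akka.cluster.Cluster

val cluster = Cluster(clusterSystem)

// defer application start-up until this node has reached Up
cluster.registerOnMemberUp {
  println("Member is Up - starting application actors ...")
}

// shut the ActorSystem down only after this node has left the cluster completely
cluster.registerOnMemberRemoved {
  clusterSystem.terminate()
}

// later, e.g. on JVM shutdown or an admin command, trigger the graceful departure:
// cluster.leave(cluster.selfAddress)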
Let's now use an example to demonstrate how Akka-Cluster works.
First we need the Akka-Cluster dependency in build.sbt:
name := "cluster-states-demo" version := "1.0" scalaVersion := "2.11.8" libraryDependencies ++= { val akkaVersion = "2.5.3" Seq( "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-cluster" % akkaVersion ) }
Then the basic configuration, cluster.conf:
akka { actor { provider = "cluster" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 2551 } } cluster { seed-nodes = [ "akka.tcp://clusterSystem@127.0.0.1:2551"] } }
Below is an Actor that listens for cluster state-transition events:
import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import com.typesafe.config.ConfigFactory

class EventListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])  // subscribe to cluster state-transition events
    super.preStart()
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)  // unsubscribe
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info("Member is Joining: {}", member.address)
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case MemberLeft(member) =>
      log.info("Member is Leaving: {}", member.address)
    case MemberExited(member) =>
      log.info("Member is Exiting: {}", member.address)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
      cluster.down(member.address)  // down the node manually instead of relying on auto-down
    case _: MemberEvent => // ignore
  }
}
Below is the test code that drives EventListener; it also adds the kind of setup a node might perform after joining the cluster and the cleanup after it leaves:
object ClusterEventsDemo {
  def main(args: Array[String]): Unit = {
    // override port and seed-node address from the command line
    val port =
      if (args.isEmpty) "0"
      else args(0)
    val addr =
      if (args.length < 2) "2551"
      else args(1)
    val seednodeSetting = "akka.cluster.seed-nodes = [" +
      "\"akka.tcp://clusterSystem@127.0.0.1:" + s"${addr}" + "\"]"

    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = ${port}")
      .withFallback(ConfigFactory.parseString(seednodeSetting))
      .withFallback(ConfigFactory.load("cluster.conf"))

    val clusterSystem = ActorSystem(name = "clusterSystem", config = config)
    val eventListener = clusterSystem.actorOf(Props[EventListener], "eventListener")

    val cluster = Cluster(clusterSystem)
    cluster.registerOnMemberRemoved(println("Leaving cluster. I should cleanup... "))
    cluster.registerOnMemberUp(println("Hookup to cluster. Do some setups ..."))

    println("actor system started!")
    scala.io.StdIn.readLine()
    clusterSystem.terminate()
  }
}
We test this by running it from sbt in several terminals:
1、run "2551" "2551" //這是個seed-node
[INFO] [06/26/2017 21:25:46.743] [clusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Node [akka.tcp://clusterSystem@127.0.0.1:2551] is JOINING, roles []
[INFO] [06/26/2017 21:25:46.751] [clusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://clusterSystem@127.0.0.1:2551] to [Up]
[INFO] [06/26/2017 21:25:46.755] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:2551/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:2551
2、run "0" "2551" //port=0代表由系統自動選擇端口
[INFO] [06/26/2017 21:26:57.467] [run-main-1e] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52459] - Started up successfully
actor system started!
[INFO] [06/26/2017 21:26:57.735] [clusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52459] - Welcome from [akka.tcp://clusterSystem@127.0.0.1:2551]
[INFO] [06/26/2017 21:26:57.751] [clusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://clusterSystem@127.0.0.1:52459/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:2551
[INFO] [06/26/2017 21:26:57.752] [clusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://clusterSystem@127.0.0.1:52459/user/eventListener] Member is Joining: akka.tcp://clusterSystem@127.0.0.1:52459
[INFO] [06/26/2017 21:26:57.809] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:52459/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:52459
3、run "0" "2551" //port=0代表由系統自動選擇端口
[INFO] [06/26/2017 21:28:22.577] [run-main-1] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52467] - Started up successfully
actor system started!
[INFO] [06/26/2017 21:28:22.736] [clusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52467] - Welcome from [akka.tcp://clusterSystem@127.0.0.1:2551]
[INFO] [06/26/2017 21:28:22.747] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:52467/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:2551
[INFO] [06/26/2017 21:28:22.749] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:52467/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:52459
[INFO] [06/26/2017 21:28:22.749] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://clusterSystem@127.0.0.1:52467/user/eventListener] Member is Joining: akka.tcp://clusterSystem@127.0.0.1:52467
[INFO] [06/26/2017 21:28:24.611] [clusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://clusterSystem@127.0.0.1:52467/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:52467
Running cluster.leave(cluster.selfAddress) on terminal 2 produces:
[INFO] [06/26/2017 22:40:47.614] [clusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://clusterSystem@127.0.0.1:2551/user/eventListener] Member is Leaving: akka.tcp://clusterSystem@127.0.0.1:53986
[INFO] [06/26/2017 22:40:48.032] [clusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://clusterSystem@127.0.0.1:53986] to [Exiting]
[INFO] [06/26/2017 22:40:48.032] [clusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://clusterSystem@127.0.0.1:2551/user/eventListener] Member is Exiting: akka.tcp://clusterSystem@127.0.0.1:53986
[INFO] [06/26/2017 22:40:48.047] [clusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Exiting confirmed [akka.tcp://clusterSystem@127.0.0.1:53986]
[INFO] [06/26/2017 22:40:49.033] [clusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Leader is removing confirmed Exiting node [akka.tcp://clusterSystem@127.0.0.1:53986]
[INFO] [06/26/2017 22:40:49.033] [clusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://clusterSystem@127.0.0.1:2551/user/eventListener] Member is Removed: akka.tcp://clusterSystem@127.0.0.1:53986 after Exiting
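For reference, the leave shown above could just as well be triggered from inside the demo itself. A hypothetical tweak to ClusterEventsDemo (not part of the original code) would be to leave gracefully on Enter instead of terminating right away:

// hypothetical replacement for the last lines of main():
scala.io.StdIn.readLine()
cluster.leave(cluster.selfAddress)  // terminal 1 then logs Leaving/Exiting/Removed for this node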
Below is the complete source code for this demonstration:
resources/cluster.conf
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster { seed-nodes = [ "akka.tcp://clusterSystem@127.0.0.1:2551"] } }
ClusterEventsDemo.scala
import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import com.typesafe.config.ConfigFactory

class EventListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])  // subscribe to cluster state-transition events
    super.preStart()
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)  // unsubscribe
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info("Member is Joining: {}", member.address)
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case MemberLeft(member) =>
      log.info("Member is Leaving: {}", member.address)
    case MemberExited(member) =>
      log.info("Member is Exiting: {}", member.address)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
      cluster.down(member.address)  // down the node manually instead of relying on auto-down
    case _: MemberEvent => // ignore
  }
}

object ClusterEventsDemo {
  def main(args: Array[String]): Unit = {
    // override port and seed-node address from the command line
    val port =
      if (args.isEmpty) "0"
      else args(0)
    val addr =
      if (args.length < 2) "2551"
      else args(1)
    val seednodeSetting = "akka.cluster.seed-nodes = [" +
      "\"akka.tcp://clusterSystem@127.0.0.1:" + s"${addr}" + "\"]"

    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = ${port}")
      .withFallback(ConfigFactory.parseString(seednodeSetting))
      .withFallback(ConfigFactory.load("cluster.conf"))

    val clusterSystem = ActorSystem(name = "clusterSystem", config = config)
    val eventListener = clusterSystem.actorOf(Props[EventListener], "eventListener")

    val cluster = Cluster(clusterSystem)
    cluster.registerOnMemberRemoved(println("Leaving cluster. I should cleanup... "))
    cluster.registerOnMemberUp(println("Hookup to cluster. Do some setups ..."))

    println("actor system started!")
    scala.io.StdIn.readLine()
    clusterSystem.terminate()
  }
}