Many applications need a single, unique instance (an "only instance") of some type of Actor in the system. In a cluster this instance may live on any node, but it is guaranteed to be unique. Akka's Cluster-Singleton provides support for this singleton-actor pattern: when the node hosting the instance fails and has to leave the cluster, the same Actor is automatically constructed on another node and control is handed over to it. Of course, since a brand-new Actor is constructed, its internal state is lost in the process. Typical uses of a singleton actor include an interface to some external system that only supports a single connection, or an aggregator Actor that accumulates internal state from the results of many other Actors. If a singleton actor needs internal state, one can implement it as a PersistenceActor so that its state is recovered automatically. That turns Cluster-Singleton into a very practical pattern, applicable in many situations.
Precisely because of its uniqueness, however, the Cluster-Singleton pattern comes with caveats that deserve attention. Uniqueness makes it easy for the singleton to become a performance bottleneck, it cannot be guaranteed to be continuously online (it is briefly unavailable during hand-over), and message delivery to it cannot be guaranteed. Users need to handle these concerns explicitly in their code.
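Because delivery to the singleton is not guaranteed, a common mitigation is a sender-side resend loop that repeats a command until an acknowledgement comes back. A minimal sketch of such a helper actor (the class name, tick message, and retry limit are all illustrative, not part of the demo below):

```scala
import akka.actor._
import scala.concurrent.duration._

// Hypothetical helper: resend `msg` to `target` until `ack` arrives,
// giving up after `maxRetries` attempts. Intervals are illustrative.
class ResendUntilAck(target: ActorRef, msg: Any, ack: Any, maxRetries: Int = 5)
    extends Actor with ActorLogging {
  import context.dispatcher
  private var retries = 0
  private val tick =
    context.system.scheduler.schedule(0.seconds, 2.seconds, self, "tick")

  def receive: Receive = {
    case "tick" if retries < maxRetries =>
      target ! msg           // (re)send the command
      retries += 1
    case "tick" =>
      log.warning("giving up on {}", msg)
      context.stop(self)
    case a if a == ack =>
      context.stop(self)     // acknowledged; stop resending
  }

  override def postStop(): Unit = tick.cancel()
}
```

Note that resending gives at-least-once rather than exactly-once semantics, so the singleton's commands should be idempotent or deduplicated if duplicates matter.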
Let's design an example to explore Cluster-Singleton. First, the singleton actor's behavior:
class SingletonActor extends PersistentActor with ActorLogging {
  import SingletonActor._
  val cluster = Cluster(context.system)

  var freeHoles = 0
  var freeTrees = 0
  var ttlMatches = 0

  override def persistenceId = self.path.parent.name + "-" + self.path.name

  def updateState(evt: Event): Unit = evt match {
    case AddHole =>
      if (freeTrees > 0) {
        ttlMatches += 1
        freeTrees -= 1
      } else freeHoles += 1
    case AddTree =>
      if (freeHoles > 0) {
        ttlMatches += 1
        freeHoles -= 1
      } else freeTrees += 1
  }

  override def receiveRecover: Receive = {
    case evt: Event => updateState(evt)
    case SnapshotOffer(_, ss: State) =>
      freeHoles = ss.nHoles
      freeTrees = ss.nTrees
      ttlMatches = ss.nMatches
  }

  override def receiveCommand: Receive = {
    case Dig =>
      persist(AddHole) { evt =>
        updateState(evt)
      }
      sender() ! AckDig   //notify sender that the message was received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Plant =>
      persist(AddTree) { evt =>
        updateState(evt)
      }
      sender() ! AckPlant //notify sender that the message was received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Disconnect =>    //this node leaves the cluster; expect a switch to another node
      log.info(s"${cluster.selfAddress} is leaving cluster ...")
      cluster.leave(cluster.selfAddress)
    case CleanUp =>
      //clean up ...
      self ! PoisonPill
  }
}
SingletonActor is a special kind of Actor: it extends PersistentActor, so it must implement PersistentActor's abstract members. It maintains several pieces of internal state, the running totals freeHoles, freeTrees and ttlMatches. The actor simulates a tree-planting scenario: a Dig command produces an AddHole event that records a new hole, and the state is updated in the event handler; a Plant command produces an AddTree event and updates the state likewise. Because the Cluster-Singleton pattern cannot guarantee message delivery, we add the acknowledgements AckDig and AckPlant so that the sender can resend a message when needed. We use cluster.selfAddress to observe which cluster node the singleton is currently running on.
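The hole/tree matching rule inside updateState is a pure function of the three counters, so it can be sketched independently of the actor for clarity (the TreeState name and the standalone functions are illustrative, not part of the demo code):

```scala
// Pure model of SingletonActor's counter updates (names are illustrative).
case class TreeState(freeHoles: Int, freeTrees: Int, ttlMatches: Int)

// A new hole is matched against a waiting tree if one is available.
def addHole(s: TreeState): TreeState =
  if (s.freeTrees > 0) TreeState(s.freeHoles, s.freeTrees - 1, s.ttlMatches + 1)
  else TreeState(s.freeHoles + 1, s.freeTrees, s.ttlMatches)

// A new tree is matched against a waiting hole if one is available.
def addTree(s: TreeState): TreeState =
  if (s.freeHoles > 0) TreeState(s.freeHoles - 1, s.freeTrees, s.ttlMatches + 1)
  else TreeState(s.freeHoles, s.freeTrees + 1, s.ttlMatches)
```

Modeling the event handlers as pure functions like this also makes the persistence logic easy to unit-test without an ActorSystem.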
We need to construct and deploy a ClusterSingletonManager on every cluster node that may host SingletonActor, as follows:
def create(port: Int) = {
  val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
    .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
    .withFallback(ConfigFactory.load())
  val singletonSystem = ActorSystem("SingletonClusterSystem", config)

  startupSharedJournal(singletonSystem, (port == 2551),
    path = ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))

  val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
    singletonProps = Props[SingletonActor],
    terminationMessage = CleanUp,
    settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
  ), name = "singletonManager")
}
As you can see, ClusterSingletonManager is itself an Actor; ClusterSingletonManager.props configures the SingletonActor it manages. Our main goal is to verify that when the current node fails and has to leave the cluster, the SingletonActor automatically moves to another node that is still online. ClusterSingletonManager works like this: it is constructed and deployed on all selected cluster nodes, the SingletonActor is started on the oldest of those nodes, and when that node becomes unreachable the SingletonActor is automatically rebuilt and redeployed on the next-oldest node.
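Instead of (or in addition to) passing settings in code, the manager can be tuned through the akka.cluster.singleton section of configuration, which ClusterSingletonManagerSettings reads by default. A sketch of the relevant keys (the values shown are illustrative, not the demo's settings):

```conf
akka.cluster.singleton {
  # actor name of the child singleton actor
  singleton-name = "singleton"
  # singleton among nodes tagged with this role; "" means all nodes
  role = "singleton"
  # how often the new oldest node retries taking over during hand-over
  hand-over-retry-interval = 1s
}
```

Shortening hand-over-retry-interval can reduce the window during which the singleton is unavailable, at the cost of more chatter during a hand-over.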
ClusterSingletonProxy, also an Actor, invokes the SingletonActor by exchanging messages with the ClusterSingletonManager. It dynamically tracks the currently active SingletonActor and provides the user with an ActorRef for it. We can invoke the SingletonActor with the following code:
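The proxy side has a matching akka.cluster.singleton-proxy configuration section, read by ClusterSingletonProxySettings. Notably, the proxy buffers messages while the singleton is being re-identified after a hand-over, and that buffer is bounded. A sketch of the keys (illustrative values):

```conf
akka.cluster.singleton-proxy {
  # must match the manager's singleton-name
  singleton-name = "singleton"
  # interval at which the proxy asks the manager for the singleton's location
  singleton-identification-interval = 1s
  # messages buffered while the singleton is unavailable; 0 disables buffering
  buffer-size = 1000
}
```

If the buffer overflows, or the proxy is restarted, buffered messages are dropped, which is one concrete reason the Ack-based resend mechanism above is worth having.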
object SingletonUser {
  def create = {
    val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]")
      .withFallback(ConfigFactory.load())
    val suSystem = ActorSystem("SingletonClusterSystem", config)

    val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props(
      singletonManagerPath = "/user/singletonManager",
      settings = ClusterSingletonProxySettings(suSystem).withRole(None)
    ), name = "singletonUser")

    import suSystem.dispatcher
    //send a Dig message to SingletonActor through the proxy every 3 seconds
    suSystem.scheduler.schedule(0.seconds, 3.second, singletonProxy, SingletonActor.Dig)
    //send a Plant message to SingletonActor through the proxy every 2 seconds
    suSystem.scheduler.schedule(1.seconds, 2.second, singletonProxy, SingletonActor.Plant)
    //tell the hosting node to leave the cluster every 15 seconds
    suSystem.scheduler.schedule(10.seconds, 15.seconds, singletonProxy, SingletonActor.Disconnect)
  }
}
At different intervals we send Dig and Plant messages to SingletonActor through the ClusterSingletonProxy, and every 15 seconds we send a Disconnect message telling the node hosting the singleton to leave the cluster. Then we run it with the following code:
package clustersingleton.demo

import clustersingleton.sa.SingletonActor
import clustersingleton.frontend.SingletonUser

object ClusterSingletonDemo extends App {
  SingletonActor.create(2551)   //seed node
  SingletonActor.create(0)      //ClusterSingletonManager node
  SingletonActor.create(0)
  SingletonActor.create(0)
  SingletonActor.create(0)

  SingletonUser.create          //ClusterSingletonProxy node
}
The run produces output like this:
[INFO] [07/09/2017 20:17:28.210] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.334] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:2551]
[INFO] [07/09/2017 20:17:28.489] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.493] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55839]
[INFO] [07/09/2017 20:17:28.514] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.528] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55840]
[INFO] [07/09/2017 20:17:28.566] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.571] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55841]
[INFO] [07/09/2017 20:17:28.595] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.600] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55842]
[INFO] [07/09/2017 20:17:28.620] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.624] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55843]
[INFO] [07/09/2017 20:17:28.794] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton]
[INFO] [07/09/2017 20:17:28.817] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=0,ttlMatches=0
[INFO] [07/09/2017 20:17:29.679] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=1,freeTrees=0,ttlMatches=0
...
[INFO] [07/09/2017 20:17:38.676] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:2551 is leaving cluster ...
[INFO] [07/09/2017 20:17:39.664] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=4
[INFO] [07/09/2017 20:17:40.654] [SingletonClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=2,ttlMatches=4
[INFO] [07/09/2017 20:17:41.664] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=5
[INFO] [07/09/2017 20:17:42.518] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton]
[INFO] [07/09/2017 20:17:43.653] [SingletonClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=2,ttlMatches=5
[INFO] [07/09/2017 20:17:43.672] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=1,ttlMatches=6
[INFO] [07/09/2017 20:17:45.665] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=2,ttlMatches=6
[INFO] [07/09/2017 20:17:46.654] [SingletonClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=3,ttlMatches=6
...
[INFO] [07/09/2017 20:17:53.673] [SingletonClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:55839 is leaving cluster ...
[INFO] [07/09/2017 20:17:55.654] [SingletonClusterSystem-akka.actor.default-dispatcher-13] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=4,ttlMatches=9
[INFO] [07/09/2017 20:17:55.664] [SingletonClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=3,ttlMatches=10
[INFO] [07/09/2017 20:17:56.646] [SingletonClusterSystem-akka.actor.default-dispatcher-5] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton]
[INFO] [07/09/2017 20:17:57.662] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55840:freeHoles=0,freeTrees=4,ttlMatches=10
[INFO] [07/09/2017 20:17:58.652] [SingletonClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55840:freeHoles=0,freeTrees=5,ttlMatches=10
The output shows that as nodes leave the cluster, SingletonActor automatically moves to another cluster node and keeps running, with its state intact.
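The state survives because recovery replays every persisted event from the shared journal. For a long-running singleton that accumulates many events, one might also save snapshots periodically so that recovery starts from the latest SnapshotOffer instead of replaying the full history. A hedged sketch of what could be added inside SingletonActor (the counter and threshold are illustrative, not in the demo code):

```scala
// Illustrative only: inside SingletonActor, snapshot every 100 events.
var evtCount = 0

def maybeSnapshot(): Unit = {
  evtCount += 1
  if (evtCount % 100 == 0)
    saveSnapshot(State(freeHoles, freeTrees, ttlMatches)) // delivered back as SnapshotOffer on recovery
}
```

Calling maybeSnapshot() from the persist callbacks would keep the snapshot consistent with the persisted events; the existing receiveRecover already handles the resulting SnapshotOffer.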
It is worth emphasizing that a clustered, distributed program of this complexity can be implemented with such simple code, which shows what a practical programming tool with broad prospects Akka is!
Below is the complete source code for this demonstration:
build.sbt
name := "cluster-singleton"

version := "1.0"

scalaVersion := "2.11.9"

resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"

val akkaversion = "2.4.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaversion,
  "com.typesafe.akka" %% "akka-remote" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-tools" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaversion,
  "com.typesafe.akka" %% "akka-persistence" % akkaversion,
  "com.typesafe.akka" %% "akka-contrib" % akkaversion,
  "org.iq80.leveldb" % "leveldb" % "0.7",
  "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8")
application.conf
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://SingletonClusterSystem@127.0.0.1:2551"]
    log-info = off
  }
  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE 'native = off' IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshots"
  }
}
SingletonActor.scala
package clustersingleton.sa

import akka.actor._
import akka.cluster._
import akka.persistence._
import com.typesafe.config.ConfigFactory
import akka.cluster.singleton._
import scala.concurrent.duration._
import akka.persistence.journal.leveldb._
import akka.util.Timeout
import akka.pattern._

object SingletonActor {
  sealed trait Command
  case object Dig extends Command
  case object Plant extends Command
  case object AckDig extends Command      //acknowledge
  case object AckPlant extends Command    //acknowledge
  case object Disconnect extends Command  //force node to leave cluster
  case object CleanUp extends Command     //clean up when actor ends

  sealed trait Event
  case object AddHole extends Event
  case object AddTree extends Event

  case class State(nHoles: Int, nTrees: Int, nMatches: Int)

  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
      .withFallback(ConfigFactory.load())
    val singletonSystem = ActorSystem("SingletonClusterSystem", config)

    startupSharedJournal(singletonSystem, (port == 2551),
      path = ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))

    val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
      singletonProps = Props[SingletonActor],
      terminationMessage = CleanUp,
      settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
    ), name = "singletonManager")
  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) =>
        SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }
}

class SingletonActor extends PersistentActor with ActorLogging {
  import SingletonActor._
  val cluster = Cluster(context.system)

  var freeHoles = 0
  var freeTrees = 0
  var ttlMatches = 0

  override def persistenceId = self.path.parent.name + "-" + self.path.name

  def updateState(evt: Event): Unit = evt match {
    case AddHole =>
      if (freeTrees > 0) {
        ttlMatches += 1
        freeTrees -= 1
      } else freeHoles += 1
    case AddTree =>
      if (freeHoles > 0) {
        ttlMatches += 1
        freeHoles -= 1
      } else freeTrees += 1
  }

  override def receiveRecover: Receive = {
    case evt: Event => updateState(evt)
    case SnapshotOffer(_, ss: State) =>
      freeHoles = ss.nHoles
      freeTrees = ss.nTrees
      ttlMatches = ss.nMatches
  }

  override def receiveCommand: Receive = {
    case Dig =>
      persist(AddHole) { evt =>
        updateState(evt)
      }
      sender() ! AckDig   //notify sender that the message was received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Plant =>
      persist(AddTree) { evt =>
        updateState(evt)
      }
      sender() ! AckPlant //notify sender that the message was received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Disconnect =>    //this node leaves the cluster; expect a switch to another node
      log.info(s"${cluster.selfAddress} is leaving cluster ...")
      cluster.leave(cluster.selfAddress)
    case CleanUp =>
      //clean up ...
      self ! PoisonPill
  }
}
SingletonUser.scala
package clustersingleton.frontend

import akka.actor._
import clustersingleton.sa.SingletonActor
import com.typesafe.config.ConfigFactory
import akka.cluster.singleton._
import scala.concurrent.duration._

object SingletonUser {
  def create = {
    val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]")
      .withFallback(ConfigFactory.load())
    val suSystem = ActorSystem("SingletonClusterSystem", config)

    val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props(
      singletonManagerPath = "/user/singletonManager",
      settings = ClusterSingletonProxySettings(suSystem).withRole(None)
    ), name = "singletonUser")

    import suSystem.dispatcher
    //send a Dig message to SingletonActor through the proxy every 3 seconds
    suSystem.scheduler.schedule(0.seconds, 3.second, singletonProxy, SingletonActor.Dig)
    //send a Plant message to SingletonActor through the proxy every 2 seconds
    suSystem.scheduler.schedule(1.seconds, 2.second, singletonProxy, SingletonActor.Plant)
    //tell the hosting node to leave the cluster every 15 seconds
    suSystem.scheduler.schedule(10.seconds, 15.seconds, singletonProxy, SingletonActor.Disconnect)
  }
}
ClusterSingletonDemo.scala
package clustersingleton.demo

import clustersingleton.sa.SingletonActor
import clustersingleton.frontend.SingletonUser

object ClusterSingletonDemo extends App {
  SingletonActor.create(2551)   //seed node
  SingletonActor.create(0)      //ClusterSingletonManager node
  SingletonActor.create(0)
  SingletonActor.create(0)
  SingletonActor.create(0)

  SingletonUser.create          //ClusterSingletonProxy node
}