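From the previous post on Cluster-Singleton we saw the programming support Akka offers for distributed programs: the message-driven model suits distributed programming particularly well, and no special effort is needed, because writing ordinary actors is enough to get a clustered, distributed program. Cluster-Singleton guarantees that whatever goes wrong with individual cluster nodes, computation continues safely as long as at least one node remains online. The pattern ensures that the single instance of a given actor runs safely and stably in a cluster. There is another situation: many resource-hungry actors must run at the same time, and together they need far more resources than a single server can supply. We then have to spread these actors over several servers, that is, over a cluster made up of multiple servers, and this is the problem Cluster-Sharding helps solve.
Let me share what I was able to achieve with Cluster-Sharding, so that we can judge together whether those results include distributing actors across cluster nodes:
First, I have an actor whose name is an ID that I make up myself, and Cluster-Sharding creates it on some node in the cluster. Because it lives in a cluster, I do not know which node it is on or what its actual address is; the ID alone is enough to communicate with it. If I have many such resource-hungry actors, I can use the shard number embedded in the ID to have them created in other shards. Akka-Cluster can also rebalance, moving shards between nodes as nodes join or leave the cluster, including re-creating all the actors of a node on other online nodes when that node leaves the cluster for whatever reason. Seen this way, the actor's ID is the core element of a Cluster-Sharding application. As usual, we demonstrate Cluster-Sharding with an example. The actor to be sharded is the Calculator from the previous few posts: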
package clustersharding.entity

import akka.actor._
import akka.cluster._
import akka.persistence._
import scala.concurrent.duration._
import akka.cluster.sharding._

object Calculator {
  sealed trait Command
  case class Num(d: Double) extends Command
  case class Add(d: Double) extends Command
  case class Sub(d: Double) extends Command
  case class Mul(d: Double) extends Command
  case class Div(d: Double) extends Command
  case object ShowResult extends Command

  sealed trait Event
  case class SetResult(d: Double) extends Event

  // Apply an arithmetic command to the current result.
  def getResult(res: Double, cmd: Command): Double = cmd match {
    case Num(x) => x
    case Add(x) => res + x
    case Sub(x) => res - x
    case Mul(x) => res * x
    case Div(x) =>
      // Integer division throws ArithmeticException on /0; Double division would not.
      val _ = res.toInt / x.toInt
      res / x
    // Non-arithmetic commands are rejected (throw, rather than return, the exception).
    case _ => throw new ArithmeticException("Invalid Operation!")
  }

  case class State(result: Double) {
    def updateState(evt: Event): State = evt match {
      case SetResult(n) => copy(result = n)
    }
  }

  case object Disconnect extends Command // exit cluster

  def props = Props(new Calculator)
}

class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  val cluster = Cluster(context.system)
  var state: State = State(0)

  // Under sharding the parent is the CalcSupervisor, whose name is the entity-id.
  override def persistenceId: String = self.path.parent.name + "-" + self.path.name

  // Rebuild the state from journaled events or from a snapshot.
  override def receiveRecover: Receive = {
    case evt: Event => state = state.updateState(evt)
    case SnapshotOffer(_, st: State) => state = state.copy(result = st.result)
  }

  override def receiveCommand: Receive = {
    case Num(n) => persist(SetResult(getResult(state.result, Num(n))))(evt => state = state.updateState(evt))
    case Add(n) => persist(SetResult(getResult(state.result, Add(n))))(evt => state = state.updateState(evt))
    case Sub(n) => persist(SetResult(getResult(state.result, Sub(n))))(evt => state = state.updateState(evt))
    case Mul(n) => persist(SetResult(getResult(state.result, Mul(n))))(evt => state = state.updateState(evt))
    case Div(n) => persist(SetResult(getResult(state.result, Div(n))))(evt => state = state.updateState(evt))
    case ShowResult => log.info(s"Result on ${cluster.selfAddress.hostPort} is: ${state.result}")
    case Disconnect =>
      log.info(s"${cluster.selfAddress} is leaving cluster!!!")
      cluster.leave(cluster.selfAddress)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalcSupervisor extends Actor {
  // Resume on arithmetic errors so the calculator keeps its current state.
  def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds) {
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(Calculator.props, "calculator")

  // Forward every message to the supervised calculator.
  override def receive: Receive = {
    case msg @ _ => calcActor.forward(msg)
  }
}
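As we can see, Calculator is an ordinary PersistentActor: its internal state is persisted and is recovered when the actor restarts. CalcSupervisor supervises Calculator; it exists so that we can apply a custom SupervisorStrategy.
Calculator is the entity we are going to shard across the cluster. The shards for one type of actor are set up in the cluster through the ClusterSharding.start method of Akka Cluster-Sharding. We need to call this method on every node that will host shards: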
/**
 * Register a named entity type by defining the [[akka.actor.Props]] of the entity actor and
 * functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
 * for this type can later be retrieved with the [[#shardRegion]] method.
 *
 * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
 * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
 *
 * Some settings can be configured as described in the `akka.cluster.sharding` section
 * of the `reference.conf`.
 *
 * @param typeName the name of the entity type
 * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion`
 * @param settings configuration settings, see [[ClusterShardingSettings]]
 * @param extractEntityId partial function to extract the entity id and the message to send to the
 *   entity from the incoming message, if the partial function does not match the message will
 *   be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
 * @param extractShardId function to determine the shard id for an incoming message, only messages
 *   that passed the `extractEntityId` will be used
 * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
 */
def start(
  typeName: String,
  entityProps: Props,
  settings: ClusterShardingSettings,
  extractEntityId: ShardRegion.ExtractEntityId,
  extractShardId: ShardRegion.ExtractShardId): ActorRef = {

  val allocationStrategy = new LeastShardAllocationStrategy(
    settings.tuningParameters.leastShardAllocationRebalanceThreshold,
    settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)

  start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
}
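start returns the ShardRegion as an ActorRef. A ShardRegion is a special actor that manages the actor instances, called entities, held in possibly many shards. These shards may be spread over different cluster nodes, and the outside world talks to the entities through the ShardRegion. From the entityProps parameter of start we can see that each shard holds only one type of actor. The entity instances themselves are created by another internal actor, the shard, and one shard can create many entity instances. The two functions extractShardId and extractEntityId hint at this many-shards, many-entities design. We said that the actor's ID, the entity-id, is the core element of Cluster-Sharding. The entity-id also encodes the shard-id, so the user can design the whole sharding layout, including the number of shards and entities under each ShardRegion, through the encoding rule for entity-ids. When a ShardRegion receives a message carrying an entity-id, it first extracts the shard-id; if that shard does not yet exist in the cluster, a new shard is created on one of the nodes, chosen according to the load on each node. It then looks up the entity by entity-id within that shard and creates a new entity instance if none exists. The whole process of creating shards and entities is driven by the user-supplied functions extractShardId and extractEntityId; through them Cluster-Sharding creates and uses shards and entities as the user requires. The IDs need not follow any particular order; they only have to be unique. Here is an example of such an encoding: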
object CalculatorShard {
  import Calculator._

  case class CalcCommands(eid: String, msg: Command) // user should use it to talk to shardregion

  val shardName = "calcShard"

  // Unwrap the envelope: (entity-id, message for the entity)
  val getEntityId: ShardRegion.ExtractEntityId = {
    case CalcCommands(id, msg) => (id, msg)
  }

  // The shard-id is the first character of the entity-id
  val getShardId: ShardRegion.ExtractShardId = {
    case CalcCommands(id, _) => id.head.toString
  }

  def entityProps = Props(new CalcSupervisor)
}
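The lookup sequence described above (derive the shard-id from the entity-id, create the shard if it is missing, then create the entity inside it if it is missing) can be modeled without Akka. What follows is only a toy, single-process sketch: `ToyRegion`, `Envelope`, and `deliver` are made-up names, not part of the Akka API, and the placement of shards on cluster nodes is left out entirely.

```scala
import scala.collection.mutable

// Toy model only: none of these names belong to Akka.
final case class Envelope(entityId: String, payload: String)

final class ToyRegion {
  // shard-id -> (entity-id -> payloads delivered to that entity so far)
  private val shards =
    mutable.Map.empty[String, mutable.Map[String, Vector[String]]]

  // Same rule as CalculatorShard.getShardId: the first character of the
  // entity-id (assumes a non-empty id).
  def shardIdOf(entityId: String): String = entityId.head.toString

  def deliver(env: Envelope): Unit = {
    // Step 1: find the shard, creating it on first use.
    val shard = shards.getOrElseUpdate(
      shardIdOf(env.entityId),
      mutable.Map.empty[String, Vector[String]])
    // Step 2: find the entity in that shard, creating it on first use,
    // then hand it the unwrapped payload.
    val soFar = shard.getOrElse(env.entityId, Vector.empty[String])
    shard.update(env.entityId, soFar :+ env.payload)
  }

  def shardIds: Set[String] = shards.keySet.toSet

  def entityIds(shardId: String): Set[String] =
    shards.get(shardId).map(_.keySet.toSet).getOrElse(Set.empty[String])

  def received(shardId: String, entityId: String): Vector[String] =
    shards.get(shardId).flatMap(_.get(entityId)).getOrElse(Vector.empty[String])
}
```

Delivering envelopes for "1012" and "2012" to a fresh `ToyRegion` leaves two shards, "1" and "2", each holding one entity, which mirrors the layout the demo later in this post relies on.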
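Users talk to the ShardRegion through CalcCommands. It is an envelope message type created specifically for talking to the sharding system: besides the Command that Calculator normally handles, it carries eid, the ID of the target entity instance. The first character of eid is the shard-id, so we can either name the target entity's shard directly or pick a shard-id at random, for example Random.nextInt(9).toString. Because each shard holds only one type of actor, different entity-ids mean that many instances of the same actor exist at the same time, much like the Router discussed earlier: all instances perform the same computation on different inputs. Typically the user generates entity-ids with some algorithm, aiming for an even spread of entities across shards, and Cluster-Sharding automatically adjusts the placement of shards across cluster nodes according to the actual cluster load.
The following code shows how to deploy shards on a cluster node: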
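When entity-ids are not hand-crafted so that their first character names a shard, one common way to aim for an even spread is to hash the entity-id into a fixed number of shards. The sketch below assumes a hypothetical upper bound of 10 shards; `HashedShardId`, `numberOfShards`, and `shardIdFor` are illustrative names and do not appear in the demo code.

```scala
// Sketch only: HashedShardId, numberOfShards and shardIdFor are illustrative names.
object HashedShardId {
  // Assumed fixed upper bound on the number of shards.
  val numberOfShards: Int = 10

  // Map any entity-id to one of the shard-ids "0" .. "9".
  // The remainder lies strictly between -10 and 10, so math.abs cannot overflow.
  def shardIdFor(entityId: String): String =
    math.abs(entityId.hashCode % numberOfShards).toString
}
```

An extractShardId function could then delegate to it, for example `case CalcCommands(id, _) => HashedShardId.shardIdFor(id)`. The same entity-id always yields the same shard-id, which matters because every node must route a given entity to the same shard.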
package clustersharding.shard

import akka.persistence.journal.leveldb._
import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern._
import clustersharding.entity.CalculatorShard

object CalcShards {
  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}")
      .withFallback(ConfigFactory.load("sharding"))
    // Create an Akka system
    val system = ActorSystem("ShardingSystem", config)

    startupSharding(port, system)
  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) =>
        SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }

  def startupSharding(port: Int, system: ActorSystem) = {
    // Only the node on port 2551 hosts the shared journal store
    startupSharedJournal(system, startStore = (port == 2551), path =
      ActorPath.fromString("akka.tcp://ShardingSystem@127.0.0.1:2551/user/store"))

    ClusterSharding(system).start(
      typeName = CalculatorShard.shardName,
      entityProps = CalculatorShard.entityProps,
      settings = ClusterShardingSettings(system),
      extractEntityId = CalculatorShard.getEntityId,
      extractShardId = CalculatorShard.getShardId
    )
  }
}
The actual deployment code is in the startupSharding method. The next piece of code shows how to use the entities in the shards:
package clustersharding.demo

import akka.actor.ActorSystem
import akka.cluster.sharding._
import clustersharding.entity.CalculatorShard.CalcCommands
import clustersharding.entity._
import clustersharding.shard.CalcShards
import com.typesafe.config.ConfigFactory

object ClusterShardingDemo extends App {

  CalcShards.create(2551)
  CalcShards.create(0)
  CalcShards.create(0)
  CalcShards.create(0)

  Thread.sleep(1000)

  val shardingSystem = ActorSystem("ShardingSystem", ConfigFactory.load("sharding"))
  CalcShards.startupSharding(0, shardingSystem)

  Thread.sleep(1000)

  val calcRegion = ClusterSharding(shardingSystem).shardRegion(CalculatorShard.shardName)

  calcRegion ! CalcCommands("1012", Calculator.Num(13.0)) //shard 1, entity 1012
  calcRegion ! CalcCommands("1012", Calculator.Add(12.0))
  calcRegion ! CalcCommands("1012", Calculator.ShowResult) //shows address too
  calcRegion ! CalcCommands("1012", Calculator.Disconnect) //disengage cluster

  calcRegion ! CalcCommands("2012", Calculator.Num(10.0)) //shard 2, entity 2012
  calcRegion ! CalcCommands("2012", Calculator.Mul(3.0))
  calcRegion ! CalcCommands("2012", Calculator.Div(2.0))
  calcRegion ! CalcCommands("2012", Calculator.Div(0.0)) //divide by zero

  Thread.sleep(15000)
  calcRegion ! CalcCommands("1012", Calculator.ShowResult) //check if restore result on another node
  calcRegion ! CalcCommands("2012", Calculator.ShowResult)
}
The code above picks the shards and entity-ids by hand, and it includes taking one node out of the cluster. The output is:
[INFO] [07/15/2017 09:32:49.414] [ShardingSystem-akka.actor.default-dispatcher-20] [akka.tcp://ShardingSystem@127.0.0.1:50456/system/sharding/calcShard/1/1012/calculator] Result on ShardingSystem@127.0.0.1:50456 is: 25.0
[INFO] [07/15/2017 09:32:49.414] [ShardingSystem-akka.actor.default-dispatcher-20] [akka.tcp://ShardingSystem@127.0.0.1:50456/system/sharding/calcShard/1/1012/calculator] akka.tcp://ShardingSystem@127.0.0.1:50456 is leaving cluster!!!
[WARN] [07/15/2017 09:32:49.431] [ShardingSystem-akka.actor.default-dispatcher-18] [akka://ShardingSystem/system/sharding/calcShard/2/2012/calculator] / by zero
[INFO] [07/15/2017 09:33:01.320] [ShardingSystem-akka.actor.default-dispatcher-4] [akka.tcp://ShardingSystem@127.0.0.1:50464/system/sharding/calcShard/2/2012/calculator] Result on ShardingSystem@127.0.0.1:50464 is: 15.0
[INFO] [07/15/2017 09:33:01.330] [ShardingSystem-akka.actor.default-dispatcher-18] [akka.tcp://ShardingSystem@127.0.0.1:50457/system/sharding/calcShard/1/1012/calculator] Result on ShardingSystem@127.0.0.1:50457 is: 25.0
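The output shows that after node 50456 left the cluster, entity 1012 was moved to node 50457 and its state was preserved.
Here is the source code for this demonstration: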
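The result 25.0 survives the move because the re-created entity rebuilds its state from the journaled events rather than from the memory of the departed node. The recovery step can be sketched in plain Scala as a fold over the journal. This is a simplified stand-in for what receiveRecover does, not Akka Persistence itself; `ReplaySketch` and `recover` are made-up names, and the journal contents are assumed to be the two events written for entity 1012 by Num(13.0) and Add(12.0).

```scala
// Simplified stand-in for event replay; not Akka Persistence code.
object ReplaySketch {
  sealed trait Event
  final case class SetResult(d: Double) extends Event

  final case class State(result: Double) {
    def updateState(evt: Event): State = evt match {
      case SetResult(n) => copy(result = n)
    }
  }

  // Start from the initial state and apply every journaled event in order.
  def recover(journal: Seq[Event]): State =
    journal.foldLeft(State(0))((state, evt) => state.updateState(evt))
}
```

With the assumed journal `Seq(SetResult(13.0), SetResult(25.0))`, `recover` ends in `State(25.0)`, the same value the log reports on node 50457.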
build.sbt
name := "cluster-sharding"

version := "1.0"

scalaVersion := "2.11.9"

resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"

val akkaversion = "2.4.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaversion,
  "com.typesafe.akka" %% "akka-remote" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-tools" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaversion,
  "com.typesafe.akka" %% "akka-persistence" % "2.4.8",
  "com.typesafe.akka" %% "akka-contrib" % akkaversion,
  "org.iq80.leveldb" % "leveldb" % "0.7",
  "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8")
resources/sharding.conf
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ShardingSystem@127.0.0.1:2551"]
    log-info = off
  }

  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE 'native = off' IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshots"
  }
}
Calculator.scala
package clustersharding.entity

import akka.actor._
import akka.cluster._
import akka.persistence._
import scala.concurrent.duration._
import akka.cluster.sharding._

object Calculator {
  sealed trait Command
  case class Num(d: Double) extends Command
  case class Add(d: Double) extends Command
  case class Sub(d: Double) extends Command
  case class Mul(d: Double) extends Command
  case class Div(d: Double) extends Command
  case object ShowResult extends Command

  sealed trait Event
  case class SetResult(d: Double) extends Event

  // Apply an arithmetic command to the current result.
  def getResult(res: Double, cmd: Command): Double = cmd match {
    case Num(x) => x
    case Add(x) => res + x
    case Sub(x) => res - x
    case Mul(x) => res * x
    case Div(x) =>
      // Integer division throws ArithmeticException on /0; Double division would not.
      val _ = res.toInt / x.toInt
      res / x
    // Non-arithmetic commands are rejected (throw, rather than return, the exception).
    case _ => throw new ArithmeticException("Invalid Operation!")
  }

  case class State(result: Double) {
    def updateState(evt: Event): State = evt match {
      case SetResult(n) => copy(result = n)
    }
  }

  case object Disconnect extends Command // exit cluster

  def props = Props(new Calculator)
}

class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  val cluster = Cluster(context.system)
  var state: State = State(0)

  // Under sharding the parent is the CalcSupervisor, whose name is the entity-id.
  override def persistenceId: String = self.path.parent.name + "-" + self.path.name

  // Rebuild the state from journaled events or from a snapshot.
  override def receiveRecover: Receive = {
    case evt: Event => state = state.updateState(evt)
    case SnapshotOffer(_, st: State) => state = state.copy(result = st.result)
  }

  override def receiveCommand: Receive = {
    case Num(n) => persist(SetResult(getResult(state.result, Num(n))))(evt => state = state.updateState(evt))
    case Add(n) => persist(SetResult(getResult(state.result, Add(n))))(evt => state = state.updateState(evt))
    case Sub(n) => persist(SetResult(getResult(state.result, Sub(n))))(evt => state = state.updateState(evt))
    case Mul(n) => persist(SetResult(getResult(state.result, Mul(n))))(evt => state = state.updateState(evt))
    case Div(n) => persist(SetResult(getResult(state.result, Div(n))))(evt => state = state.updateState(evt))
    case ShowResult => log.info(s"Result on ${cluster.selfAddress.hostPort} is: ${state.result}")
    case Disconnect =>
      log.info(s"${cluster.selfAddress} is leaving cluster!!!")
      cluster.leave(cluster.selfAddress)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalcSupervisor extends Actor {
  // Resume on arithmetic errors so the calculator keeps its current state.
  def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds) {
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(Calculator.props, "calculator")

  // Forward every message to the supervised calculator.
  override def receive: Receive = {
    case msg @ _ => calcActor.forward(msg)
  }
}

object CalculatorShard {
  import Calculator._

  case class CalcCommands(eid: String, msg: Command) // user should use it to talk to shardregion

  val shardName = "calcShard"

  // Unwrap the envelope: (entity-id, message for the entity)
  val getEntityId: ShardRegion.ExtractEntityId = {
    case CalcCommands(id, msg) => (id, msg)
  }

  // The shard-id is the first character of the entity-id
  val getShardId: ShardRegion.ExtractShardId = {
    case CalcCommands(id, _) => id.head.toString
  }

  def entityProps = Props(new CalcSupervisor)
}
CalcShard.scala
package clustersharding.shard

import akka.persistence.journal.leveldb._
import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern._
import clustersharding.entity.CalculatorShard

object CalcShards {
  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}")
      .withFallback(ConfigFactory.load("sharding"))
    // Create an Akka system
    val system = ActorSystem("ShardingSystem", config)

    startupSharding(port, system)
  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) =>
        SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }

  def startupSharding(port: Int, system: ActorSystem) = {
    // Only the node on port 2551 hosts the shared journal store
    startupSharedJournal(system, startStore = (port == 2551), path =
      ActorPath.fromString("akka.tcp://ShardingSystem@127.0.0.1:2551/user/store"))

    ClusterSharding(system).start(
      typeName = CalculatorShard.shardName,
      entityProps = CalculatorShard.entityProps,
      settings = ClusterShardingSettings(system),
      extractEntityId = CalculatorShard.getEntityId,
      extractShardId = CalculatorShard.getShardId
    )
  }
}
ClusterShardingDemo.scala
package clustersharding.demo

import akka.actor.ActorSystem
import akka.cluster.sharding._
import clustersharding.entity.CalculatorShard.CalcCommands
import clustersharding.entity._
import clustersharding.shard.CalcShards
import com.typesafe.config.ConfigFactory

object ClusterShardingDemo extends App {

  CalcShards.create(2551)
  CalcShards.create(0)
  CalcShards.create(0)
  CalcShards.create(0)

  Thread.sleep(1000)

  val shardingSystem = ActorSystem("ShardingSystem", ConfigFactory.load("sharding"))
  CalcShards.startupSharding(0, shardingSystem)

  Thread.sleep(1000)

  val calcRegion = ClusterSharding(shardingSystem).shardRegion(CalculatorShard.shardName)

  calcRegion ! CalcCommands("1012", Calculator.Num(13.0)) //shard 1, entity 1012
  calcRegion ! CalcCommands("1012", Calculator.Add(12.0))
  calcRegion ! CalcCommands("1012", Calculator.ShowResult) //shows address too
  calcRegion ! CalcCommands("1012", Calculator.Disconnect) //disengage cluster

  calcRegion ! CalcCommands("2012", Calculator.Num(10.0)) //shard 2, entity 2012
  calcRegion ! CalcCommands("2012", Calculator.Mul(3.0))
  calcRegion ! CalcCommands("2012", Calculator.Div(2.0))
  calcRegion ! CalcCommands("2012", Calculator.Div(0.0)) //divide by zero

  Thread.sleep(15000)
  calcRegion ! CalcCommands("1012", Calculator.ShowResult) //check if restore result on another node
  calcRegion ! CalcCommands("2012", Calculator.ShowResult)
}