Akka-Cluster(0)- 分布式應用開發的一些想法


  當我初接觸akka-cluster的時候,我有一個夢想,希望能充分利用actor自由分布、獨立運行的特性實現某種分布式程序。這種程序的計算任務可以進行人為的分割后再把細分的任務分派給分布在多個服務器上的actor上去運算。這些服務器都處於同一集群環境里,它們都是akka-cluster中的節點(node)。akka-cluster的節點數量只需要通過系統配置方式按照計算能力要求隨意增減,在集群上運行的分布式程序可以在不修改軟件的情況下自動調整actors在各節點上的分布,重新平衡程序運算負載,不受任何影響繼續運行。

   在前面akka系列的博客里也介紹了一些akka-cluster的情況,最近在“集群環境內編程模式(PICE)”的專題系列里又討論了如何在集群環境里通過protobuf-gRPC把多個不同類型的數據庫服務集成起來。因為集群中的數據庫服務是用akka-stream連接的,我們把程序與數據一起作為stream的流元素用Flow發送給相應的數據庫服務進行處理。這時一個想法就產生了:當數據庫服務接收了一項服務要求后(假設數據處理多是耗時、耗資源的任務)可以對任務進行分割,然后把這些小任務再分發給所屬集群內的多個節點上去運算,再按計算要求收集,匯總結果。那么如果能按用戶數量和運算任務的規模來任意添減服務器數量就能滿足任何規模的運算需求了。最重要的是這種集群節點規模調整必須是某種配置方式,即通過修改配置文件,但不需要修改軟件代碼。這些需要恰恰又是akka-cluster的特殊能力。所以決定開個akka-cluster的專題系列來具體討論集群環境下的分布式軟件開發模式。

akka-cluster提供的以下幾種方式比較符合我們的要求:

1、distributed pub/sub - 分布式發布訂閱模式

2、cluster-singleton - 單例actor模式

3、cluster-load-balancing - 集群負載均衡模式

4、cluster-sharding - 集群分片模式

在這個系列下面的博客里我們會逐個模式討論它們在具體編程的使用細節。但首先探討一下如何通過配置文件來定義akka-cluster節點,實現集群規模調整。

集群節點(cluster node)的生命周期會經歷以下階段:

Joining->Up,Leaving->Exiting,Exiting->Removed,Unreachable->Up,Unreachable->Down,Down->Removed

下面我們就用運行在不同集群節點的actor,通過訂閱系統的集群成員狀態轉換消息來觀察每個節點的狀態轉變:

class EventListener extends Actor with ActorLogging { import EventListner._ val cluster = Cluster(context.system) override def preStart(): Unit = { cluster.subscribe(subscriber = self,initialStateMode = InitialStateAsEvents ,classOf[MemberEvent],classOf[UnreachableMember]) super.preStart() } override def postStop(): Unit = { cluster.unsubscribe(self) super.postStop() } override def receive: Receive = { case MemberJoined(member) => log.info("{} is JOINING...", member.address) case MemberUp(member) => log.info("{} is UP!", member.address) case MemberWeaklyUp(member) => log.info("{} is weakly UP!", member.address) case MemberLeft(member) => log.info("{} is LEAVING...", member.address) case MemberExited(member) => log.info("{} is EXITING...", member.address) case MemberRemoved(member, prevStatus) => log.info("{} is REMOVED! from state {}", member.address, prevStatus) case UnreachableMember(member) => log.info("{} is UNREACHABLE!", member.address) case ReachableMember(member) => log.info("{} is REACHABLE!", member.address) case UnreachableDataCenter(datacenter) => log.info("Data Center {} is UNREACHABLE!", datacenter) case ReachableDataCenter(datacenter) => log.info("Data Center {} is REACHABLE!", datacenter) case Leave => cluster.leave(cluster.selfAddress) log.info("{} is asked to leave cluster.",cluster.selfAddress) case Down => cluster.down(cluster.selfAddress) log.info("{} is asked to shutdown cluster.",cluster.selfAddress) } }

Leave和Down是自定義消息類型:

object EventListner { trait Messages {} case object Leave extends Messages case object Down extends Messages def props = Props(new EventListener) ... }

akka-cluster最基本的配置文件內容如下:

akka { actor { provider = "cluster" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "localhost" port = 2551 } } cluster { seed-nodes = [ "akka.tcp://ClusterSystem@localhost:2551"] } }

實際上hostname,port,seed-nodes這些參數都可以在程序里配置,如果有需要,我們只要在配置文件里注明這是一個集群模式的程序就行了,其它參數放到程序里去定義:

akka { actor { provider = "cluster" } }

然后我們可以在程序里配置缺失的集群參數:

object EventListner { trait Messages {} case object Leave extends Messages case object Down extends Messages def props = Props(new EventListener) def create(host: String = "localhost", port: Int = 0, seednode: String = "") = { var config = ConfigFactory.parseString(s"akka.remote.netty.tcp.hostname=${host}") .withFallback(ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}")) if (seednode.length > 0) { val strConfig = "akka.cluster.seed-nodes=[\"" + seednode + "\"]" val configSeed = ConfigFactory.parseString(strConfig) config = config.withFallback(configSeed) } config = config.withFallback(ConfigFactory.load("akka-cluster-config")) val clusterSystem = ActorSystem(name="ClusterSystem",config=config) clusterSystem.actorOf(Props[EventListener]) } }

在create函數里ConfigFactory.parseString可以把一個字符串轉換成集群配置參數,多個參數可以用withFallback來補充定義。

以下是EventListener的測試程序:

import EventListner._ object EventDemo extends App { val listner1 = EventListner.create(port = 2551)  //seed node
 scala.io.StdIn.readLine() val listner2 = EventListner.create()    //port=0 random port
 scala.io.StdIn.readLine() val listner3 = EventListner.create()    //port=0 random port
 scala.io.StdIn.readLine() listner3 ! Leave scala.io.StdIn.readLine() listner2 ! Down scala.io.StdIn.readLine() listner1 ! Leave scala.io.StdIn.readLine() }

第一個運行的必須是seednode,因為每個節點在啟動時都需要連接seednode。下面是每個階段的輸出結果:

[INFO] [10/22/2018 18:50:40.888] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Started up successfully
[INFO] [10/22/2018 18:50:40.931] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Node [akka.tcp://ClusterSystem@localhost:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[INFO] [10/22/2018 18:50:40.933] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Cluster Node [akka.tcp://ClusterSystem@localhost:2551] dc [default] is the new leader
[INFO] [10/22/2018 18:50:40.943] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:2551] to [Up]
[INFO] [10/22/2018 18:50:41.037] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:2551 is UP!
[INFO] [10/22/2018 18:50:47.363] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is JOINING...
[INFO] [10/22/2018 18:50:47.930] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:51679] to [Up]
[INFO] [10/22/2018 18:50:47.931] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51679 is UP!
[INFO] [10/22/2018 18:50:48.109] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is UP!
[INFO] [10/22/2018 18:50:53.765] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is JOINING...
[INFO] [10/22/2018 18:50:53.930] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51681 is JOINING...
[INFO] [10/22/2018 18:50:54.929] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:51681] to [Up]
[INFO] [10/22/2018 18:50:54.929] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51681 is UP!
[INFO] [10/22/2018 18:52:00.806] [ClusterSystem-akka.actor.default-dispatcher-32] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is asked to leave cluster.
[INFO] [10/22/2018 18:52:00.807] [ClusterSystem-akka.actor.default-dispatcher-28] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51681] - Marked address [akka.tcp://ClusterSystem@localhost:51681] as [Leaving]
[INFO] [10/22/2018 18:52:00.808] [ClusterSystem-akka.actor.default-dispatcher-42] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is LEAVING...
[INFO] [10/22/2018 18:52:00.809] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is asked to shutdown cluster.
[INFO] [10/22/2018 18:52:00.809] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Marking node [akka.tcp://ClusterSystem@localhost:51679] as [Down]
[INFO] [10/22/2018 18:52:00.810] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51681 is LEAVING...
[INFO] [10/22/2018 18:52:00.933] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51681 is LEAVING...
[INFO] [10/22/2018 18:52:01.101] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Shutting down myself
[INFO] [10/22/2018 18:52:01.102] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Shutting down...
[INFO] [10/22/2018 18:52:01.104] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51679] - Successfully shut down
[INFO] [10/22/2018 18:52:01.110] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:2551 is REMOVED! from state Up
[INFO] [10/22/2018 18:52:01.110] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51679 is REMOVED! from state Down
[INFO] [10/22/2018 18:52:01.111] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@localhost:51679/user/$a] akka.tcp://ClusterSystem@localhost:51681 is REMOVED! from state Leaving
[INFO] [10/22/2018 18:52:02.925] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:2551] - Leader is moving node [akka.tcp://ClusterSystem@localhost:51681] to [Exiting]
[INFO] [10/22/2018 18:52:02.926] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@localhost:2551/user/$a] akka.tcp://ClusterSystem@localhost:51681 is EXITING...
[INFO] [10/22/2018 18:52:02.927] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51681] - Exiting, starting coordinated shutdown
[INFO] [10/22/2018 18:52:02.927] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://ClusterSystem@localhost:51681/user/$a] akka.tcp://ClusterSystem@localhost:51681 is EXITING...
[INFO] [10/22/2018 18:52:02.934] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@localhost:51681] - Exiting completed

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM