概述
Akka提供的非常吸引人的特性之一就是輕松構建自定義集群,這也是我要選擇Akka的最基本原因之一。如果你不想敲太多代碼,也可以通過簡單的配置構建一個非常簡單的集群。本文為說明Akka集群構建的學習成本低廉,以Akka官網的例子代碼出發,進行簡單改造后與Spring集成,有關Spring集成的信息你可以選擇閱讀《Spring與Akka的集成》一文。本文所講述的是一款十分簡便的集群監聽器,它通過訂閱集群成員的消息,對整個集群的成員進行管理(管理的方式只是打印一行日志)。
Akka集群規范
根據Akka官網的描述——Akka集群特性提供了容錯的、去中心化的、基於集群成員關系點對點的,不存在單點問題、單點瓶頸的服務。其實現原理為閑聊協議和失敗檢查。
集群概念
- 節點(node):集群中的邏輯成員。允許一台物理機上有多個節點。由元組hostname:port:uid唯一確定。
- 集群(cluster):由成員關系服務構建的一組節點。
- 領導(leader):集群中唯一扮演領導角色的節點。
- 種子節點(seed node):作為其他節點加入集群的連接點的節點。實際上,一個節點可以通過向集群中的任何一個節點發送Join(加入)命令加入集群。
節點狀態
這里以Akka官網提供的成員狀態狀態圖為例,如圖1所示。

圖1
圖1展示了狀態轉換的兩個因素:動作和狀態。
狀態
- joining:節點正在加入集群時的狀態。
- weekly up:配置了akka.cluster.allow-weakly-up-members=on時,啟用的狀態。
- up:集群中節點的正常狀態。
- leaving/exiting:優雅的刪除節點時,節點的狀態。
- down:標記為已下線的狀態。
- removed:墓碑狀態,表示已經不再是集群的成員。
動作
- join:加入集群。
- leave:告知節點優雅的離開集群。
- down:標記集群為已下線。
配置
本節將要展示構建集群所需要的最基本的配置,幾乎不會引入過多的開發成本,一個集群就構建完成了。application.conf文件的內容如下:
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://metadataAkkaSystem@127.0.0.1:2551",
"akka.tcp://metadataAkkaSystem@127.0.0.1:2552"]
#//#snippet
# excluded from snippet
auto-down-unreachable-after = 10s
#//#snippet
# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
#
# auto-down-unreachable-after = 10s
# Disable legacy metrics in akka-cluster.
metrics.enabled=off
}
}
此配置文件與我在《使用Akka的遠程調用》一文中的配置有很多不同:
- provider不再是akka.remote.RemoteActorRefProvider,而是akka.cluster.ClusterActorRefProvider。這說明ActorRef將由akka.cluster.ClusterActorRefProvider提供;
- 增加了cluster配置;
cluster配置詳解
簡單集群監聽器
我們創建一個簡單的集群監聽器SimpleClusterListener(實際上是一個Actor,因為繼承了UntypedActor),它向集群訂閱MemberEvent(成員事件)和UnreachableMember(不可達成員)兩種消息,來對集群成員進行管理(打印),其實現見代碼清單1所示。
代碼清單1
@Named("SimpleClusterListener")
@Scope("prototype")
public class SimpleClusterListener extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
// subscribe to cluster changes
@Override
public void preStart() {
// #subscribe
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
// #subscribe
}
// re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof UnreachableMember) {
UnreachableMember mUnreachable = (UnreachableMember) message;
log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof MemberRemoved) {
MemberRemoved mRemoved = (MemberRemoved) message;
log.info("Member is Removed: {}", mRemoved.member());
} else if (message instanceof MemberEvent) {
// ignore
} else {
unhandled(message);
}
}
}
運行展示
logger.info("Start simpleClusterListener");
final ActorRef simpleClusterListener = actorSystem.actorOf(springExt.props("SimpleClusterListener"), "simpleClusterListener");
actorMap.put("simpleClusterListener", simpleClusterListener);
logger.info("Started simpleClusterListener");
我們首先啟動第一個種子節點,配置跟第一小節完全一致。我們觀察SimpleClusterListener的日志輸出如下圖所示。

我們再啟動第二個種子節點,其配置的akka.remote.netty.tcp.port為2552,我們觀察SimpleClusterListener的日志輸出如下圖所示。

我們再啟動一個非種子節點,沒有為其指定akka.remote.netty.tcp.port,我們觀察SimpleClusterListener的日志輸出如下圖所示。

可以看到新加入的節點信息被SimpleClusterListener打印出來了,細心的同學可能發現了一些Akka集群中各個節點的狀態遷移信息,第一個種子節點正在加入自身創建的集群時的狀態時JOINING,由於第一個種子節點將自己率先選舉為Leader,因此它還將自己的狀態改變為Up。后面它還將第二個種子節點和第三個節點從JOINING轉換到Up狀態。
我們停止第三個加入的節點,我們觀察SimpleClusterListener的日志輸出如下圖所示。

可以看到其狀態首先被標記為Down,最后被轉換為Removed。
總結
通過以上介紹相信大家對使用Akka構建集群有了基本的認識,是不是很輕松?如果想要繼續了解如何使用Akka構建集群,請閱讀《使用Akka構建集群(二)》。
