Akka Cluster簡介與基本環境搭建


  akka集群是高容錯、去中心化、不存在單點故障以及不存在單點瓶頸的集群。它使用gossip協議通信以及具備故障自動檢測功能。

Gossip收斂
  集群中每一個節點被其他節點監督(默認的最大數量為5)。集群中的節點互相監督着,某節點所監督的狀態也正在被其他監督着。通過gossip協議,節點向其他節點傳遞自己所見節點的最新狀態(Up、Joining等等),同時節點也在接收來自其他節點的信息,這些信息包括哪些節點以及這些節點對應的狀態,並這些節點加入到自己的seen表里去,表示自己已經看見了這些節點的最新狀態了,當所有的節點都把其他節點“看見”了后,我們可以說"Gossip收斂"完成了。

  根據以上陳述,當集群中某節點不可達(unreachable)時,gossip收斂不能完成。那些不可達的節點需要變成可達狀態(reachable)或者down狀態,收斂才能進行。
  akka集群不存在leader選舉,但是存在leader節點,但是leader節點可以轉移,leader負責執行leader action,當每次收斂完成后,leader需要做三件事:

  • 將處於joining狀態節點變更為Up狀態, 即joining->up
  • leaving->exiting
  • exiting->removed

failure Detector

  集群中,一個節點被其他節點監督(默認最大數量為5),任何一個節點被探測到不可達時,那么這個消息將被通過gossip協議傳播到其他節點去,其他節點也將此節點標為不可達。同時故障檢測機制也會將節點從不可達標記為可達,同時擴散給其他節點。
關於評判一個節點是否可達的方式是利用歷史數據中每次心跳時間間隔的平均值與心跳次數為均方差去構建一個正太分布,F是這個分布的密度分布函數,利用以下公式:

`phi = -log10(1 - F(timeSinceLastHeartbeat))`
  phi反應了當前網絡的好壞情況,當`akka.cluster.failure-detector.threshold`閾值配置不當時,並不是等待某個心跳檢測超時時,才會把節點標記為不可達。其值默認為18,想要得到更高的靈敏度,需要把閾值設置降低。
## 實踐   編程方式構建集群   `akka.tcp://myCluster@127.0.0.1:2551`節點:

application.conf:

akka {
  actor {
    provider = cluster
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = []
  }
}
package nathan

import akka.actor.{Actor, ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object Main extends App {
  val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
  Cluster(actorSystem).join(Address(protocol = "akka.tcp",system = "myCluster",host = "127.0.0.1",port = 2551))
}

  上述代碼Cluster(actorSystem).join(address)是以address為基礎創建集群,集群的名稱為"myCluster",其中包含"akka.tcp://myCluster@127.0.0.1:2551"的節點。集群的名稱為其第一個加入的節點的名字決定,其他后加入的節點的名稱應當與其保持一致。當這個單節點集群創建完畢后,這個單節點就成為seedNode,也就是說,其他節點通過向種子節點發出Join指令,就可以加入集群。

  akka.tcp://myCluster@127.0.0.1:2552節點
application.conf

akka {
  actor {
    provider = cluster
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}
package nathan

import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object Main extends App {
  val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
  Cluster(actorSystem).joinSeedNodes(List(Address(protocol = "akka.tcp",system = "myCluster1",host = "127.0.0.1",port = 2551)))
}

  Cluster(actorSystem).joinSeedNodes(List(address))代碼作用向某個種子節點發出Join命令以加入集群。這里填寫的種子節點越多越好,這樣消息在集群中擴散可以更快。

監聽集群節點狀態

  集群時間有如下幾種有如下幾種:MemberJoinedMemberWeaklyUpMemberUpMemberLeftMemberExitedMemberRemovedLeaderChangedRoleLeaderChangedUnreachableMemberReachableMember等等。

class ListenClusterActor extends Actor {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
  }
  override def postStop(): Unit = cluster.unsubscribe(self)
  override def receive: Receive = {
    case MemberJoined(member) =>
      println("join:" + member)
    case MemberUp(member) =>
      println("up:" + member)
    case MemberExited(member) =>
      println("exited:" + member)
    case MemberRemoved(member,previousStatus) =>
      println("removed:" + member+" before status:"+previousStatus)
    case UnreachableMember(member) =>
      println("unreachable:" + member)
  }
}

當其他節點加入集群時和離開時,打印如下:

join:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Joining)
up:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Up)
exited:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Exiting)
removed:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Removed) before status:Exiting


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM