Scala學習之路 (十)Scala的Actor


一、Scala中的並發編程

1、Java中的並發編程

①Java中的並發編程基本上滿足了事件之間相互獨立,但是事件能夠同時發生的場景的需要。 

②Java中的並發編程是基於共享數據和加鎖的一種機制,即會有一個共享的數據,然后有若干個線程去訪問這個共享的數據(主要是對這個共享的數據進行修改),同時Java利用加鎖的機制(即synchronized)來確保同一時間只有一個線程對我們的共享數據進行訪問,進而保證共享數據的一致性。 

③Java中的並發編程存在資源爭奪和死鎖等多種問題,因此程序越大問題越麻煩。 

2、Scala中的並發編程

①Scala中的並發編程思想與Java中的並發編程思想完全不一樣,Scala中的Actor是一種不共享數據,依賴於消息傳遞的一種並發編程模式, 避免了死鎖、資源爭奪等情況。在具體實現的過程中,Scala中的Actor會不斷的循環自己的郵箱,並通過receive偏函數進行消息的模式匹配並進行相應的處理。 

②如果Actor A和 Actor B要相互溝通的話,首先A要給B傳遞一個消息,B會有一個收件箱,然后B會不斷的循環自己的收件箱, 若看見A發過來的消息,B就會解析A的消息並執行,處理完之后就有可能將處理的結果通過郵件的方式發送給A。

二、Scala中的Actor

1、什么是Actor

一個actor是一個容器,它包含 狀態, 行為,信箱,子Actor 和 監管策略,所有這些包含在一個ActorReference(Actor引用)里。一個actor需要與外界隔離才能從actor模型中獲益,所以actor是以actor引用的形式展現給外界的

2、ActorSystem的層次結構

如果一個Actor中的業務邏輯非常復雜,為了降低代碼的復雜度,可以將其拆分成多個子任務(在一個actor的內部可以創建一個或多個actor,actor的創建者也是該actor的監控者) 

一個ActorSystem應該被正確規划,例如哪一個Actor負責監控,監控什么等等:

    • 負責分發的actor管理接受任務的actor
    • 擁有重要數據的actor,找出所有可能丟失數據的子actor,並且處理他們的錯誤。

3、ActorPath

ActorPath是通過字符串描述Actor的層級關系,並唯一標識一個Actor的方法。

ActorPath包含協議,位置 和 Actor層級關系

//本地path
"akka://my-sys/user/service-a/worker1"   

//遠程path  akka.tcp://(ActorSystem的名稱)@(遠程地址的IP):(遠程地址的端口)/user/(Actor的名稱)
"akka.tcp://my-sys@host.example.com:5678/user/service-b" 

//akka集群
"cluster://my-cluster/service-c"

 遠程地址不清楚是多少的話,可以在遠程的服務啟動的時候查看

4、獲取Actor Reference

獲取Actor引用的方式有兩種:創建 和 查找。 

要創建Actor,可以調用ActorSystem.actorOf(..),它創建的actor在guardian actor之下,接着可以調用ActorContext的actorOf(…) 在剛才創建的Actor內生成一個actor樹。這些方法會返回新創建的actor的引用,每一個actor都可以通過訪問ActorContext來獲得自己(self),子Actor(children,child)和父actor(parent)。


要查找Actor Reference,可以調用ActorSystem或ActorContext的actorSelection(“path”),在查找ActorRef時,可以使用相對路徑或絕對路徑,如果是相對路徑,可以用 .. 來表示parent actor。

actorOf / actorSelection / actorFor的區別

  • actorOf 創建一個新的actor,創建的actor為調用該方法所屬的context的直接子actor。

  • actorSelection 查找現有actor,並不會創建新的actor。

  • actorFor 查找現有actor,不創建新的actor,已過時。

5、Actor和ActorSystem

Actor:
就是用來做消息傳遞的
用來接收和發送消息的,一個actor就相當於是一個老師或者是學生。
如果我們想要多個老師,或者學生,就需要創建多個actor實例。
ActorSystem:
用來創建和管理actor,並且還需要監控Actor。ActorSystem是單例的(object)
在同一個進程里面,只需要一個ActorSystem就可以了

三、Actor的示例

1、示例說明

2、代碼實現

MyResourceManager.scala(服務端)

package com.rpc

import akka.actor._
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable

class MyResourceManager(var resourceManagerHostName:String, var resourceManagerPort:Int) extends Actor {
  /**
    * 定義一個Map,接受MyNodeManager的注冊信息,key是主機名,
    * value是NodeManagerInfo對象,里面存儲主機名、CPU和內存信息
    * */
  var registerMap = new mutable.HashMap[String,NodeManagerInfo]()
  /**
    * 定義一個Set,接受MyNodeManager的注冊信息,key是主機名,
    * value是NodeManagerInfo對象,里面存儲主機名、CPU和內存信息
    * 實際上和上面的Map里面存檔內容一樣,容易變歷,可以不用寫,主要是模仿后面Spark里面的內容
    * 方便到時理解Spark源碼
    * */
  var registerSet = new mutable.HashSet[NodeManagerInfo]()


  override def preStart(): Unit = {
    import scala.concurrent.duration._
    import context.dispatcher
    context.system.scheduler.schedule(0 millis, 5000 millis, self,CheckTimeOut)
  }

  //對MyNodeManager傳過來的信息進行匹配
  override def receive: Receive = {
    //匹配到NodeManager的注冊信息進行對應處理
    case NodeManagerRegisterMsg(nodeManagerID,cpu,memory) => {
      //將注冊信息實例化為一個NodeManagerInfo對象
      val registerMsg = new NodeManagerInfo(nodeManagerID,cpu,memory)
      //將注冊信息存儲到registerMap和registerSet里面,key是主機名,value是NodeManagerInfo對象
      registerMap.put(nodeManagerID,registerMsg)
      registerSet += registerMsg
      //注冊成功之后,反饋個MyNodeManager一個成功的信息
      sender() ! new RegisterFeedbackMsg("注冊成功!" + resourceManagerHostName+":"+resourceManagerPort)
    }
    //匹配到心跳信息做相應處理
    case HeartBeat(nodeManagerID) => {
      //獲取當前時間
      val time:Long = System.currentTimeMillis()
      //根據nodeManagerID獲取NodeManagerInfo對象
      val info = registerMap(nodeManagerID)
      info.lastHeartBeatTime = time
      //更新registerMap和registerSet里面nodeManagerID對應的NodeManagerInfo對象信息(最后一次心跳時間)
      registerMap(nodeManagerID) = info
      registerSet += info
    }
    //檢測超時,對超時的數據從集合中刪除
    case CheckTimeOut => {
      var time = System.currentTimeMillis()
      registerSet
        .filter( nm => time - nm.lastHeartBeatTime > 10000)
        .foreach(deadnm => {
          registerSet -= deadnm
          registerMap.remove(deadnm.nodeManagerID)
        })
      println("當前注冊成功的節點數:" + registerMap.size)
    }
  }
}

object MyResourceManager {
  def main(args: Array[String]): Unit = {
    /**
      * 傳參:
      *   ResourceManager的主機地址、端口號
      * */
    val RM_HOSTNAME = args(0)
    val RM_PORT = args(1).toInt

    val str:String =
      """
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname =localhost
        |akka.remote.netty.tcp.port=19888
      """.stripMargin
    val conf: Config = ConfigFactory.parseString(str)
    val actorSystem = ActorSystem(Conf.RMAS,conf)
    actorSystem.actorOf(Props(new MyResourceManager(RM_HOSTNAME,RM_PORT)),Conf.RMA)
  }
}
View Code

MyNodeManager.scala(客戶端)

package com.rpc

import java.util.UUID

import akka.actor._
import com.typesafe.config.{Config, ConfigFactory}

class MyNodeManager(resourceManagerHostName:String,resourceManagerPort:Int,cpu:Int,memory:Int) extends Actor{
  //MyNodeManager的UUID
  var nodeManagerID:String = _
  var rmref:ActorSelection = _
  override def preStart(): Unit = {
    //獲取MyResourceManager的Actor的引用
    rmref = context.actorSelection(s"akka.tcp://${Conf.RMAS}@${resourceManagerHostName}:${resourceManagerPort}/user/${Conf.RMA}")
    //生成隨機的UUID
    nodeManagerID = UUID.randomUUID().toString
    /**
      * 向MyResourceManager發送注冊信息
      * */
    rmref ! NodeManagerRegisterMsg(nodeManagerID,cpu,memory)

  }
  //進行信息匹配
  override def receive: Receive = {
    //匹配到注冊成功之后MyResourceManager反饋回的信息,進行相應處理
    case RegisterFeedbackMsg(feedbackMsg) => {
      /**
        * initialDelay: FiniteDuration, 多久以后開始執行
        * interval:     FiniteDuration, 每隔多長時間執行一次
        * receiver:     ActorRef, 給誰發送這個消息
        * message:      Any  發送的消息是啥
        */
      //定時任務需要導入的工具包
      import scala.concurrent.duration._
      import context.dispatcher
      //定時向自己發送信息
      context.system.scheduler.schedule(0 millis, 3000 millis, self, SendMessage)
    }
    //匹配到SendMessage信息之后做相應處理
    case SendMessage => {
      //向MyResourceManager發送心跳信息
      rmref ! HeartBeat(nodeManagerID)
      println(Thread.currentThread().getId + ":" + System.currentTimeMillis())
    }
  }
}

object MyNodeManager {
  def main(args: Array[String]): Unit = {
    /**
      * 傳參:
      *   NodeManager的主機地址、端口號、CPU、內存
      *   ResourceManager的主機地址、端口號
      * */
    val NM_HOSTNAME = args(0)
    val NM_PORT = args(1)
    val NM_CPU:Int = args(2).toInt
    val NM_MEMORY:Int = args(3).toInt

    val RM_HOSTNAME = args(4)
    val RM_PORT = args(5).toInt

    val str:String =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = ${NM_HOSTNAME}
        |akka.remote.netty.tcp.port = ${NM_PORT}
      """.stripMargin
    val conf: Config = ConfigFactory.parseString(str)
    val actorSystem = ActorSystem(Conf.NMAS,conf)
    actorSystem.actorOf(Props(new MyNodeManager(RM_HOSTNAME,RM_PORT,NM_CPU,NM_MEMORY)),Conf.NMA)
  }
}
View Code

Conf.scala(配置文件)

package com.rpc

//避免硬編碼
object Conf {
  //ResourceManagerActorSystem
  val RMAS = "MyRMActorSystem"
  //ResourceManagerActor
  val RMA = "MyRMActor"
  //NodeManagerActorSystem
  val NMAS = "MyNMActorSystem"
  //NodeManagerActor
  val NMA = "MyNMactor"
}
View Code

Message.scala

package com.rpc
//NodeManager注冊信息
case class NodeManagerRegisterMsg(val nodeManagerID:String, var cpu:Int, var memory:Int)
//ResourceManager接收到注冊信息成功之后的返回信息
case class RegisterFeedbackMsg(val feedbackMsg: String)
//NodeManager的心跳信息
case class HeartBeat(val nodeManagerID:String)
//NodeManager注冊信息
class NodeManagerInfo(val nodeManagerID:String, var cpu:Int, var memory:Int){
  //定義一個屬性,存儲上一次的心跳時間
  var lastHeartBeatTime:Long = _
}

case object SendMessage
case object CheckTimeOut
View Code

3、運行

(1)運行MyResourceManager

運行結果

發現報錯數組越界,原因是在啟動時需要傳入2個參數

重新啟動,啟動成功

2、運行MyNodeManager

報相同的錯誤,不過此處需要傳入6個參數

重新啟動,啟動成功

3、觀察MyResourceManager

發現有一個節點連接成功

4、再啟動一個MyNodeManager觀察情況

先修改MyNodeManager配置里面的端口

再啟動

啟動成功之后觀察MyResourceManager,此時有2個節點連接成功

5、關閉一個節點,觀察情況

集合中連接超時的成功刪除

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM