akka構建簡單分布式應用


當程序的要求達到一台計算機的極限時,我們便需要將程序分布式化,讓程序運行在多台計算機上。akka提供了remote actor用來構建分布式應用。

一、remote actor

1.Actor path

  actor的路徑設計采用了類似URL的形式,即scheme://domain:port/path。scheme代表協議(http或者ftp),domain代表域名或者ip地址,port代表端口,path代表路徑。所以表示一個actor的路徑是akka://ServerSys@10.102.141.77:2552/user/SomeActor。路徑表示遠程actor的主機ip是10.102.141.77,端口是2552,actorsystem是ServerSys,Actor的名字是SomeActor。通過Actor path,我們就可以遠程訪問一個actor,進而進行消息的傳遞。

2.Actor引用

當知道遠程actor的url后,我們便可以遠程訪問一個actor。訪問通過引用遠程actor來實現。

val actor = context.actorFor("akka://actorSystemName@10.0.0.1:2552/user/actorName")

一旦得到了actor的引用,你就可以象與本地actor通訊一樣與它進行通迅了

actor ! "Pretty awesome feature"

二、一個簡單例子

有一個本地actor:LocalActor,一個遠程actor:RemoteActor。我們要實現相互之間的通信。LocalActor向RemoteActor發送一個消息"Hi there",RemoteActor返回"Hi there got something"。

1.remote端。

remote端的目錄結構如下

 包含四個文件:application.conf,build.sbt,RemoteNodeApplication.scala,RemoteActor.scala

application.conf:

RemoteSys {
    akka {
          actor {
            provider = "akka.remote.RemoteActorRefProvider"
          }
       remote {
        transport = "akka.remote.netty.NettyRemoteTransport"
        netty {
          hostname = "192.168.178.192"
          port = 2552
        }
      }
    }
}
View Code

build.sbt

name := "RemotingExampleRemoteNode"

version := "1.0"

scalaVersion := "2.9.1"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
"com.typesafe.akka" % "akka-actor" % "2.0.2",
"com.typesafe.akka" % "akka-remote" % "2.0.2",
"com.typesafe.akka" % "akka-kernel" % "2.0.2"
)
View Code

RemoteActor.scala

package org.akka.essentials.remotenode
import akka.actor.Actor

class RemoteActor extends Actor {
  def receive: Receive = {
    case message: String =>
      // Get reference to the message sender and reply back
      sender.tell(message + " 192.168.178.192 got something")
  }
}
View Code

RemoteNodeApplication.scala

package org.akka.essentials.remotenode
import akka.kernel.Bootable
import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory

object RemoteActorSystem{
  def main(args: Array[String]):Unit = {
    val system = ActorSystem("RemoteNodeApp", ConfigFactory.load().getConfig("RemoteSys"))
    val remoteActor = system.actorOf(Props[RemoteActor], name = "remoteActor")
  }
}
View Code

sbt package進行編譯,然后sbt run運行程序。

Remote端如果要以獨立微內核的形式使用,RemoteNodeApplication.scala如下

RemoteNodeApplication.scala

package org.akka.essentials.remotenode
import akka.kernel.Bootable
import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory

class RemoteNodeApplication extends Bootable {
  val system = ActorSystem("RemoteNodeApp", ConfigFactory
    .load().getConfig("RemoteSys"))

  def startup = {
    system.actorOf(Props[RemoteActor], name = "remoteActor")
  }

  def shutdown = {
    system.shutdown()
  }
}
View Code

微內核的使用參考參考文獻4.

2.Local端程序

目錄結構同Remote端。也是包含四個文件:application.conf,build.sbt,LocalActor.scala,LocalNodeApplication.scala。

application.conf

LocalSys {
    akka {
          actor {
            provider = "akka.remote.RemoteActorRefProvider"
          }
    }
}
View Code

build.sbt

name := "RemotingExampleLocalNode"

version := "1.0"

scalaVersion := "2.9.1"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
"com.typesafe.akka" % "akka-actor" % "2.0.2",
"com.typesafe.akka" % "akka-remote" % "2.0.2",
"com.typesafe.akka" % "akka-kernel" % "2.0.2"
)
View Code

LocalActor.scala

package org.akka.essentials.localnode
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Address
import akka.actor.Deploy
import akka.actor.Props
import akka.dispatch.Await
import akka.pattern.ask
import akka.remote.RemoteScope
import akka.util.duration.intToDurationInt
import akka.util.Timeout

class LocalActor extends Actor with ActorLogging {

  //Get a reference to the remote actor
  val remoteActor = context.actorFor("akka://RemoteNodeApp@192.168.178.192:2552/user/remoteActor")
  implicit val timeout = Timeout(5 seconds)
  def receive: Receive = {
    case message: String =>
      val future = (remoteActor ? message).mapTo[String]
      val result = Await.result(future, timeout.duration)
      log.info("Message received from Server -> {}", result)
  }
}
View Code

LocalNodeApplication.scala

package org.akka.essentials.localnode
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props

object LocalNodeApplication {

  def main(args: Array[String]): Unit = {
    // load the configuration
    val config = ConfigFactory.load().getConfig("LocalSys")
    val system = ActorSystem("LocalNodeApp", config)
    val clientActor = system.actorOf(Props[LocalActor])
    clientActor ! "Hello"
    Thread.sleep(4000)
    system.shutdown()
  }
}
View Code

運行結果如下(拖動圖片或者另存為可以看大圖)

 local端和Remote端的代碼見:https://github.com/hequn8128/akka/tree/master/AkkaRemotingExample

參考文獻:

1.akka官方文檔中文版:http://www.gtan.com/akka_doc/index.html

2.akka essential by Munish K.G

3.akka essential code:https://github.com/write2munish/Akka-Essentials/tree/master/AkkaRemotingExample

4.akka微內核:http://www.gtan.com/akka_doc/modules/microkernel.html#microkernel


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM