Actor和AKKA的使用


添加需要的maven依賴

<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>ssl-config-akka_2.11</artifactId>
    <version>0.1.2</version>
</dependency>

<!-- 添加scala的依賴 -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-slf4j_2.11</artifactId>
    <version>2.5.23</version>
</dependency>

<!-- 添加akka的actor依賴 -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.11</artifactId>
    <version>2.5.23</version>
</dependency>

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.11</artifactId>
    <version>2.5.23</version>
</dependency>

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-remote_2.11</artifactId>
    <version>2.5.23</version>
</dependency>

Actor (線程)

Actor信息傳遞

import scala.actors.{Actor, Future}

object ActorDM {
  def main(args: Array[String]): Unit = {

    // 創建對象MyActor
    val myActor1 = new MyActor()
    // 啟動線程
    myActor1.start()

    // 發送異步消息,沒有返回值
    myActor1 ! "this is one info!"

    // 發送同步消息,阻塞等待返回值
    val result: Any = myActor1 !? "this is two info!"
    println(result)

    // 發送異步消息, 有返回值
    val future: Future[Any] = myActor1 !! "this is three info"
    // 等待3秒,
    Thread.sleep(3000)
    // 判斷是否有值返回
    if(future.isSet){
      // 取出返回的值
      println(future.apply())
    }else{
      println("None")
    }

    // 發送對象
    myActor1 ! new AC("ac name")

    // 關閉線程
    myActor1 ! "stop"


  }

  case class AC(name: String) {}

  class MyActor extends Actor {
    
    // 重寫act方法,類似java的Thread的run方法
    override def act(): Unit = {

      var flag: Boolean = true

      while (flag){
        receive{
          // 接收字符串
          case str: String => {
            if(str.equals("stop")){
              flag = false
            }
            println(s"接收的信息是: $str")
            sender ! s"發送的 $str 已收到!"
          }
            // 接收 AC()對象
          case AC(name) => println(s"AC name = $name")
        }
      }
    }
  }
}
接收的信息是: this is one info!
接收的信息是: this is two info!
發送的 this is two info! 已收到!
接收的信息是: this is three info
發送的 this is three info 已收到!
AC name = ac name
接收的信息是: stop

Actor信息互傳

def main(args: Array[String]): Unit = {
    val teacher = new Teacher()
    teacher.start()
    val student = new Student(teacher)
    student.start()

    student ! Request("Hi teacher!")

}

case class Request(question: String) {}

case class Response(answer: String) {}

// student線程, 傳入teacher
class Student(teacher: Teacher) extends Actor {
    override def act(): Unit = {
        while (true) {
            receive {
                // 接收學生的問題, 將問題內容發送給老師.
                case Request(question) => teacher ! Request(question)
                // 接收老師的回答, 打印回答信息
                case Response(answer) => println(s"teacher answer is : $answer")
            }
        }
    }
}

class Teacher extends Actor {
    override def act(): Unit = {
        while (true) {
            receive {
                // 接收問題, 回應發送者.
                case Request(question) => sender ! Response("I am teacher this is my answer!")
            }
        }
    }
}
teacher answer is I am teacher this is my answer!

AkkA

信息發送

import akka.actor.{Actor, ActorSystem, Props}

class HelloActor extends Actor{
  // 重寫接受消息的偏函數,其功能是接受消息並處理
  override def receive: Receive = {
    case 1 => println("this is first line")
    case 2 => println("this is two line")
    case 3 => {
      println("stop actorRef")
      context.stop(self) // 停止自己的actorRef

      println("stop ActorSystem")
      context.system.terminate() // 關閉ActorSystem,即關閉其內部的線程池(ExcutorService)

    }
  }
}

object ActorDM {
  /**
    * 創建線程池對象MyFactory,用來創建actor的對象的
    */
  private val MyFactory = ActorSystem("myFactory")    //里面的"myFactory"參數為線程池的名稱
  /**
    *     通過MyFactory.actorOf方法來創建一個actor,注意,Props方法的第一個參數需要傳遞我們自定義的HelloActor類,
    *     第二個參數是給actor起個名字
    */
  private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor")

  def main(args: Array[String]): Unit = {
    var flag = true
    while(flag){
      /**
        * 使用helloActorRef來給自己發送消息,helloActorRef有一個叫做感嘆號("!")的方法來發送消息
        */
      for(num <- 1 to 3){
        if (num < 3){
          helloActorRef ! num
        }else if(num == 3){
          flag = false
          println("程序即將結束!")
          helloActorRef ! num
        }
      }

      /**
        * 為了不讓while的運行速度在receive方法之上,我們可以讓他休眠0.1秒
        */
      Thread.sleep(100)
    }
  }
}

信息交互

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// 傳入一個 ActorRef 對象
class User1(val user2: ActorRef) extends Actor {
  override def receive: Receive = {
    case "開始" => user2 ! "到你了"
    case "到你了" => { // 再次發送信息給user2
      println("User1: 我的完成了!")
      user2 ! "到你了"
    }
  }
}

class User2 extends Actor {
  override def receive: Receive = {
    case "到你了" => {
      println("User2: 將軍!")
      Thread.sleep(2000)
      // 反饋信息給 user1
      sender() ! "到你了"
    }
  }
}

object AkkaActorDM extends App {
  
  //  創建 actorSystem的工廠,用來生產ActorRef對象!
  private val actorSystem = ActorSystem("local_Actor")
  
  // 創建user2 的ActorRef對象 
  private val user2 = actorSystem.actorOf(Props[User2], "user2")
  
  // Props(new User1(user2)) 來創建需要傳參的user1類
  private val user1 = actorSystem.actorOf(Props(new User1(user2)), "user1")
  
  // 發送開始信號
  user1 ! "開始"
}

服務端和客戶端交互程序

Message.scala

case class ServerMessage(str: String) {}

case class ClientMessage(msg: Any) {}

ServerAKKA.scala

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

// 繼承 akka 的trait Actor
class ServerAKKA extends Actor {
  // 反復調用,接收發送的信息
  override def receive: Receive = {
    case "start" => println("服務已啟動!")
    case ClientMessage(msg) => { // 接收客戶端發送的信息
      if (msg.equals("stop")) {
        context.stop(self) // 停止自己的actorRef
        context.system.terminate() // 關閉ActorSystem,即關閉其內部的線程池(ExcutorService)
      }
      println(s"來自客戶端的信息是: " + msg)
      // 返回信息響應客戶端
      sender ! ServerMessage(s"你發送的 [ $msg ] 信息服務器已收到!")
    }
    case _ => println("Other info!")
  }
}

object ServerAKKA {
  def main(args: Array[String]): Unit = {
    // 服務器IP
    val host = "192.168.1.104"
    // 服務器端口
    val port = "8888"

    // 設置配置字符串
    val strConf =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
     """.stripMargin

    // 解析字符串
    val config: Config = ConfigFactory.parseString(strConf)

    // 創建使用伴生對象的apply方法創建ActorSystem
    val actorSystem = ActorSystem("ServerAKKA", config)

    //通過ServerAKKA類型,反射創建實例
    val server = actorSystem.actorOf(Props[ServerAKKA], "server")

    server ! "start"

  }
}

ClientAKKA.scala

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.io.StdIn


class ClientAKKA extends Actor {

  var server: ActorSelection = _

  // 在Actor構造方法后執行,但是在receive方法之前執行, 只執行一次,做一些初始化的操作.
  override def preStart(): Unit = {

    val serverName = "server"

    // 連接服務器的鏈接,啟動服務器時控制台會打印連接的地址
    // akka.tcp://服務器ActorSystem名@服務器IP:服務器端口/user/Actor名
    server = context.actorSelection(s"akka.tcp://ServerAKKA@192.168.1.104:8888/user/$serverName")

  }

  override def receive: Receive = {
    // 接收服務器的信息
    case ServerMessage(str) => {
      println(s"來自服務器的信息: " + str)
    }
    // 接收客戶端的信息
    case ClientMessage(msg) => {
      server ! ClientMessage(msg)
    }
  }
}

object ClientAKKA {
  def main(args: Array[String]): Unit = {
    // 客戶端IP
    val host = "192.168.1.104"
    // 客戶端端口, 同一機器時不能使用服務器的端口(已經占用)
    val port = "8889"

    val strConf =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val conf = ConfigFactory.parseString(strConf)

    val actorSystem = ActorSystem("ClientAKKA", conf)

    val client = actorSystem.actorOf(Props[ClientAKKA], "client")

    // 不斷從控制台接收輸入發送到服務器.
    var flag = true
    while (flag) {
      Thread.sleep(1000)
      val info = StdIn.readLine("請輸入需要發送的信息:")
      if (!info.equals("stop")) {
        client ! ClientMessage(info)
      } else {
        flag = false
      }
    }

  }
}

定時任務

package AKKA

import akka.actor.{Actor, ActorSystem, Props}

import scala.util.Random


case class Task()

class TimingTask extends Actor {
  var random = new Random()

  override def receive: Receive = {
    case "start" => {
      // 導入需要的包
      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.duration._
      // 設置定時任務, 發送Task給自己
      context.system.scheduler.schedule(0 millis, 5000 millis, self, Task)

      println("定時任務開始")
    }
    case Task => {
      println("本次的隨機數為: " + random.nextInt())
    }
  }
}

object TimingTask {

  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("ActorSystem")
    val actor = actorSystem.actorOf(Props[TimingTask], "actor")
    actor ! "start"
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM