添加需要的maven依賴
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>ssl-config-akka_2.11</artifactId>
<version>0.1.2</version>
</dependency>
<!-- 添加scala的依賴 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
<version>2.5.23</version>
</dependency>
<!-- 添加akka的actor依賴 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.23</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.11</artifactId>
<version>2.5.23</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.23</version>
</dependency>
Actor (線程)
Actor信息傳遞
import scala.actors.{Actor, Future}
object ActorDM {
def main(args: Array[String]): Unit = {
// 創建對象MyActor
val myActor1 = new MyActor()
// 啟動線程
myActor1.start()
// 發送異步消息,沒有返回值
myActor1 ! "this is one info!"
// 發送同步消息,阻塞等待返回值
val result: Any = myActor1 !? "this is two info!"
println(result)
// 發送異步消息, 有返回值
val future: Future[Any] = myActor1 !! "this is three info"
// 等待3秒,
Thread.sleep(3000)
// 判斷是否有值返回
if(future.isSet){
// 取出返回的值
println(future.apply())
}else{
println("None")
}
// 發送對象
myActor1 ! new AC("ac name")
// 關閉線程
myActor1 ! "stop"
}
case class AC(name: String) {}
class MyActor extends Actor {
// 重寫act方法,類似java的Thread的run方法
override def act(): Unit = {
var flag: Boolean = true
while (flag){
receive{
// 接收字符串
case str: String => {
if(str.equals("stop")){
flag = false
}
println(s"接收的信息是: $str")
sender ! s"發送的 $str 已收到!"
}
// 接收 AC()對象
case AC(name) => println(s"AC name = $name")
}
}
}
}
}
接收的信息是: this is one info!
接收的信息是: this is two info!
發送的 this is two info! 已收到!
接收的信息是: this is three info
發送的 this is three info 已收到!
AC name = ac name
接收的信息是: stop
Actor信息互傳
def main(args: Array[String]): Unit = {
val teacher = new Teacher()
teacher.start()
val student = new Student(teacher)
student.start()
student ! Request("Hi teacher!")
}
case class Request(question: String) {}
case class Response(answer: String) {}
// student線程, 傳入teacher
class Student(teacher: Teacher) extends Actor {
override def act(): Unit = {
while (true) {
receive {
// 接收學生的問題, 將問題內容發送給老師.
case Request(question) => teacher ! Request(question)
// 接收老師的回答, 打印回答信息
case Response(answer) => println(s"teacher answer is : $answer")
}
}
}
}
class Teacher extends Actor {
override def act(): Unit = {
while (true) {
receive {
// 接收問題, 回應發送者.
case Request(question) => sender ! Response("I am teacher this is my answer!")
}
}
}
}
teacher answer is I am teacher this is my answer!
AkkA
信息發送
import akka.actor.{Actor, ActorSystem, Props}
class HelloActor extends Actor{
// 重寫接受消息的偏函數,其功能是接受消息並處理
override def receive: Receive = {
case 1 => println("this is first line")
case 2 => println("this is two line")
case 3 => {
println("stop actorRef")
context.stop(self) // 停止自己的actorRef
println("stop ActorSystem")
context.system.terminate() // 關閉ActorSystem,即關閉其內部的線程池(ExcutorService)
}
}
}
object ActorDM {
/**
* 創建線程池對象MyFactory,用來創建actor的對象的
*/
private val MyFactory = ActorSystem("myFactory") //里面的"myFactory"參數為線程池的名稱
/**
* 通過MyFactory.actorOf方法來創建一個actor,注意,Props方法的第一個參數需要傳遞我們自定義的HelloActor類,
* 第二個參數是給actor起個名字
*/
private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor")
def main(args: Array[String]): Unit = {
var flag = true
while(flag){
/**
* 使用helloActorRef來給自己發送消息,helloActorRef有一個叫做感嘆號("!")的方法來發送消息
*/
for(num <- 1 to 3){
if (num < 3){
helloActorRef ! num
}else if(num == 3){
flag = false
println("程序即將結束!")
helloActorRef ! num
}
}
/**
* 為了不讓while的運行速度在receive方法之上,我們可以讓他休眠0.1秒
*/
Thread.sleep(100)
}
}
}
信息交互
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
// 傳入一個 ActorRef 對象
class User1(val user2: ActorRef) extends Actor {
override def receive: Receive = {
case "開始" => user2 ! "到你了"
case "到你了" => { // 再次發送信息給user2
println("User1: 我的完成了!")
user2 ! "到你了"
}
}
}
class User2 extends Actor {
override def receive: Receive = {
case "到你了" => {
println("User2: 將軍!")
Thread.sleep(2000)
// 反饋信息給 user1
sender() ! "到你了"
}
}
}
object AkkaActorDM extends App {
// 創建 actorSystem的工廠,用來生產ActorRef對象!
private val actorSystem = ActorSystem("local_Actor")
// 創建user2 的ActorRef對象
private val user2 = actorSystem.actorOf(Props[User2], "user2")
// Props(new User1(user2)) 來創建需要傳參的user1類
private val user1 = actorSystem.actorOf(Props(new User1(user2)), "user1")
// 發送開始信號
user1 ! "開始"
}
服務端和客戶端交互程序
Message.scala
case class ServerMessage(str: String) {}
case class ClientMessage(msg: Any) {}
ServerAKKA.scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
// 繼承 akka 的trait Actor
class ServerAKKA extends Actor {
// 反復調用,接收發送的信息
override def receive: Receive = {
case "start" => println("服務已啟動!")
case ClientMessage(msg) => { // 接收客戶端發送的信息
if (msg.equals("stop")) {
context.stop(self) // 停止自己的actorRef
context.system.terminate() // 關閉ActorSystem,即關閉其內部的線程池(ExcutorService)
}
println(s"來自客戶端的信息是: " + msg)
// 返回信息響應客戶端
sender ! ServerMessage(s"你發送的 [ $msg ] 信息服務器已收到!")
}
case _ => println("Other info!")
}
}
object ServerAKKA {
def main(args: Array[String]): Unit = {
// 服務器IP
val host = "192.168.1.104"
// 服務器端口
val port = "8888"
// 設置配置字符串
val strConf =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
// 解析字符串
val config: Config = ConfigFactory.parseString(strConf)
// 創建使用伴生對象的apply方法創建ActorSystem
val actorSystem = ActorSystem("ServerAKKA", config)
//通過ServerAKKA類型,反射創建實例
val server = actorSystem.actorOf(Props[ServerAKKA], "server")
server ! "start"
}
}
ClientAKKA.scala
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
class ClientAKKA extends Actor {
var server: ActorSelection = _
// 在Actor構造方法后執行,但是在receive方法之前執行, 只執行一次,做一些初始化的操作.
override def preStart(): Unit = {
val serverName = "server"
// 連接服務器的鏈接,啟動服務器時控制台會打印連接的地址
// akka.tcp://服務器ActorSystem名@服務器IP:服務器端口/user/Actor名
server = context.actorSelection(s"akka.tcp://ServerAKKA@192.168.1.104:8888/user/$serverName")
}
override def receive: Receive = {
// 接收服務器的信息
case ServerMessage(str) => {
println(s"來自服務器的信息: " + str)
}
// 接收客戶端的信息
case ClientMessage(msg) => {
server ! ClientMessage(msg)
}
}
}
object ClientAKKA {
def main(args: Array[String]): Unit = {
// 客戶端IP
val host = "192.168.1.104"
// 客戶端端口, 同一機器時不能使用服務器的端口(已經占用)
val port = "8889"
val strConf =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val conf = ConfigFactory.parseString(strConf)
val actorSystem = ActorSystem("ClientAKKA", conf)
val client = actorSystem.actorOf(Props[ClientAKKA], "client")
// 不斷從控制台接收輸入發送到服務器.
var flag = true
while (flag) {
Thread.sleep(1000)
val info = StdIn.readLine("請輸入需要發送的信息:")
if (!info.equals("stop")) {
client ! ClientMessage(info)
} else {
flag = false
}
}
}
}
定時任務
package AKKA
import akka.actor.{Actor, ActorSystem, Props}
import scala.util.Random
case class Task()
class TimingTask extends Actor {
var random = new Random()
override def receive: Receive = {
case "start" => {
// 導入需要的包
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
// 設置定時任務, 發送Task給自己
context.system.scheduler.schedule(0 millis, 5000 millis, self, Task)
println("定時任務開始")
}
case Task => {
println("本次的隨機數為: " + random.nextInt())
}
}
}
object TimingTask {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("ActorSystem")
val actor = actorSystem.actorOf(Props[TimingTask], "actor")
actor ! "start"
}
}
