scala actor多線程(十五)


Scala的Actor類似於Java中的多線程編程。但是不同的是,Scala的Actor提供的模型與多線程有所不同。Scala的Actor盡可能地避免鎖和共享狀態,從而避免多線程並發時出現資源爭用的情況,進而提升多線程編程的性能。此外,Scala Actor的這種模型還可以避免死鎖等一系列傳統多線程編程的問題。

Spark中使用的分布式多線程框架,是Akka。Akka也實現了類似Scala Actor的模型,其核心概念同樣也是Actor。因此只要掌握了Scala Actor,那么在Spark源碼研究時,至少即可看明白Akka Actor相關的代碼。但是,換一句話說,由於Spark內部有大量的Akka Actor的使用,因此對於Scala Actor也至少必須掌握,這樣才能學習Spark源碼。

1、Actor的創建、啟動和消息收發(案例:Actor Hello World)

// Scala提供了Actor trait來讓我們更方便地進行actor多線程編程,就Actor trait就類似於Java中的Thread和Runnable一樣,是基礎的多線程基類和接口。我們只要重寫Actor trait的act方法,即可實現自己的線程執行體,與Java中重寫run方法類似。
// 此外,使用start()方法啟動actor;使用!符號,向actor發送消息;actor內部使用receive和模式匹配接收消息

// 案例:Actor Hello World
object test{
  def main(args: Array[String]): Unit = {
    import scala.actors.Actor //已棄用,改成import akka.actor.Actor,需加模塊
    class HelloActor extends Actor {
      def act() {
        while (true) {
          receive {
            case name: String => println("Hello, " + name)
          }
        }
      }
    }

    val helloActor = new HelloActor
    helloActor.start()
    helloActor ! "leo"
  }
}

 

2、收發case class類型的消息(案例:用戶注冊登錄后台接口)

// Scala的Actor模型與Java的多線程模型之間,很大的一個區別就是,Scala Actor天然支持線程之間的精准通信;即一個actor可以給其他actor直接發送消息。這個功能是非常強大和方便的。
// 要給一個actor發送消息,需要使用“actor ! 消息”的語法。在scala中,通常建議使用樣例類,即case class來作為消息進行發送。然后在actor接收消息之后,可以使用scala強大的模式匹配功能來進行不同消息的處理。
// 案例:用戶注冊登錄后台接口
case class Login(username: String, password: String)
case class Register(username: String, password: String)
class UserManageActor extends Actor {
  def act() {
    while (true) {
      receive {
        case Login(username, password) => println("login, username is " + username + ", password is " + password)
        case Register(username, password) => println("register, username is " + username + ", password is " + password)
      }
    }
  }
}
val userManageActor = new UserManageActor
userManageActor.start()
userManageActor ! Register("leo", "1234"); userManageActor ! Login("leo", "1234")

 

3、Actor之間互相收發消息(案例:打電話)

import scala.actors.Actor

case class Message(content: String, sender: Actor)
class LeoTelephoneActor extends Actor {
  def act() {
    while (true) {
      receive {
        case Message(content, sender) => { println("leo telephone: " + content); sender ! "I'm leo, please call me after 10 minutes." }
      }
    }
  }
}
class JackTelephoneActor(val leoTelephoneActor: Actor) extends Actor {
  def act() {
    leoTelephoneActor ! Message("Hello, Leo, I'm Jack.", this)
    receive {
      case response: String => println("jack telephone: " + response)
    }
  }
}
object test{
  def main(args: Array[String]): Unit = {
    val leo=new LeoTelephoneActor
    val jack=new JackTelephoneActor(leo)
    leo.start
    jack.start
  }
}

 

4、同步消息和Future

// 默認情況下,消息都是異步的;但是如果希望發送的消息是同步的,即對方接受后,一定要給自己返回結果,那么可以使用!?的方式發送消息。即val reply = actor !? message。

// 如果要異步發送一個消息,但是在后續要獲得消息的返回值,那么可以使用Future。即!!語法。val future = actor !! message。val reply = future()。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM