scala當中的Actor並發編程


注:Scala Actor是scala 2.10.x版本及以前版本的Actor。

Scala在2.11.x版本中將Akka加入其中,作為其默認的Actor,老版本的Actor已經廢棄。

1、什么是Scala Actor

  1. Scala中的並發編程思想與Java中的並發編程思想完全不一樣,Scala中的Actor是一種不共享數據,依賴於消息傳遞的一種並發編程模式, 避免了死鎖、資源爭奪等情況。在具體實現的過程中,Scala中的Actor會不斷的循環自己的郵箱,並通過receive偏函數進行消息的模式匹配並進行相應的處理。 
  2. 如果Actor A和 Actor B要相互溝通的話,首先A要給B傳遞一個消息,B會有一個收件箱,然后B會不斷的循環自己的收件箱, 若看見A發過來的消息,B就會解析A的消息並執行,處理完之后就有可能將處理的結果通過郵件的方式發送給A

       

概念

Scala中的Actor能夠實現並行編程的強大功能,它是基於事件模型的並發機制,Scala是運用消息的發送、接收來實現高並發的

Actor可以看作是一個個獨立的實體,他們之間是毫無關聯的。但是,他們可以通過消息來通信。一個Actor收到其他Actor的信息后,它可以根據需要作出各種相應。消息的類型可以是任意的,消息的內容也可以是任意的。

java並發編程與Scala Actor編程的區別

對於Java,我們都知道它的多線程實現需要對共享資源(變量、對象等)使用synchronized 關鍵字進行代碼塊同步、對象鎖互斥等等。而且,常常一大塊的try…catch語句塊中加上wait方法、notify方法、notifyAll方法是讓人很頭疼的。原因就在於Java中多數使用的是可變狀態的對象資源,對這些資源進行共享來實現多線程編程的話,控制好資源競爭與防止對象狀態被意外修改是非常重要的,而對象狀態的不變性也是較難以保證的。

Java的基於共享數據和鎖的線程模型不同,Scala的actor包則提供了另外一種不共享任何數據、依賴消息傳遞的模型,從而進行並發編程。

   

Actor的執行順序

1、首先調用start()方法啟動Actor

2、調用start()方法后其act()方法會被執行

3、向Actor發送消息

4、act方法執行完成之后,程序會調用exit方法

   

   

發送消息的方式

!

發送異步消息,沒有返回值。

!?

發送同步消息,等待返回值。

!!

發送異步消息,返回值是 Future[Any]

注意:Future 表示一個異步操作的結果狀態,可能還沒有實際完成的異步任務的結果。

Any 是所有類的超類,Future[Any]的泛型是異步操作結果的類型。

2、Actor實戰

第一個例子

怎么實現actor並發編程:

1、定義一個class或者是object繼承Actor特質,注意導包import scala.actors.Actor

2、重寫對應的act方法

3、調用Actor的start方法執行Actor

4、當act方法執行完成,整個程序運行結束

import scala.actors.Actor

   

class Actor1 extends Actor{

override def act(): Unit = {

for(i <- 1 to 10){

println("actor1====="+i)

}

}

}

object Actor2 extends Actor{

override def act(): Unit = {

for(j <- 1 to 10){

println("actor2====="+j)

}

}

}

object Actor1{

def main(args: Array[String]): Unit = {

val actor = new Actor1

actor.act()

Actor2.act()

}

}

說明:上面分別調用了兩個單例對象的start()方法,他們的act()方法會被執行,相同與在java中開啟了兩個線程,線程的run()方法會被執行

注意:這兩個Actor是並行執行的,act()方法中的for循環執行完成后actor程序就退出

第二個例子

怎么實現actor發送、接受消息

1、定義一個class或者是object繼承Actor特質,注意導包import scala.actors.Actor

2、重寫對應的act方法

3、調用Actor的start方法執行Actor

4、通過不同發送消息的方式對actor發送消息

5、act方法中通過receive方法接受消息並進行相應的處理

6、act方法執行完成之后,程序退出

   

import scala.actors.Actor

class MyActor2 extends Actor{

override def act(): Unit = {

receive{

case "start" => println("starting......")

// case _ => println("我沒有匹配到任何消息")

}

}

}

object MyActor2{

def main(args: Array[String]): Unit = {

val actor = new MyActor2

actor.start()

actor ! "start"

}

}

第三個例子

怎么實現actor可以不斷地接受消息:

在act方法中可以使用while(true)的方式,不斷的接受消息。

class MyActor3 extends Actor{

override def act(): Unit = {

while (true){

receive{

case "start" => println("starting")

case "stop" =>println("stopping")

}

}

}

}

object MyActor3{

def main(args: Array[String]): Unit = {

val actor = new MyActor3

actor.start()

actor ! "start"

actor ! "stop"

}

}

說明:在act()方法中加入了while (true) 循環,就可以不停的接收消息

注意:發送start消息和stop的消息是異步的,但是Actor接收到消息執行的過程是同步的按順序執行

   

第四個例子

使用react方法代替receive方法去接受消息

好處:react方式會復用線程,避免頻繁的線程創建、銷毀和切換。比receive更高效

注意: react 如果要反復執行消息處理,react外層要用loop,不能用while

   

class MyActor4 extends Actor{

override def act(): Unit = {

loop{

react{

case "start" => println("starting")

case "stop" => println("stopping")

}

}

}

}

object MyActor4{

def main(args: Array[String]): Unit = {

val actor = new MyActor4

actor.start()

actor ! "start"

actor ! "stop"

   

}

}

第五個例子

結合case class樣例類發送消息和接受消息

   

  1. 將消息封裝在一個樣例類中
  2. 通過匹配不同的樣例類去執行不同的操作
  3. Actor可以返回消息給發送方。通過sender方法向當前消息發送方返回消息

   

   

case class AsyncMessage(id:Int,message:String)

case class SyncMessage(id:Int,message:String)

case class ReplyMessage(id:Int,message:String)

class MyActor5 extends Actor{

override def act(): Unit = {

loop{

react{

case AsyncMessage(id,message) => {

println(s"$id,$message")

sender ! ReplyMessage(2,"異步有返回值的消息處理成功")

}

case SyncMessage(id,message) =>{

println(s"$id,$message")

sender ! ReplyMessage(id,"我是同步消息的返回值,等到我返回之后才能繼續下一步的處理")

}

}

}

}

}

   

object MyActor5{

   

def main(args: Array[String]): Unit = {

val actor: MyActor5 = new MyActor5

actor.start()

actor ! AsyncMessage(1,"helloworld")

val asyncMessage: Future[Any] = actor !! AsyncMessage(2,"actorSend")

val apply: Any = asyncMessage.apply()

println(apply)

println("helloworld22222")

//同步阻塞消息

val syncMessage: Any = actor !? SyncMessage(3,"我是同步阻塞消息")

println(syncMessage)

}

}

第六個例子

需求:

用actor並發編程寫一個單機版的WordCount,將多個文件作為輸入,計算完成后將多個任務匯總,得到最終的結果。

   

大致的思想步驟:

  1. 通過loop +react 方式去不斷的接受消息
  2. 利用case class樣例類去匹配對應的操作
  3. 其中scala中提供了文件讀取的接口Source,通過調用其fromFile方法去獲取文件內容
  4. 將每個文件的單詞數量進行局部匯總,存放在一個ListBuffer中
  5. 最后將ListBuffer中的結果進行全局匯總。

    

import scala.actors.{Actor, Future}

import scala.collection.mutable

import scala.collection.mutable.ListBuffer

import scala.io.{BufferedSource, Source}

   

case class FileName(path: String)

case class ResultTask(mapWithWord: Map[String, Int])

   

class WordCount extends Actor {

override def act(): Unit = {

loop {

react {

//使用loop + react的方式接受我們的數據

case FileName(path: String) => {

//使用Source來讀取文件內容

val file: BufferedSource = Source.fromFile(path)

//獲取文件所有內容

val fileContent: String = file.mkString

// println(fileContent)

//對文件內容進行切分

val split: Array[String] = fileContent.split("\r\n")

// println(split.toBuffer)

//對每一行進行按照空格進行切分

// val map: Array[Array[String]] = split.map(x => x.split(" "))

//切分之后,將數據進行壓平

// val flatten: Array[String] = map.flatten

val flatten: Array[String] = split.flatMap(x => x.split(" "))

val map1: Array[(String, Int)] = flatten.map(x => (x, 1))

// println(map1.toBuffer)

val byKey: Map[String, Array[(String, Int)]] = map1.groupBy(x => x._1)

val values: Map[String, Int] = byKey.mapValues(x => x.length)

sender ! ResultTask(values)

}

}

}

}

}

   

object WordCount {

def main(args: Array[String]): Unit = {

//申明一個變量,存放我們的結果數據

val resultTasks = new ListBuffer[ResultTask]

//申明一個set集合用於存放我們異步發送的返回消息值

val futureSet: mutable.HashSet[Future[Any]] = new mutable.HashSet[Future[Any]]()

//定義我們需要統計的數據文件路徑

val files: Array[String] = Array("F:\\ wordCount\\1.txt", "F:\\wordCount\\2.txt", "F: \\wordCount\\3.txt")

//循環遍歷我們的數據文件,然后進行發送

val count: WordCount = new WordCount

count.start();

for (f <- files) {





val value: Future[Any] = count !! FileName(f)

futureSet.add(value)

}

while (futureSet.size > 0) {

//過濾我們的set集合,只取那些有值的set集合

val completeFuture: mutable.HashSet[Future[Any]] = futureSet.filter(x => x.isSet)

for (future <- completeFuture) {

// 調用apply方法,獲取到我們的future實例,實際上就是ResultTask

val futureApply: Any = future.apply()

//判斷我們的結果值如果是ResultTask類型的話,那么我們就添加到我們的ListBuffer當中去,表示已經獲取到了返回結果

resultTasks += futureApply.asInstanceOf[ResultTask]

//添加完ListBuffer之后,將set集合當中的元素減少,以便於退出while循環

futureSet -= future

}

}

println(resultTasks)

val flatten: ListBuffer[(String, Int)] = resultTasks.map(x => x.mapWithWord).flatten

val by: Map[String, ListBuffer[(String, Int)]] = flatten.groupBy( x => x._1)

println(by)

//第一個下划線表示我們累加之后的結果

// 第二個下划線表示我們集合當中每一個元組

// _2 表示元組當中第二個元素

val values: Map[String, Int] = by.mapValues(x => x.foldLeft(0)( _ + _._2))

for((k,v) <- values){

println(k+"====>"+v)

}

}

}

 

 

  

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM