注:Scala Actor是scala 2.10.x版本及以前版本的Actor。
Scala在2.11.x版本中將Akka加入其中,作為其默認的Actor,老版本的Actor已經廢棄。
1、什么是Scala Actor
- Scala中的並發編程思想與Java中的並發編程思想完全不一樣,Scala中的Actor是一種不共享數據,依賴於消息傳遞的一種並發編程模式, 避免了死鎖、資源爭奪等情況。在具體實現的過程中,Scala中的Actor會不斷的循環自己的郵箱,並通過receive偏函數進行消息的模式匹配並進行相應的處理。
-
如果Actor A和 Actor B要相互溝通的話,首先A要給B傳遞一個消息,B會有一個收件箱,然后B會不斷的循環自己的收件箱, 若看見A發過來的消息,B就會解析A的消息並執行,處理完之后就有可能將處理的結果通過郵件的方式發送給A
概念
Scala中的Actor能夠實現並行編程的強大功能,它是基於事件模型的並發機制,Scala是運用消息的發送、接收來實現高並發的。
Actor可以看作是一個個獨立的實體,他們之間是毫無關聯的。但是,他們可以通過消息來通信。一個Actor收到其他Actor的信息后,它可以根據需要作出各種相應。消息的類型可以是任意的,消息的內容也可以是任意的。
java並發編程與Scala Actor編程的區別
對於Java,我們都知道它的多線程實現需要對共享資源(變量、對象等)使用synchronized 關鍵字進行代碼塊同步、對象鎖互斥等等。而且,常常一大塊的try…catch語句塊中加上wait方法、notify方法、notifyAll方法是讓人很頭疼的。原因就在於Java中多數使用的是可變狀態的對象資源,對這些資源進行共享來實現多線程編程的話,控制好資源競爭與防止對象狀態被意外修改是非常重要的,而對象狀態的不變性也是較難以保證的。
與Java的基於共享數據和鎖的線程模型不同,Scala的actor包則提供了另外一種不共享任何數據、依賴消息傳遞的模型,從而進行並發編程。
Actor的執行順序
1、首先調用start()方法啟動Actor
2、調用start()方法后其act()方法會被執行
3、向Actor發送消息
4、act方法執行完成之后,程序會調用exit方法
發送消息的方式
! |
發送異步消息,沒有返回值。 |
!? |
發送同步消息,等待返回值。 |
!! |
發送異步消息,返回值是 Future[Any]。 |
注意:Future 表示一個異步操作的結果狀態,可能還沒有實際完成的異步任務的結果。
Any 是所有類的超類,Future[Any]的泛型是異步操作結果的類型。
2、Actor實戰
第一個例子
怎么實現actor並發編程:
1、定義一個class或者是object繼承Actor特質,注意導包import scala.actors.Actor
2、重寫對應的act方法
3、調用Actor的start方法執行Actor
4、當act方法執行完成,整個程序運行結束
import scala.actors.Actor
class Actor1 extends Actor{ override def act(): Unit = { for(i <- 1 to 10){ println("actor1====="+i) } } } object Actor2 extends Actor{ override def act(): Unit = { for(j <- 1 to 10){ println("actor2====="+j) } } } object Actor1{ def main(args: Array[String]): Unit = { val actor = new Actor1 actor.act() Actor2.act() } } |
說明:上面分別調用了兩個單例對象的start()方法,他們的act()方法會被執行,相同與在java中開啟了兩個線程,線程的run()方法會被執行
注意:這兩個Actor是並行執行的,act()方法中的for循環執行完成后actor程序就退出
第二個例子
怎么實現actor發送、接受消息
1、定義一個class或者是object繼承Actor特質,注意導包import scala.actors.Actor
2、重寫對應的act方法
3、調用Actor的start方法執行Actor
4、通過不同發送消息的方式對actor發送消息
5、act方法中通過receive方法接受消息並進行相應的處理
6、act方法執行完成之后,程序退出
import scala.actors.Actor class MyActor2 extends Actor{ override def act(): Unit = { receive{ case "start" => println("starting......") // case _ => println("我沒有匹配到任何消息") } } } object MyActor2{ def main(args: Array[String]): Unit = { val actor = new MyActor2 actor.start() actor ! "start" } } |
第三個例子
怎么實現actor可以不斷地接受消息:
在act方法中可以使用while(true)的方式,不斷的接受消息。
class MyActor3 extends Actor{ override def act(): Unit = { while (true){ receive{ case "start" => println("starting") case "stop" =>println("stopping") } } } } object MyActor3{ def main(args: Array[String]): Unit = { val actor = new MyActor3 actor.start() actor ! "start" actor ! "stop" } } |
說明:在act()方法中加入了while (true) 循環,就可以不停的接收消息
注意:發送start消息和stop的消息是異步的,但是Actor接收到消息執行的過程是同步的按順序執行
第四個例子
使用react方法代替receive方法去接受消息
好處:react方式會復用線程,避免頻繁的線程創建、銷毀和切換。比receive更高效
注意: react 如果要反復執行消息處理,react外層要用loop,不能用while
class MyActor4 extends Actor{ override def act(): Unit = { loop{ react{ case "start" => println("starting") case "stop" => println("stopping") } } } } object MyActor4{ def main(args: Array[String]): Unit = { val actor = new MyActor4 actor.start() actor ! "start" actor ! "stop"
} } |
第五個例子
結合case class樣例類發送消息和接受消息
- 將消息封裝在一個樣例類中
- 通過匹配不同的樣例類去執行不同的操作
- Actor可以返回消息給發送方。通過sender方法向當前消息發送方返回消息
case class AsyncMessage(id:Int,message:String) case class SyncMessage(id:Int,message:String) case class ReplyMessage(id:Int,message:String) class MyActor5 extends Actor{ override def act(): Unit = { loop{ react{ case AsyncMessage(id,message) => { println(s"$id,$message") sender ! ReplyMessage(2,"異步有返回值的消息處理成功") } case SyncMessage(id,message) =>{ println(s"$id,$message") sender ! ReplyMessage(id,"我是同步消息的返回值,等到我返回之后才能繼續下一步的處理") } } } } }
object MyActor5{
def main(args: Array[String]): Unit = { val actor: MyActor5 = new MyActor5 actor.start() actor ! AsyncMessage(1,"helloworld") val asyncMessage: Future[Any] = actor !! AsyncMessage(2,"actorSend") val apply: Any = asyncMessage.apply() println(apply) println("helloworld22222") //同步阻塞消息 val syncMessage: Any = actor !? SyncMessage(3,"我是同步阻塞消息") println(syncMessage) } } |
第六個例子
需求:
用actor並發編程寫一個單機版的WordCount,將多個文件作為輸入,計算完成后將多個任務匯總,得到最終的結果。
大致的思想步驟:
- 通過loop +react 方式去不斷的接受消息
- 利用case class樣例類去匹配對應的操作
- 其中scala中提供了文件讀取的接口Source,通過調用其fromFile方法去獲取文件內容
- 將每個文件的單詞數量進行局部匯總,存放在一個ListBuffer中
- 最后將ListBuffer中的結果進行全局匯總。
import scala.actors.{Actor, Future} import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.io.{BufferedSource, Source} case class FileName(path: String) case class ResultTask(mapWithWord: Map[String, Int]) class WordCount extends Actor { override def act(): Unit = { loop { react { //使用loop + react的方式接受我們的數據 case FileName(path: String) => { //使用Source來讀取文件內容 val file: BufferedSource = Source.fromFile(path) //獲取文件所有內容 val fileContent: String = file.mkString // println(fileContent) //對文件內容進行切分 val split: Array[String] = fileContent.split("\r\n") // println(split.toBuffer) //對每一行進行按照空格進行切分 // val map: Array[Array[String]] = split.map(x => x.split(" ")) //切分之后,將數據進行壓平 // val flatten: Array[String] = map.flatten val flatten: Array[String] = split.flatMap(x => x.split(" ")) val map1: Array[(String, Int)] = flatten.map(x => (x, 1)) // println(map1.toBuffer) val byKey: Map[String, Array[(String, Int)]] = map1.groupBy(x => x._1) val values: Map[String, Int] = byKey.mapValues(x => x.length) sender ! ResultTask(values) } } } } } object WordCount { def main(args: Array[String]): Unit = { //申明一個變量,存放我們的結果數據 val resultTasks = new ListBuffer[ResultTask] //申明一個set集合用於存放我們異步發送的返回消息值 val futureSet: mutable.HashSet[Future[Any]] = new mutable.HashSet[Future[Any]]() //定義我們需要統計的數據文件路徑 val files: Array[String] = Array("F:\\ wordCount\\1.txt", "F:\\wordCount\\2.txt", "F: \\wordCount\\3.txt") //循環遍歷我們的數據文件,然后進行發送 val count: WordCount = new WordCount count.start(); for (f <- files) { val value: Future[Any] = count !! FileName(f) futureSet.add(value) } while (futureSet.size > 0) { //過濾我們的set集合,只取那些有值的set集合 val completeFuture: mutable.HashSet[Future[Any]] = futureSet.filter(x => x.isSet) for (future <- completeFuture) { // 調用apply方法,獲取到我們的future實例,實際上就是ResultTask val futureApply: Any = future.apply() //判斷我們的結果值如果是ResultTask類型的話,那么我們就添加到我們的ListBuffer當中去,表示已經獲取到了返回結果 resultTasks += futureApply.asInstanceOf[ResultTask] //添加完ListBuffer之后,將set集合當中的元素減少,以便於退出while循環 futureSet -= future } } println(resultTasks) val flatten: ListBuffer[(String, Int)] = resultTasks.map(x => x.mapWithWord).flatten val by: Map[String, ListBuffer[(String, Int)]] = flatten.groupBy( x => x._1) println(by) //第一個下划線表示我們累加之后的結果 // 第二個下划線表示我們集合當中每一個元組 // _2 表示元組當中第二個元素 val values: Map[String, Int] = by.mapValues(x => x.foldLeft(0)( _ + _._2)) for((k,v) <- values){ println(k+"====>"+v) } } }