scala支持Java的多線程模型, 也繼承了多線程固有的資源競爭和死鎖問題.
作為一種函數式編程語言, scala的actor消息模型提供了一種更便捷更安全的並發編程方案.
線程模型
scala的線程模型來自於Java. 首先我們要拓展一個Runable或Callable, 並重寫run方法
trait Runnable {
def run(): Unit
}
Callable與Runable類似,但是有一個返回值:
trait Callable[V] {
def call(): V
}
Thread需要一個Runable實例作為參數來創建:
scala> val thread = new Thread(new Runnable {
| def run() {
| println("hello world")
| }
| })
thread: Thread = Thread[Thread-2,5,main]
scala> thread.start()
hello world
線程同步
synchronized是JVM中最簡單的使用互斥鎖的方式:
class User {
var name: String = "";
def setName(nameArg :String) {
this.synchronized {
this.name = nameArg;
}
}
}
當線程開始執行obj.synchronized塊中的代碼前, 它將嘗試獲得對象obj的鎖, 若獲取失敗則線程進入阻塞狀態.
當某個線程獲得了對象的鎖后, 其它線程就無法訪問或修改該對象. 當obj.synchronized塊中的代碼執行完成時, 線程會解除鎖, 另一個線程就可以加鎖並訪問對象了.
Future模型
scala提供了Promise-Future-Callback異步模型:
-
Future 表示一個還沒有完成的任務的結果, Future對象可以在任務完成前訪問
-
Promise 表示一個還沒有執行的任務, 可以通過Promise標記任務的狀態
-
Callback 回調用於在任務完成或其它情況下執行的操作
Future
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object FutureDemo extends App {
val f = Future {
println("working on future task")
Thread.sleep(100)
1+1
}
println("waiting for future task complete")
val result = Await.result(f, 1 second)
println(result)
}
執行異步任務需要上下文, ExecutionContext.Implicits.global是使用當前的全局上下文作為隱式上下文.
引入.duration._允許我們使用1 second, 200 milli, 2 minute這樣的時間間隔字面值.
上述示例中Await.result使用阻塞的方式等待Future任務完成, 若Future超時未完成則拋出TimeoutException異常.
多次運行上述示例就會發現, 兩條提示輸出順序是不確定的. 這是因為Future中的代碼是在獨立線程中執行的.
更好的方式是采用回調的方式來處理Future結果:
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
object FutureDemo2 extends App {
val f = Future {
1 + 2
}
f.onComplete{
case Success(value) => println(value)
case Failure(e) => e.printStackTrace
}
}
或者定義onSuccess和onFailure兩個回調.
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
object FutureDemo2 extends App {
val f = Future {
1 + 2
}
f.onSuccess {
case value => println(value)
}
f.onFailure {
case e => e.printStackTrace
}
}
Actor模型
Actor是一個基於消息機制的並發模型, 自Scala 2.11之后Akka Actor已成為Scala事實上的Actor標准.
akka不是scala的默認包, 這里我們使用SBT來管理外部包依賴. 關於sbt的使用可以參見作者的另一篇博文Scala構建工具SBT.
在build.sbt中添加下列代碼, 引入akka依賴.
scalaVersion := "2.12.1"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies +=
"com.typesafe.akka" %% "akka-actor" % "2.4.17"
更多關於引入akka的內容可以參見akka官網.
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
class HelloActor extends Actor {
def receive() = {
case "hello" => println("Hi, I am an actor.");
case _ => println("?");
}
}
object Main extends App {
val system = ActorSystem("HelloSystem");
val helloActor = system.actorOf(Props[HelloActor], name = "helloactor");
helloActor ! "hello";
helloActor ! "bye";
system.shutdown();
}
自定義類繼承Actor並重寫receive方法處理不同類型的消息. 這里使用String類進行模式匹配, 使用case class進行模式匹配可以傳遞更多信息.
Actor需要ActorSystem的事件循環提供支持, 初始化一個ActorSystem后事件循環開始運行.最后必須執行system.shutdown();否則scala程序會一直運行下去.
!是用於發送消息的操作符, helloActor ! "hello";將消息"hello"發送給了helloActor.
receive方法的返回值類型是PartialFunction[Any, Unit]. 所有發送給Actor的消息都將被receive返回的偏函數處理.
偏函數的返回值類型為Unit, 也就是說處理消息時必須依賴副作用而不能有返回值; 偏函數的參數類型為Any, 也就是說所有消息在傳入的時候都會發生類型丟失.
非類型化的消息便於設計消息轉發, 負載均衡和代理Actor等機制, 且因為基於模式匹配的消息處理, 非類型化並不會產生問題.
基於事件循環的非阻塞機制已經被廣為使用, 這里簡單說明Actor與線程的問題.Actor並非與線程一一對應, 一個線程可以為多個Actor服務. ActorSystem會根據實際情況選擇線程數.
