Scala並發編程


scala支持Java的多線程模型, 也繼承了多線程固有的資源競爭和死鎖問題.

作為一種函數式編程語言, scala的actor消息模型提供了一種更便捷更安全的並發編程方案.

線程模型

scala的線程模型來自於Java. 首先我們要拓展一個Runable或Callable, 並重寫run方法

trait Runnable {
  def run(): Unit
}

Callable與Runable類似,但是有一個返回值:

trait Callable[V] {
  def call(): V
}

Thread需要一個Runable實例作為參數來創建:

scala> val thread = new Thread(new Runnable {
     |   def run() {
     |     println("hello world")
     |   }
     | })
thread: Thread = Thread[Thread-2,5,main]

scala> thread.start()
hello world

線程同步

synchronized是JVM中最簡單的使用互斥鎖的方式:

class User {
  var name: String = "";
  def setName(nameArg :String) {
    this.synchronized {
      this.name = nameArg;
    }
  }
}

當線程開始執行obj.synchronized塊中的代碼前, 它將嘗試獲得對象obj的鎖, 若獲取失敗則線程進入阻塞狀態.

當某個線程獲得了對象的鎖后, 其它線程就無法訪問或修改該對象. 當obj.synchronized塊中的代碼執行完成時, 線程會解除鎖, 另一個線程就可以加鎖並訪問對象了.

Future模型

scala提供了Promise-Future-Callback異步模型:

  • Future 表示一個還沒有完成的任務的結果, Future對象可以在任務完成前訪問

  • Promise 表示一個還沒有執行的任務, 可以通過Promise標記任務的狀態

  • Callback 回調用於在任務完成或其它情況下執行的操作

Future

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FutureDemo extends App {

  val f = Future {
    println("working on future task")
    Thread.sleep(100)
    1+1
  }
  
  println("waiting for future task complete")
  val result = Await.result(f, 1 second)
  println(result)
}

執行異步任務需要上下文, ExecutionContext.Implicits.global是使用當前的全局上下文作為隱式上下文.

引入.duration._允許我們使用1 second, 200 milli, 2 minute這樣的時間間隔字面值.

上述示例中Await.result使用阻塞的方式等待Future任務完成, 若Future超時未完成則拋出TimeoutException異常.

多次運行上述示例就會發現, 兩條提示輸出順序是不確定的. 這是因為Future中的代碼是在獨立線程中執行的.

更好的方式是采用回調的方式來處理Future結果:

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object FutureDemo2 extends App {

  val f = Future {
    1 + 2
  }

  
  f.onComplete{
    case Success(value) => println(value)
    case Failure(e) => e.printStackTrace
  }

}

或者定義onSuccessonFailure兩個回調.

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global

object FutureDemo2 extends App {

  val f = Future {
    1 + 2
  }

  
  f.onSuccess {
    case value => println(value)
  }

  f.onFailure {
    case e => e.printStackTrace
  }

}

Actor模型

Actor是一個基於消息機制的並發模型, 自Scala 2.11之后Akka Actor已成為Scala事實上的Actor標准.

akka不是scala的默認包, 這里我們使用SBT來管理外部包依賴. 關於sbt的使用可以參見作者的另一篇博文Scala構建工具SBT.

build.sbt中添加下列代碼, 引入akka依賴.

scalaVersion := "2.12.1"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies +=
  "com.typesafe.akka" %% "akka-actor" % "2.4.17"

更多關於引入akka的內容可以參見akka官網.

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props

class HelloActor extends Actor {
  def receive() = {
    case "hello" => println("Hi, I am an actor.");
    case _       => println("?");
  }
}

object Main extends App {
  val system = ActorSystem("HelloSystem");
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor");
  helloActor ! "hello";
  helloActor ! "bye";
  system.shutdown();
}

自定義類繼承Actor並重寫receive方法處理不同類型的消息. 這里使用String類進行模式匹配, 使用case class進行模式匹配可以傳遞更多信息.

Actor需要ActorSystem的事件循環提供支持, 初始化一個ActorSystem后事件循環開始運行.最后必須執行system.shutdown();否則scala程序會一直運行下去.

!是用於發送消息的操作符, helloActor ! "hello";將消息"hello"發送給了helloActor.

receive方法的返回值類型是PartialFunction[Any, Unit]. 所有發送給Actor的消息都將被receive返回的偏函數處理.

偏函數的返回值類型為Unit, 也就是說處理消息時必須依賴副作用而不能有返回值; 偏函數的參數類型為Any, 也就是說所有消息在傳入的時候都會發生類型丟失.

非類型化的消息便於設計消息轉發, 負載均衡和代理Actor等機制, 且因為基於模式匹配的消息處理, 非類型化並不會產生問題.

基於事件循環的非阻塞機制已經被廣為使用, 這里簡單說明Actor與線程的問題.Actor並非與線程一一對應, 一個線程可以為多個Actor服務. ActorSystem會根據實際情況選擇線程數.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM