scala中java並發編程


 

 

Runnable/Callable

Runnable只有一個沒有返回值的方法

1
2
3
trait Runnable {
   def run(): Unit
}

Callable的方法和run類似,只不過它有一個返回值

1
2
3
trait Callable[V] {
   def call(): V
}

 

線程

Scala的並發是建立在Java的並發模型上的。

在Sun的JVM上,對於一個IO密集型的任務,我們可以在單機上運行成千上萬的線程。

Thread是通過Runnable構造的。要運行一個Runnable的run方法,你需要調用對應線程的start方法。

1
2
3
4
5
6
7
8
9
scala> val hello = new Thread( new Runnable {
   def run() {
     println( "hello world" )
   }
})
hello: java.lang.Thread = Thread[Thread- 3 , 5 ,main]
 
scala> hello.start
hello world

當你看見一個實現Runnable的類,你應該明白它會被放到一個線程里去執行的。

 

一段單線程的代碼

下面是一段代碼片段,它可以運行,但是會有問題。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
 
class NetworkService(port: Int, poolSize: Int) extends Runnable {
   val serverSocket = new ServerSocket(port)
 
   def run() {
     while ( true ) {
       // 這里會阻塞直到有連接進來
       val socket = serverSocket.accept()
       ( new Handler(socket)).run()
     }
   }
}
 
class Handler(socket: Socket) extends Runnable {
   def message = (Thread.currentThread.getName() + "\n" ).getBytes
 
   def run() {
     socket.getOutputStream.write(message)
     socket.getOutputStream.close()
   }
}
 
( new NetworkService( 2020 , 2 )).run

每個請求都會把當前線程的名稱main作為響應。

這段代碼最大的問題在於一次只能夠響應一個請求!

你可以對每個請求都單獨用一個線程來響應。只需要把

1
( new Handler(socket)).run()

改成

1
( new Thread( new Handler(socket))).start()

但是如果你想要復用線程或者對於線程的行為要做一些其他的控制呢?

 

Executors

隨着Java 5的發布,對於線程的管理需要一個更加抽象的接口。

你可以通過Executors對象的靜態方法來取得一個ExecutorService對象。這些方法可以讓你使用各種不同的策略來配置一個ExecutorService,例如線程池。

下面是我們之前的阻塞式網絡服務器,現在改寫成可以支持並發請求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
 
class NetworkService(port: Int, poolSize: Int) extends Runnable {
   val serverSocket = new ServerSocket(port)
   val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
 
   def run() {
     try {
       while ( true ) {
         // This will block until a connection comes in.
         val socket = serverSocket.accept()
         pool.execute( new Handler(socket))
       }
     } finally {
       pool.shutdown()
     }
   }
}
 
class Handler(socket: Socket) extends Runnable {
   def message = (Thread.currentThread.getName() + "\n" ).getBytes
 
   def run() {
     socket.getOutputStream.write(message)
     socket.getOutputStream.close()
   }
}
 
( new NetworkService( 2020 , 2 )).run

從下面的示例中,我們可以大致了解內部的線程是怎么進行復用的。

1
2
3
4
5
6
7
8
9
10
11
$ nc localhost 2020
pool- 1 -thread- 1
 
$ nc localhost 2020
pool- 1 -thread- 2
 
$ nc localhost 2020
pool- 1 -thread- 1
 
$ nc localhost 2020
pool- 1 -thread- 2

 

Futures

一個Future代表一次異步計算的操作。你可以把你的操作包裝在一個Future里,當你需要結果的時候,你只需要簡單調用一個阻塞的get()方法就好了。一個Executor返回一個Future。如果你使用Finagle RPC的話,你可以使用Future的實例來保存還沒有到達的結果。

FutureTask是一個可運行的任務,並且被設計成由Executor進行運行。

1
2
3
4
5
val future = new FutureTask[String]( new Callable[String]() {
   def call(): String = {
     searcher.search(target);
}})
executor.execute(future)

現在我需要結果,那就只能阻塞到直到結果返回。

1
val blockingResult = future.get()

參考 Scala School中關於Finagle的章節有大量使用Future的示例,也有一些組合使用的例子。Effective Scala中也有關於Futures的內容。

 

線程安全問題

1
2
3
4
5
class Person(var name: String) {
   def set(changedName: String) {
     name = changedName
   }
}

這個程序在多線程的環境下是不安全的。如果兩個線程都有同一個Person示例的引用,並且都調用set方法,你沒法預料在兩個調用都結束的時候name會是什么。

在Java的內存模型里,每個處理器都允許在它的L1或者L2 cache里緩存變量,所以兩個在不同處理器上運行的線程對於相同的數據有種不同的視圖。

下面我們來討論一下可以強制線程的數據視圖保持一致的工具。

 

三個工具

同步

互斥量(Mutex)提供了鎖定資源的語法。當你進入一個互斥量的時候,你會獲得它。在JVM里使用互斥量最常用的方式就是在一個對象上進行同步訪問。在這里,我們會在Person上進行同步訪問。

在JVM里,你可以對任何非null的對象進行同步訪問。

1
2
3
4
5
6
7
class Person(var name: String) {
   def set(changedName: String) {
     this . synchronized {
       name = changedName
     }
   }
}

volatile

隨着Java 5對於內存模型的改變,volatile和synchronized的作用基本相同,除了一點,volatile也可以用在null上。

synchronized提供了更加細粒度的加鎖控制。而volatile直接是對每次訪問進行控制。

1
2
3
4
5
class Person( @volatile var name: String) {
   def set(changedName: String) {
     name = changedName
   }
}

AtomaticReference

同樣的,在Java 5中新增了一系列底層的並發原語。AtomicReference類就是其中一個。

1
2
3
4
5
6
7
import java.util.concurrent.atomic.AtomicReference
 
class Person(val name: AtomicReference[String]) {
   def set(changedName: String) {
     name.set(changedName)
   }
}

 

它們都有額外的消耗嗎?

AutomicReference是這兩種方式中最耗性能的,因為如果你要取得對應的值,則需要經過方法分派(method dispatch)的過程。

volatilesynchronized都是通過Java內置的monitor來實現的。在沒有競爭的情況下,monitor對性能的影響非常小。由於synchronized允許你對代碼進行更加細粒度的加鎖控制,這樣就可以減小加鎖區,進而減小競爭,因此synchronized應該是最佳的選擇。

當你進入同步塊,訪問volatile引用,或者引用AtomicReference,Java會強制要求處理器刷新它們的緩存流水線,從而保證數據的一致性。

如果我這里說錯了,請指正出來。這是一個很復雜的主題,對於這個主題肯定需要花費大量的時間來進行討論。

 

其他來自Java 5的優秀工具

之前提到了AtomicReference,除了它之外,Java 5還提供了很多其他有用的工具。

CountDownLatch

CountDownLatch是供多個進程進行通信的一個簡單機制。

1
2
3
4
5
6
val doneSignal = new CountDownLatch( 2 )
doAsyncWork( 1 )
doAsyncWork( 2 )
 
doneSignal.await()
println( "both workers finished!" )

除此之外,它對於單元測試也是很有用的。假設你在做一些異步的工作,並且你想要保證所有的功能都完成了。你只需要讓你的函數都對latch進行countDown操作,然后在你的測試代碼里進行await

AtomicInteger/Long

由於對於Int和Long的自增操作比較常見,所以就增加了AtomicIntegerAtomicLong

AtomicBoolean

我想我沒有必要來解釋這個的作用了。

讀寫鎖(ReadWriteLock)

ReadWriteLock可以實現讀寫鎖,讀操作只會在寫者加鎖的時候進行阻塞。

 

我們來構建一個非線程安全的搜索引擎

這是一個簡單的非線程安全的倒排索引。我們這個反向排索引把名字的一部分映射到指定的用戶。

下面是原生的假設只有單線程訪問的寫法。

注意這里的使用mutable.HashMap的另一個構造函數this()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import scala.collection.mutable
 
case class User(name: String, id: Int)
 
class InvertedIndex(val userMap: mutable.Map[String, User]) {
 
   def this () = this ( new mutable.HashMap[String, User])
 
   def tokenizeName(name: String): Seq[String] = {
     name.split( " " ).map(_.toLowerCase)
   }
 
   def add(term: String, user: User) {
     userMap += term -> user
   }
 
   def add(user: User) {
     tokenizeName(user.name).foreach { term =>
       add(term, user)
     }
   }
}

我把具體怎么根據索引獲取用戶的方法暫時省略掉了,我們后面會來進行補充。

 

我們來讓它變得安全

在上面的倒排索引的示例里,userMap是沒法保證線程安全的。多個客戶端可以同時嘗試去添加元素,這樣會產生和之前Person示例里相似的問題。

因為userMap本身不是線程安全的,那么我們怎么能夠保證每次只有一個線程對它進行修改呢?

你需要在添加元素的時候給userMap加鎖。

1
2
3
4
5
6
7
def add(user: User) {
   userMap. synchronized {
     tokenizeName(user.name).foreach { term =>
       add(term, user)
     }
   }
}

不幸的是,上面的做法有點太粗糙了。能在互斥量(mutex)外面做的工作盡量都放在外面做。記住我之前說過,如果沒有競爭的話,加鎖的代價是非常小的。如果你在臨界區盡量少做操作,那么競爭就會非常少。

1
2
3
4
5
6
7
8
9
10
11
def add(user: User) {
   // tokenizeName was measured to be the most expensive operation.
   // tokenizeName 這個操作是最耗時的。
   val tokens = tokenizeName(user.name)
 
   tokens.foreach { term =>
     userMap. synchronized {
       add(term, user)
     }
   }
}

 

SynchronizedMap

我們可以通過使用SynchronizedMap trait來使得一個可變的(mutable)HashMap具有同步機制。

我們可以擴展之前的InvertedIndex,給用戶提供一種構建同步索引的簡單方法。

1
2
3
4
5
import scala.collection.mutable.SynchronizedMap
 
class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
   def this () = this ( new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

如果你去看具體的實現的話,你會發現SynchronizedMap只是在每個方法上都加上了同步訪問,因此它的安全是以犧牲性能為代價的。

 

Java ConcurrentHashMap

Java里有一個很不錯的線程安全的ConcurrentHashMap。幸運的是,JavaConverter可以使得我們通過Scala的語法來使用它。

實際上,我們可以無縫地把我們新的,線程安全的InvertedIndex作為老的非線程安全的一個擴展。

1
2
3
4
5
6
7
8
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
 
class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
     extends InvertedIndex(userMap) {
 
   def this () = this ( new ConcurrentHashMap[String, User] asScala)
}

 

現在來加載我們的InvertedIndex

最原始的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
trait UserMaker {
   def makeUser(line: String) = line.split( "," ) match {
     case Array(name, userid) => User(name, userid.trim().toInt)
   }
}
 
class FileRecordProducer(path: String) extends UserMaker {
   def run() {
     Source.fromFile(path, "utf-8" ).getLines.foreach { line =>
       index.add(makeUser(line))
     }
   }
}

對於文件里的每一行字符串,我們通過調用makeUser來生成一個User,然后通過add添加到InvertedIndex里。如果我們並發訪問一個InvertedIndex,我們可以並行調用add方法,因為makeUser方法沒有副作用,它本身就是線程安全的。

我們不能並行讀取一個文件,但是我們可以並行構造User,並且並行將它添加到索引里。

 

解決方案:生產者/消費者

實現非同步計算的,通常采用的方法就是將生產者同消費者分開,並讓它們通過隊列(queue)來進行通信。讓我們用下面的例子來說明我們是怎么實現搜索引擎的索引的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
 
// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
   def run() {
     Source.fromFile(path, "utf-8" ).getLines.foreach { line =>
       queue.put(line)
     }
   }
}
 
// 抽象的消費者
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
   def run() {
     while ( true ) {
       val item = queue.take()
       consume(item)
     }
   }
 
   def consume(x: T)
}
 
val queue = new LinkedBlockingQueue[String]()
 
//一個生產者線程
 
val producer = new Producer[String]( "users.txt" , q)
new Thread(producer).start()
 
trait UserMaker {
   def makeUser(line: String) = line.split( "," ) match {
     case Array(name, userid) => User(name, userid.trim().toInt)
   }
}
 
class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
   def consume(t: String) = index.add(makeUser(t))
}
 
// 假設我們的機器有8個核
 
val cores = 8
val pool = Executors.newFixedThreadPool(cores)
 
// 每個核設置一個消費者
 
for (i <- i to cores) {
   pool.submit( new IndexerConsumer[String](index, q))
}

 

原文鏈接: Scala School 翻譯: ImportNew.com 朱偉傑
譯文鏈接: http://www.importnew.com/4750.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM