Runnable/Callable
Runnable只有一個沒有返回值的方法
1
2
3
|
trait Runnable {
def run(): Unit
}
|
Callable的方法和run類似,只不過它有一個返回值
1
2
3
|
trait Callable[V] {
def call(): V
}
|
線程
Scala的並發是建立在Java的並發模型上的。
在Sun的JVM上,對於一個IO密集型的任務,我們可以在單機上運行成千上萬的線程。
Thread是通過Runnable構造的。要運行一個Runnable的run方法,你需要調用對應線程的start
方法。
1
2
3
4
5
6
7
8
9
|
scala> val hello =
new
Thread(
new
Runnable {
def run() {
println(
"hello world"
)
}
})
hello: java.lang.Thread = Thread[Thread-
3
,
5
,main]
scala> hello.start
hello world
|
當你看見一個實現Runnable的類,你應該明白它會被放到一個線程里去執行的。
一段單線程的代碼
下面是一段代碼片段,它可以運行,但是會有問題。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import
java.net.{Socket, ServerSocket}
import
java.util.concurrent.{Executors, ExecutorService}
import
java.util.Date
class
NetworkService(port: Int, poolSize: Int)
extends
Runnable {
val serverSocket =
new
ServerSocket(port)
def run() {
while
(
true
) {
// 這里會阻塞直到有連接進來
val socket = serverSocket.accept()
(
new
Handler(socket)).run()
}
}
}
class
Handler(socket: Socket)
extends
Runnable {
def message = (Thread.currentThread.getName() +
"\n"
).getBytes
def run() {
socket.getOutputStream.write(message)
socket.getOutputStream.close()
}
}
(
new
NetworkService(
2020
,
2
)).run
|
每個請求都會把當前線程的名稱main
作為響應。
這段代碼最大的問題在於一次只能夠響應一個請求!
你可以對每個請求都單獨用一個線程來響應。只需要把
1
|
(
new
Handler(socket)).run()
|
改成
1
|
(
new
Thread(
new
Handler(socket))).start()
|
但是如果你想要復用線程或者對於線程的行為要做一些其他的控制呢?
Executors
隨着Java 5的發布,對於線程的管理需要一個更加抽象的接口。
你可以通過Executors
對象的靜態方法來取得一個ExecutorService
對象。這些方法可以讓你使用各種不同的策略來配置一個ExecutorService
,例如線程池。
下面是我們之前的阻塞式網絡服務器,現在改寫成可以支持並發請求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import
java.net.{Socket, ServerSocket}
import
java.util.concurrent.{Executors, ExecutorService}
import
java.util.Date
class
NetworkService(port: Int, poolSize: Int)
extends
Runnable {
val serverSocket =
new
ServerSocket(port)
val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
def run() {
try
{
while
(
true
) {
// This will block until a connection comes in.
val socket = serverSocket.accept()
pool.execute(
new
Handler(socket))
}
}
finally
{
pool.shutdown()
}
}
}
class
Handler(socket: Socket)
extends
Runnable {
def message = (Thread.currentThread.getName() +
"\n"
).getBytes
def run() {
socket.getOutputStream.write(message)
socket.getOutputStream.close()
}
}
(
new
NetworkService(
2020
,
2
)).run
|
從下面的示例中,我們可以大致了解內部的線程是怎么進行復用的。
1
2
3
4
5
6
7
8
9
10
11
|
$ nc localhost
2020
pool-
1
-thread-
1
$ nc localhost
2020
pool-
1
-thread-
2
$ nc localhost
2020
pool-
1
-thread-
1
$ nc localhost
2020
pool-
1
-thread-
2
|
Futures
一個Future
代表一次異步計算的操作。你可以把你的操作包裝在一個Future里,當你需要結果的時候,你只需要簡單調用一個阻塞的get()
方法就好了。一個Executor
返回一個Future
。如果你使用Finagle RPC的話,你可以使用Future
的實例來保存還沒有到達的結果。
FutureTask
是一個可運行的任務,並且被設計成由Executor
進行運行。
1
2
3
4
5
|
val future =
new
FutureTask[String](
new
Callable[String]() {
def call(): String = {
searcher.search(target);
}})
executor.execute(future)
|
現在我需要結果,那就只能阻塞到直到結果返回。
1
|
val blockingResult = future.get()
|
參考 Scala School中關於Finagle的章節有大量使用Future
的示例,也有一些組合使用的例子。Effective Scala中也有關於Futures的內容。
線程安全問題
1
2
3
4
5
|
class
Person(var name: String) {
def set(changedName: String) {
name = changedName
}
}
|
這個程序在多線程的環境下是不安全的。如果兩個線程都有同一個Person示例的引用,並且都調用set
方法,你沒法預料在兩個調用都結束的時候name
會是什么。
在Java的內存模型里,每個處理器都允許在它的L1或者L2 cache里緩存變量,所以兩個在不同處理器上運行的線程對於相同的數據有種不同的視圖。
下面我們來討論一下可以強制線程的數據視圖保持一致的工具。
三個工具
同步
互斥量(Mutex)提供了鎖定資源的語法。當你進入一個互斥量的時候,你會獲得它。在JVM里使用互斥量最常用的方式就是在一個對象上進行同步訪問。在這里,我們會在Person上進行同步訪問。
在JVM里,你可以對任何非null的對象進行同步訪問。
1
2
3
4
5
6
7
|
class
Person(var name: String) {
def set(changedName: String) {
this
.
synchronized
{
name = changedName
}
}
}
|
volatile
隨着Java 5對於內存模型的改變,volatile和synchronized的作用基本相同,除了一點,volatile也可以用在null上。
synchronized
提供了更加細粒度的加鎖控制。而volatile
直接是對每次訪問進行控制。
1
2
3
4
5
|
class
Person(
@volatile
var name: String) {
def set(changedName: String) {
name = changedName
}
}
|
AtomaticReference
同樣的,在Java 5中新增了一系列底層的並發原語。AtomicReference
類就是其中一個。
1
2
3
4
5
6
7
|
import
java.util.concurrent.atomic.AtomicReference
class
Person(val name: AtomicReference[String]) {
def set(changedName: String) {
name.set(changedName)
}
}
|
它們都有額外的消耗嗎?
AutomicReference
是這兩種方式中最耗性能的,因為如果你要取得對應的值,則需要經過方法分派(method dispatch)的過程。
volatile
和synchronized
都是通過Java內置的monitor來實現的。在沒有競爭的情況下,monitor對性能的影響非常小。由於synchronized
允許你對代碼進行更加細粒度的加鎖控制,這樣就可以減小加鎖區,進而減小競爭,因此synchronized
應該是最佳的選擇。
當你進入同步塊,訪問volatile引用,或者引用AtomicReference,Java會強制要求處理器刷新它們的緩存流水線,從而保證數據的一致性。
如果我這里說錯了,請指正出來。這是一個很復雜的主題,對於這個主題肯定需要花費大量的時間來進行討論。
其他來自Java 5的優秀工具
之前提到了AtomicReference
,除了它之外,Java 5還提供了很多其他有用的工具。
CountDownLatch
CountDownLatch
是供多個進程進行通信的一個簡單機制。
1
2
3
4
5
6
|
val doneSignal =
new
CountDownLatch(
2
)
doAsyncWork(
1
)
doAsyncWork(
2
)
doneSignal.await()
println(
"both workers finished!"
)
|
除此之外,它對於單元測試也是很有用的。假設你在做一些異步的工作,並且你想要保證所有的功能都完成了。你只需要讓你的函數都對latch進行countDown
操作,然后在你的測試代碼里進行await
。
AtomicInteger/Long
由於對於Int和Long的自增操作比較常見,所以就增加了AtomicInteger
和AtomicLong
。
AtomicBoolean
我想我沒有必要來解釋這個的作用了。
讀寫鎖(ReadWriteLock)
ReadWriteLock
可以實現讀寫鎖,讀操作只會在寫者加鎖的時候進行阻塞。
我們來構建一個非線程安全的搜索引擎
這是一個簡單的非線程安全的倒排索引。我們這個反向排索引把名字的一部分映射到指定的用戶。
下面是原生的假設只有單線程訪問的寫法。
注意這里的使用mutable.HashMap
的另一個構造函數this()
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import
scala.collection.mutable
case
class
User(name: String, id: Int)
class
InvertedIndex(val userMap: mutable.Map[String, User]) {
def
this
() =
this
(
new
mutable.HashMap[String, User])
def tokenizeName(name: String): Seq[String] = {
name.split(
" "
).map(_.toLowerCase)
}
def add(term: String, user: User) {
userMap += term -> user
}
def add(user: User) {
tokenizeName(user.name).foreach { term =>
add(term, user)
}
}
}
|
我把具體怎么根據索引獲取用戶的方法暫時省略掉了,我們后面會來進行補充。
我們來讓它變得安全
在上面的倒排索引的示例里,userMap是沒法保證線程安全的。多個客戶端可以同時嘗試去添加元素,這樣會產生和之前Person
示例里相似的問題。
因為userMap本身不是線程安全的,那么我們怎么能夠保證每次只有一個線程對它進行修改呢?
你需要在添加元素的時候給userMap加鎖。
1
2
3
4
5
6
7
|
def add(user: User) {
userMap.
synchronized
{
tokenizeName(user.name).foreach { term =>
add(term, user)
}
}
}
|
不幸的是,上面的做法有點太粗糙了。能在互斥量(mutex)外面做的工作盡量都放在外面做。記住我之前說過,如果沒有競爭的話,加鎖的代價是非常小的。如果你在臨界區盡量少做操作,那么競爭就會非常少。
1
2
3
4
5
6
7
8
9
10
11
|
def add(user: User) {
// tokenizeName was measured to be the most expensive operation.
// tokenizeName 這個操作是最耗時的。
val tokens = tokenizeName(user.name)
tokens.foreach { term =>
userMap.
synchronized
{
add(term, user)
}
}
}
|
SynchronizedMap
我們可以通過使用SynchronizedMap trait來使得一個可變的(mutable)HashMap具有同步機制。
我們可以擴展之前的InvertedIndex,給用戶提供一種構建同步索引的簡單方法。
1
2
3
4
5
|
import
scala.collection.mutable.SynchronizedMap
class
SynchronizedInvertedIndex(userMap: mutable.Map[String, User])
extends
InvertedIndex(userMap) {
def
this
() =
this
(
new
mutable.HashMap[String, User] with SynchronizedMap[String, User])
}
|
如果你去看具體的實現的話,你會發現SynchronizedMap只是在每個方法上都加上了同步訪問,因此它的安全是以犧牲性能為代價的。
Java ConcurrentHashMap
Java里有一個很不錯的線程安全的ConcurrentHashMap。幸運的是,JavaConverter可以使得我們通過Scala的語法來使用它。
實際上,我們可以無縫地把我們新的,線程安全的InvertedIndex作為老的非線程安全的一個擴展。
1
2
3
4
5
6
7
8
|
import
java.util.concurrent.ConcurrentHashMap
import
scala.collection.JavaConverters._
class
ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
extends
InvertedIndex(userMap) {
def
this
() =
this
(
new
ConcurrentHashMap[String, User] asScala)
}
|
現在來加載我們的InvertedIndex
最原始的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
|
trait UserMaker {
def makeUser(line: String) = line.split(
","
) match {
case
Array(name, userid) => User(name, userid.trim().toInt)
}
}
class
FileRecordProducer(path: String)
extends
UserMaker {
def run() {
Source.fromFile(path,
"utf-8"
).getLines.foreach { line =>
index.add(makeUser(line))
}
}
}
|
對於文件里的每一行字符串,我們通過調用makeUser
來生成一個User,然后通過add
添加到InvertedIndex里。如果我們並發訪問一個InvertedIndex,我們可以並行調用add方法,因為makeUser方法沒有副作用,它本身就是線程安全的。
我們不能並行讀取一個文件,但是我們可以並行構造User,並且並行將它添加到索引里。
解決方案:生產者/消費者
實現非同步計算的,通常采用的方法就是將生產者同消費者分開,並讓它們通過隊列(queue)
來進行通信。讓我們用下面的例子來說明我們是怎么實現搜索引擎的索引的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
import
java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
// Concrete producer
class
Producer[T](path: String, queue: BlockingQueue[T])
extends
Runnable {
def run() {
Source.fromFile(path,
"utf-8"
).getLines.foreach { line =>
queue.put(line)
}
}
}
// 抽象的消費者
abstract
class
Consumer[T](queue: BlockingQueue[T])
extends
Runnable {
def run() {
while
(
true
) {
val item = queue.take()
consume(item)
}
}
def consume(x: T)
}
val queue =
new
LinkedBlockingQueue[String]()
//一個生產者線程
val producer =
new
Producer[String](
"users.txt"
, q)
new
Thread(producer).start()
trait UserMaker {
def makeUser(line: String) = line.split(
","
) match {
case
Array(name, userid) => User(name, userid.trim().toInt)
}
}
class
IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String])
extends
Consumer[String](queue) with UserMaker {
def consume(t: String) = index.add(makeUser(t))
}
// 假設我們的機器有8個核
val cores =
8
val pool = Executors.newFixedThreadPool(cores)
// 每個核設置一個消費者
for
(i <- i to cores) {
pool.submit(
new
IndexerConsumer[String](index, q))
}
|
原文鏈接: Scala School 翻譯: ImportNew.com - 朱偉傑
譯文鏈接: http://www.importnew.com/4750.html