原文網址:http://blog.csdn.net/undoner/article/details/12849661
在這篇文章里,我們首先闡述什么是同步,不同步有什么問題,然后討論可以采取哪些措施控制同步,接下來我們會仿照回顧網絡通信時那樣,構建一個服務器端的“線程池”,JDK為我們提供了一個很大的concurrent工具包,最后我們會對里面的內容進行探索。
為什么要線程同步?
說到線程同步,大部分情況下, 我們是在針對“單對象多線程”的情況進行討論,一般會將其分成兩部分,一部分是關於“共享變量”,一部分關於“執行步驟”。
共享變量
當我們在線程對象(Runnable)中定義了全局變量,run方法會修改該變量時,如果有多個線程同時使用該線程對象,那么就會造成全局變量的值被同時修改,造成錯誤。我們來看下面的代碼:
1 class MyRunner implements Runnable
2 {
3 public int sum = 0;
4
5 public void run()
6 {
7 System.out.println(Thread.currentThread().getName() + " Start.");
8 for (int i = 1; i <= 100; i++)
9 {
10 sum += i;
11 }
12 try {
13 Thread.sleep(500);
14 } catch (InterruptedException e) {
15 e.printStackTrace();
16 }
17 System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
18 System.out.println(Thread.currentThread().getName() + " End.");
19 }
20 }
21
22
23 private static void sharedVaribleTest() throws InterruptedException
24 {
25 MyRunner runner = new MyRunner();
26 Thread thread1 = new Thread(runner);
27 Thread thread2 = new Thread(runner);
28 thread1.setDaemon(true);
29 thread2.setDaemon(true);
30 thread1.start();
31 thread2.start();
32 thread1.join();
33 thread2.join();
34 }
這個示例中,線程用來計算1到100的和是多少,我們知道正確結果是5050(好像是高斯小時候玩過這個?),但是上述程序返回的結果是10100,原因是兩個線程同時對sum進行操作。
執行步驟
我們在多個線程運行時,可能需要某些操作合在一起作為“原子操作”,即在這些操作可以看做是“單線程”的,例如我們可能希望輸出結果的樣子是這樣的:
1 線程1:步驟1 2 線程1:步驟2 3 線程1:步驟3 4 線程2:步驟1 5 線程2:步驟2 6 線程2:步驟3
如果同步控制不好,出來的樣子可能是這樣的:
線程1:步驟1 線程2:步驟1 線程1:步驟2 線程2:步驟2 線程1:步驟3 線程2:步驟3
這里我們也給出一個示例代碼:
1 class MyNonSyncRunner implements Runnable
2 {
3 public void run() {
4 System.out.println(Thread.currentThread().getName() + " Start.");
5 for(int i = 1; i <= 5; i++)
6 {
7 System.out.println(Thread.currentThread().getName() + " Running step " + i);
8 try
9 {
10 Thread.sleep(50);
11 }
12 catch(InterruptedException ex)
13 {
14 ex.printStackTrace();
15 }
16 }
17 System.out.println(Thread.currentThread().getName() + " End.");
18 }
19 }
20
21
22 private static void syncTest() throws InterruptedException
23 {
24 MyNonSyncRunner runner = new MyNonSyncRunner();
25 Thread thread1 = new Thread(runner);
26 Thread thread2 = new Thread(runner);
27 thread1.setDaemon(true);
28 thread2.setDaemon(true);
29 thread1.start();
30 thread2.start();
31 thread1.join();
32 thread2.join();
33 }
如何控制線程同步
既然線程同步有上述問題,那么我們應該如何去解決呢?針對不同原因造成的同步問題,我們可以采取不同的策略。
控制共享變量
我們可以采取3種方式來控制共享變量。
將“單對象多線程”修改成“多對象多線程”
上文提及,同步問題一般發生在“單對象多線程”的場景中,那么最簡單的處理方式就是將運行模型修改成“多對象多線程”的樣子,針對上面示例中的同步問題,修改后的代碼如下:
1 private static void sharedVaribleTest2() throws InterruptedException
2 {
3 Thread thread1 = new Thread(new MyRunner());
4 Thread thread2 = new Thread(new MyRunner());
5 thread1.setDaemon(true);
6 thread2.setDaemon(true);
7 thread1.start();
8 thread2.start();
9 thread1.join();
10 thread2.join();
11 }
我們可以看到,上述代碼中兩個線程使用了兩個不同的Runnable實例,它們在運行過程中,就不會去訪問同一個全局變量。
將“全局變量”降級為“局部變量”
既然是共享變量造成的問題,那么我們可以將共享變量改為“不共享”,即將其修改為局部變量。這樣也可以解決問題,同樣針對上面的示例,這種解決方式的代碼如下:
1 class MyRunner2 implements Runnable
2 {
3 public void run()
4 {
5 System.out.println(Thread.currentThread().getName() + " Start.");
6 int sum = 0;
7 for (int i = 1; i <= 100; i++)
8 {
9 sum += i;
10 }
11 try {
12 Thread.sleep(500);
13 } catch (InterruptedException e) {
14 e.printStackTrace();
15 }
16 System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
17 System.out.println(Thread.currentThread().getName() + " End.");
18 }
19 }
20
21
22 private static void sharedVaribleTest3() throws InterruptedException
23 {
24 MyRunner2 runner = new MyRunner2();
25 Thread thread1 = new Thread(runner);
26 Thread thread2 = new Thread(runner);
27 thread1.setDaemon(true);
28 thread2.setDaemon(true);
29 thread1.start();
30 thread2.start();
31 thread1.join();
32 thread2.join();
33 }
我們可以看出,sum變量已經由全局變量變為run方法內部的局部變量了。
使用ThreadLocal機制
ThreadLocal是JDK引入的一種機制,它用於解決線程間共享變量,使用ThreadLocal聲明的變量,即使在線程中屬於全局變量,針對每個線程來講,這個變量也是獨立的。
我們可以用這種方式來改造上面的代碼,如下所示:
1 class MyRunner3 implements Runnable
2 {
3 public ThreadLocal<Integer> tl = new ThreadLocal<Integer>();
4
5 public void run()
6 {
7 System.out.println(Thread.currentThread().getName() + " Start.");
8 for (int i = 0; i <= 100; i++)
9 {
10 if (tl.get() == null)
11 {
12 tl.set(new Integer(0));
13 }
14 int sum = ((Integer)tl.get()).intValue();
15 sum+= i;
16 tl.set(new Integer(sum));
17 try {
18 Thread.sleep(10);
19 } catch (InterruptedException e) {
20 e.printStackTrace();
21 }
22 }
23
24 System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + ((Integer)tl.get()).intValue());
25 System.out.println(Thread.currentThread().getName() + " End.");
26 }
27 }
28
29
30 private static void sharedVaribleTest4() throws InterruptedException
31 {
32 MyRunner3 runner = new MyRunner3();
33 Thread thread1 = new Thread(runner);
34 Thread thread2 = new Thread(runner);
35 thread1.setDaemon(true);
36 thread2.setDaemon(true);
37 thread1.start();
38 thread2.start();
39 thread1.join();
40 thread2.join();
41 }
綜上三種方案,第一種方案會降低多線程執行的效率,因此,我們推薦使用第二種或者第三種方案。
控制執行步驟
說到執行步驟,我們可以使用synchronized關鍵字來解決它。
1 class MySyncRunner implements Runnable
2 {
3 public void run() {
4 synchronized(this)
5 {
6 System.out.println(Thread.currentThread().getName() + " Start.");
7 for(int i = 1; i <= 5; i++)
8 {
9 System.out.println(Thread.currentThread().getName() + " Running step " + i);
10 try
11 {
12 Thread.sleep(50);
13 }
14 catch(InterruptedException ex)
15 {
16 ex.printStackTrace();
17 }
18 }
19 System.out.println(Thread.currentThread().getName() + " End.");
20 }
21 }
22 }
23
24
25 private static void syncTest2() throws InterruptedException
26 {
27 MySyncRunner runner = new MySyncRunner();
28 Thread thread1 = new Thread(runner);
29 Thread thread2 = new Thread(runner);
30 thread1.setDaemon(true);
31 thread2.setDaemon(true);
32 thread1.start();
33 thread2.start();
34 thread1.join();
35 thread2.join();
36 }
在線程同步的話題上,synchronized是一個非常重要的關鍵字。它的原理和數據庫中事務鎖的原理類似。我們在使用過程中,應該盡量縮減synchronized覆蓋的范圍,原因有二:1)被它覆蓋的范圍是串行的,效率低;2)容易產生死鎖。我們來看下面的示例:
1 private static void syncTest3() throws InterruptedException
2 {
3 final List<Integer> list = new ArrayList<Integer>();
4
5 Thread thread1 = new Thread()
6 {
7 public void run()
8 {
9 System.out.println(Thread.currentThread().getName() + " Start.");
10 Random r = new Random(100);
11 synchronized(list)
12 {
13 for (int i = 0; i < 5; i++)
14 {
15 list.add(new Integer(r.nextInt()));
16 }
17 System.out.println("The size of list is " + list.size());
18 }
19 try
20 {
21 Thread.sleep(500);
22 }
23 catch(InterruptedException ex)
24 {
25 ex.printStackTrace();
26 }
27 System.out.println(Thread.currentThread().getName() + " End.");
28 }
29 };
30
31 Thread thread2 = new Thread()
32 {
33 public void run()
34 {
35 System.out.println(Thread.currentThread().getName() + " Start.");
36 Random r = new Random(100);
37 synchronized(list)
38 {
39 for (int i = 0; i < 5; i++)
40 {
41 list.add(new Integer(r.nextInt()));
42 }
43 System.out.println("The size of list is " + list.size());
44 }
45 try
46 {
47 Thread.sleep(500);
48 }
49 catch(InterruptedException ex)
50 {
51 ex.printStackTrace();
52 }
53 System.out.println(Thread.currentThread().getName() + " End.");
54 }
55 };
56
57 thread1.start();
58 thread2.start();
59 thread1.join();
60 thread2.join();
61 }
我們應該把需要同步的內容集中在一起,盡量不包含其他不相關的、消耗大量資源的操作,示例中線程休眠的操作顯然不應該包括在里面。
構造線程池
我們在Java回顧之網絡通信中,已經構建了一個Socket連接池,這里我們在此基礎上,構建一個線程池,完成基本的啟動、休眠、喚醒、停止操作。
基本思路還是以數組的形式保持一系列線程,通過Socket通信,客戶端向服務器端發送命令,當服務器端接收到命令后,根據收到的命令對線程數組中的線程進行操作。
Socket客戶端的代碼保持不變,依然采用構建Socket連接池時的代碼,我們主要針對服務器端進行改造。
首先,我們需要定義一個線程對象,它用來執行我們的業務操作,這里簡化起見,只讓線程進行休眠。
1 enum ThreadStatus
2 {
3 Initial,
4 Running,
5 Sleeping,
6 Stopped
7 }
8
9 enum ThreadTask
10 {
11 Start,
12 Stop,
13 Sleep,
14 Wakeup
15 }
16
17
18 class MyThread extends Thread
19 {
20 public ThreadStatus status = ThreadStatus.Initial;
21 public ThreadTask task;
22 public void run()
23 {
24 status = ThreadStatus.Running;
25 while(true)
26 {
27 try {
28 Thread.sleep(3000);
29 if (status == ThreadStatus.Sleeping)
30 {
31 System.out.println(Thread.currentThread().getName() + " 進入休眠狀態。");
32 this.wait();
33 }
34 } catch (InterruptedException e) {
35 System.out.println(Thread.currentThread().getName() + " 運行過程中出現錯誤。");
36 status = ThreadStatus.Stopped;
37 }
38 }
39 }
40 }
然后,我們需要定義一個線程管理器,它用來對線程池中的線程進行管理,代碼如下:
1 class MyThreadManager
2 {
3 public static void manageThread(MyThread[] threads, ThreadTask task)
4 {
5 for (int i = 0; i < threads.length; i++)
6 {
7 synchronized(threads[i])
8 {
9 manageThread(threads[i], task);
10 }
11 }
12 System.out.println(getThreadStatus(threads));
13 }
14
15 public static void manageThread(MyThread thread, ThreadTask task)
16 {
17 if (task == ThreadTask.Start)
18 {
19 if (thread.status == ThreadStatus.Running)
20 {
21 return;
22 }
23 if (thread.status == ThreadStatus.Stopped)
24 {
25 thread = new MyThread();
26 }
27 thread.status = ThreadStatus.Running;
28 thread.start();
29
30 }
31 else if (task == ThreadTask.Stop)
32 {
33 if (thread.status != ThreadStatus.Stopped)
34 {
35 thread.interrupt();
36 thread.status = ThreadStatus.Stopped;
37 }
38 }
39 else if (task == ThreadTask.Sleep)
40 {
41 thread.status = ThreadStatus.Sleeping;
42 }
43 else if (task == ThreadTask.Wakeup)
44 {
45 thread.notify();
46 thread.status = ThreadStatus.Running;
47 }
48 }
49
50 public static String getThreadStatus(MyThread[] threads)
51 {
52 StringBuffer sb = new StringBuffer();
53 for (int i = 0; i < threads.length; i++)
54 {
55 sb.append(threads[i].getName() + "的狀態:" + threads[i].status).append("\r\n");
56 }
57 return sb.toString();
58 }
59 }
最后,是我們的服務器端,它不斷接受客戶端的請求,每收到一個連接請求,服務器端會新開一個線程,來處理后續客戶端發來的各種操作指令。
1 public class MyThreadPool {
2
3 public static void main(String[] args) throws IOException
4 {
5 MyThreadPool pool = new MyThreadPool(5);
6 }
7
8 private int threadCount;
9 private MyThread[] threads = null;
10
11
12 public MyThreadPool(int count) throws IOException
13 {
14 this.threadCount = count;
15 threads = new MyThread[count];
16 for (int i = 0; i < threads.length; i++)
17 {
18 threads[i] = new MyThread();
19 threads[i].start();
20 }
21 Init();
22 }
23
24 private void Init() throws IOException
25 {
26 ServerSocket serverSocket = new ServerSocket(5678);
27 while(true)
28 {
29 final Socket socket = serverSocket.accept();
30 Thread thread = new Thread()
31 {
32 public void run()
33 {
34 try
35 {
36 System.out.println("檢測到一個新的Socket連接。");
37 BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
38 PrintStream ps = new PrintStream(socket.getOutputStream());
39 String line = null;
40 while((line = br.readLine()) != null)
41 {
42 System.out.println(line);
43 if (line.equals("Count"))
44 {
45 System.out.println("線程池中有5個線程");
46 }
47 else if (line.equals("Status"))
48 {
49 String status = MyThreadManager.getThreadStatus(threads);
50 System.out.println(status);
51 }
52 else if (line.equals("StartAll"))
53 {
54 MyThreadManager.manageThread(threads, ThreadTask.Start);
55 }
56 else if (line.equals("StopAll"))
57 {
58 MyThreadManager.manageThread(threads, ThreadTask.Stop);
59 }
60 else if (line.equals("SleepAll"))
61 {
62 MyThreadManager.manageThread(threads, ThreadTask.Sleep);
63 }
64 else if (line.equals("WakeupAll"))
65 {
66 MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
67 }
68 else if (line.equals("End"))
69 {
70 break;
71 }
72 else
73 {
74 System.out.println("Command:" + line);
75 }
76 ps.println("OK");
77 ps.flush();
78 }
79 }
80 catch(Exception ex)
81 {
82 ex.printStackTrace();
83 }
84 }
85 };
86 thread.start();
87 }
88 }
89 }
探索JDK中的concurrent工具包
為了簡化開發人員在進行多線程開發時的工作量,並減少程序中的bug,JDK提供了一套concurrent工具包,我們可以用它來方便的開發多線程程序。
線程池
我們在上面實現了一個非常“簡陋”的線程池,concurrent工具包中也提供了線程池,而且使用非常方便。
concurrent工具包中的線程池分為3類:ScheduledThreadPool、FixedThreadPool和CachedThreadPool。
首先我們來定義一個Runnable的對象
1 class MyRunner implements Runnable
2 {
3 public void run() {
4 System.out.println(Thread.currentThread().getName() + "運行開始");
5 for(int i = 0; i < 1; i++)
6 {
7 try
8 {
9 System.out.println(Thread.currentThread().getName() + "正在運行");
10 Thread.sleep(200);
11 }
12 catch(Exception ex)
13 {
14 ex.printStackTrace();
15 }
16 }
17 System.out.println(Thread.currentThread().getName() + "運行結束");
18 }
19 }
可以看出,它的功能非常簡單,只是輸出了線程的執行過程。
ScheduledThreadPool
這和我們平時使用的ScheduledTask比較類似,或者說很像Timer,它可以使得一個線程在指定的一段時間內開始運行,並且在間隔另外一段時間后再次運行,直到線程池關閉。
示例代碼如下:
1 private static void scheduledThreadPoolTest()
2 {
3 final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
4
5 MyRunner runner = new MyRunner();
6
7 final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
8 final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
9
10 scheduler.schedule(new Runnable()
11 {
12 public void run()
13 {
14 handler1.cancel(true);
15 handler2.cancel(true);
16 scheduler.shutdown();
17 }
18 }, 30, TimeUnit.SECONDS
19 );
20 }
FixedThreadPool
這是一個指定容量的線程池,即我們可以指定在同一時間,線程池中最多有多個線程在運行,超出的線程,需要等線程池中有空閑線程時,才能有機會運行。
來看下面的代碼:
1 private static void fixedThreadPoolTest()
2 {
3 ExecutorService exec = Executors.newFixedThreadPool(3);
4 for(int i = 0; i < 5; i++)
5 {
6 MyRunner runner = new MyRunner();
7 exec.execute(runner);
8 }
9 exec.shutdown();
10 }
注意它的輸出結果:
pool-1-thread-1運行開始 pool-1-thread-1正在運行 pool-1-thread-2運行開始 pool-1-thread-2正在運行 pool-1-thread-3運行開始 pool-1-thread-3正在運行 pool-1-thread-1運行結束 pool-1-thread-1運行開始 pool-1-thread-1正在運行 pool-1-thread-2運行結束 pool-1-thread-2運行開始 pool-1-thread-2正在運行 pool-1-thread-3運行結束 pool-1-thread-1運行結束 pool-1-thread-2運行結束
可以看到從始至終,最多有3個線程在同時運行。
CachedThreadPool
這是另外一種線程池,它不需要指定容量,只要有需要,它就會創建新的線程。
它的使用方式和FixedThreadPool非常像,來看下面的代碼:
1 private static void cachedThreadPoolTest()
2 {
3 ExecutorService exec = Executors.newCachedThreadPool();
4 for(int i = 0; i < 5; i++)
5 {
6 MyRunner runner = new MyRunner();
7 exec.execute(runner);
8 }
9 exec.shutdown();
10 }
它的執行結果如下:
pool-1-thread-1運行開始 pool-1-thread-1正在運行 pool-1-thread-2運行開始 pool-1-thread-2正在運行 pool-1-thread-3運行開始 pool-1-thread-3正在運行 pool-1-thread-4運行開始 pool-1-thread-4正在運行 pool-1-thread-5運行開始 pool-1-thread-5正在運行 pool-1-thread-1運行結束 pool-1-thread-2運行結束 pool-1-thread-3運行結束 pool-1-thread-4運行結束 pool-1-thread-5運行結束
可以看到,它創建了5個線程。
處理線程返回值
在有些情況下,我們需要使用線程的返回值,在上述的所有代碼中,線程這是執行了某些操作,沒有任何返回值。
如何做到這一點呢?我們可以使用JDK中的Callable<T>和CompletionService<T>,前者返回單個線程的結果,后者返回一組線程的結果。
返回單個線程的結果
還是直接看代碼吧:
1 private static void callableTest() throws InterruptedException, ExecutionException
2 {
3 ExecutorService exec = Executors.newFixedThreadPool(1);
4 Callable<String> call = new Callable<String>()
5 {
6 public String call()
7 {
8 return "Hello World.";
9 }
10 };
11 Future<String> result = exec.submit(call);
12 System.out.println("線程的返回值是" + result.get());
13 exec.shutdown();
14 }
執行結果如下:
線程的返回值是Hello World.
返回線程池中每個線程的結果
這里需要使用CompletionService<T>,代碼如下:
1 private static void completionServiceTest() throws InterruptedException, ExecutionException
2 {
3 ExecutorService exec = Executors.newFixedThreadPool(10);
4 CompletionService<String> service = new ExecutorCompletionService<String>(exec);
5 for (int i = 0; i < 10; i++)
6 {
7 Callable<String> call = new Callable<String>()
8 {
9 public String call() throws InterruptedException
10 {
11 return Thread.currentThread().getName();
12 }
13 };
14 service.submit(call);
15 }
16
17 Thread.sleep(1000);
18 for(int i = 0; i < 10; i++)
19 {
20 Future<String> result = service.take();
21 System.out.println("線程的返回值是" + result.get());
22 }
23 exec.shutdown();
24 }
執行結果如下:
線程的返回值是pool-2-thread-1 線程的返回值是pool-2-thread-2 線程的返回值是pool-2-thread-3 線程的返回值是pool-2-thread-5 線程的返回值是pool-2-thread-4 線程的返回值是pool-2-thread-6 線程的返回值是pool-2-thread-8 線程的返回值是pool-2-thread-7 線程的返回值是pool-2-thread-9 線程的返回值是pool-2-thread-10
實現生產者-消費者模型
對於生產者-消費者模型來說,我們應該都不會陌生,通常我們都會使用某種數據結構來實現它。在concurrent工具包中,我們可以使用BlockingQueue來實現生產者-消費者模型,如下:
1 public class BlockingQueueSample {
2
3 public static void main(String[] args)
4 {
5 blockingQueueTest();
6 }
7
8 private static void blockingQueueTest()
9 {
10 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
11 final int maxSleepTimeForSetter = 10;
12 final int maxSleepTimerForGetter = 10;
13
14 Runnable setter = new Runnable()
15 {
16 public void run()
17 {
18 Random r = new Random();
19 while(true)
20 {
21 int value = r.nextInt(100);
22 try
23 {
24 queue.put(new Integer(value));
25 System.out.println(Thread.currentThread().getName() + "---向隊列中插入值" + value);
26 Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
27 }
28 catch(Exception ex)
29 {
30 ex.printStackTrace();
31 }
32 }
33 }
34 };
35
36 Runnable getter = new Runnable()
37 {
38 public void run()
39 {
40 Random r = new Random();
41 while(true)
42 {
43 try
44 {
45 if (queue.size() == 0)
46 {
47 System.out.println(Thread.currentThread().getName() + "---隊列為空");
48 }
49 else
50 {
51 int value = queue.take().intValue();
52 System.out.println(Thread.currentThread().getName() + "---從隊列中獲取值" + value);
53 }
54 Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
55 }
56 catch(Exception ex)
57 {
58 ex.printStackTrace();
59 }
60 }
61 }
62 };
63
64 ExecutorService exec = Executors.newFixedThreadPool(2);
65 exec.execute(setter);
66 exec.execute(getter);
67 }
68 }
我們定義了兩個線程,一個線程向Queue中添加數據,一個線程從Queue中取數據。我們可以通過控制maxSleepTimeForSetter和maxSleepTimerForGetter的值,來使得程序得出不同的結果。
可能的執行結果如下:
pool-1-thread-1---向隊列中插入值88 pool-1-thread-2---從隊列中獲取值88 pool-1-thread-1---向隊列中插入值75 pool-1-thread-2---從隊列中獲取值75 pool-1-thread-2---隊列為空 pool-1-thread-2---隊列為空 pool-1-thread-2---隊列為空 pool-1-thread-1---向隊列中插入值50 pool-1-thread-2---從隊列中獲取值50 pool-1-thread-2---隊列為空 pool-1-thread-2---隊列為空 pool-1-thread-2---隊列為空 pool-1-thread-2---隊列為空 pool-1-thread-2---隊列為空 pool-1-thread-1---向隊列中插入值51 pool-1-thread-1---向隊列中插入值92 pool-1-thread-2---從隊列中獲取值51 pool-1-thread-2---從隊列中獲取值92
因為Queue中的值和Thread的休眠時間都是隨機的,所以執行結果也不是固定的。
使用信號量來控制線程
JDK提供了Semaphore來實現“信號量”的功能,它提供了兩個方法分別用於獲取和釋放信號量:acquire和release,示例代碼如下:
1 private static void semaphoreTest()
2 {
3 ExecutorService exec = Executors.newFixedThreadPool(10);
4 final Semaphore semp = new Semaphore(2);
5
6 for (int i = 0; i < 10; i++)
7 {
8 Runnable runner = new Runnable()
9 {
10 public void run()
11 {
12 try
13 {
14 semp.acquire();
15 System.out.println(new Date() + " " + Thread.currentThread().getName() + "正在執行。");
16 Thread.sleep(5000);
17 semp.release();
18 }
19 catch(Exception ex)
20 {
21 ex.printStackTrace();
22 }
23 }
24 };
25 exec.execute(runner);
26 }
27
28 exec.shutdown();
29 }
執行結果如下:
Tue May 07 11:22:11 CST 2013 pool-1-thread-1正在執行。 Tue May 07 11:22:11 CST 2013 pool-1-thread-2正在執行。 Tue May 07 11:22:17 CST 2013 pool-1-thread-3正在執行。 Tue May 07 11:22:17 CST 2013 pool-1-thread-4正在執行。 Tue May 07 11:22:22 CST 2013 pool-1-thread-5正在執行。 Tue May 07 11:22:22 CST 2013 pool-1-thread-6正在執行。 Tue May 07 11:22:27 CST 2013 pool-1-thread-7正在執行。 Tue May 07 11:22:27 CST 2013 pool-1-thread-8正在執行。 Tue May 07 11:22:32 CST 2013 pool-1-thread-10正在執行。 Tue May 07 11:22:32 CST 2013 pool-1-thread-9正在執行。
可以看出,盡管線程池中創建了10個線程,但是同時運行的,只有2個線程。
控制線程池中所有線程的執行步驟
在前面,我們已經提到,可以用synchronized關鍵字來控制單個線程中的執行步驟,那么如果我們想要對線程池中的所有線程的執行步驟進行控制的話,應該如何實現呢?
我們有兩種方式,一種是使用CyclicBarrier,一種是使用CountDownLatch。
CyclicBarrier使用了類似於Object.wait的機制,它的構造函數中需要接收一個整型數字,用來說明它需要控制的線程數目,當在線程的run方法中調用它的await方法時,它會保證所有的線程都執行到這一步,才會繼續執行后面的步驟。
示例代碼如下:
1 class MyRunner2 implements Runnable
2 {
3 private CyclicBarrier barrier = null;
4 public MyRunner2(CyclicBarrier barrier)
5 {
6 this.barrier = barrier;
7 }
8
9 public void run() {
10 Random r = new Random();
11 try
12 {
13 for (int i = 0; i < 3; i++)
14 {
15 Thread.sleep(r.nextInt(10) * 1000);
16 System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--第" + (i + 1) + "次等待。");
17 barrier.await();
18 }
19 }
20 catch(Exception ex)
21 {
22 ex.printStackTrace();
23 }
24 }
25
26 }
27
28 private static void cyclicBarrierTest()
29 {
30 CyclicBarrier barrier = new CyclicBarrier(3);
31
32 ExecutorService exec = Executors.newFixedThreadPool(3);
33 for (int i = 0; i < 3; i++)
34 {
35 exec.execute(new MyRunner2(barrier));
36 }
37 exec.shutdown();
38 }
執行結果如下:
Tue May 07 11:31:20 CST 2013--pool-1-thread-2--第1次等待。 Tue May 07 11:31:21 CST 2013--pool-1-thread-3--第1次等待。 Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第1次等待。 Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第2次等待。 Tue May 07 11:31:26 CST 2013--pool-1-thread-3--第2次等待。 Tue May 07 11:31:30 CST 2013--pool-1-thread-2--第2次等待。 Tue May 07 11:31:32 CST 2013--pool-1-thread-1--第3次等待。 Tue May 07 11:31:33 CST 2013--pool-1-thread-3--第3次等待。 Tue May 07 11:31:33 CST 2013--pool-1-thread-2--第3次等待。
可以看出,thread-2到第1次等待點時,一直等到thread-1到達后才繼續執行。
CountDownLatch則是采取類似”倒計時計數器”的機制來控制線程池中的線程,它有CountDown和Await兩個方法。示例代碼如下:
1 private static void countdownLatchTest() throws InterruptedException
2 {
3 final CountDownLatch begin = new CountDownLatch(1);
4 final CountDownLatch end = new CountDownLatch(5);
5 ExecutorService exec = Executors.newFixedThreadPool(5);
6 for (int i = 0; i < 5; i++)
7 {
8 Runnable runner = new Runnable()
9 {
10 public void run()
11 {
12 Random r = new Random();
13 try
14 {
15 begin.await();
16 System.out.println(Thread.currentThread().getName() + "運行開始");
17 Thread.sleep(r.nextInt(10)*1000);
18 System.out.println(Thread.currentThread().getName() + "運行結束");
19 }
20 catch(Exception ex)
21 {
22 ex.printStackTrace();
23 }
24 finally
25 {
26 end.countDown();
27 }
28 }
29 };
30 exec.execute(runner);
31 }
32 begin.countDown();
33 end.await();
34 System.out.println(Thread.currentThread().getName() + "運行結束");
35 exec.shutdown();
36 }
執行結果如下:
pool-1-thread-1運行開始 pool-1-thread-5運行開始 pool-1-thread-2運行開始 pool-1-thread-3運行開始 pool-1-thread-4運行開始 pool-1-thread-2運行結束 pool-1-thread-1運行結束 pool-1-thread-3運行結束 pool-1-thread-5運行結束 pool-1-thread-4運行結束 main運行結束

