Java回顧之多線程同步


  第一篇:Java回顧之I/O

  第二篇:Java回顧之網絡通信

  第三篇:Java回顧之多線程

 

  在這篇文章里,我們關注線程同步的話題。這是比多線程更復雜,稍不留意,我們就會“掉到坑里”,而且和單線程程序不同,多線程的錯誤是否每次都出現,也是不固定的,這給調試也帶來了很大的挑戰。

  在這篇文章里,我們首先闡述什么是同步,不同步有什么問題,然后討論可以采取哪些措施控制同步,接下來我們會仿照回顧網絡通信時那樣,構建一個服務器端的“線程池”,JDK為我們提供了一個很大的concurrent工具包,最后我們會對里面的內容進行探索。

  為什么要線程同步?

  說到線程同步,大部分情況下, 我們是在針對“單對象多線程”的情況進行討論,一般會將其分成兩部分,一部分是關於“共享變量”,一部分關於“執行步驟”。

  共享變量

  當我們在線程對象(Runnable)中定義了全局變量,run方法會修改該變量時,如果有多個線程同時使用該線程對象,那么就會造成全局變量的值被同時修改,造成錯誤。我們來看下面的代碼:

共享變量造成同步問題
 1 class MyRunner implements Runnable
 2 {
 3     public int sum = 0;
 4     
 5     public void run() 
 6     {
 7         System.out.println(Thread.currentThread().getName() + " Start.");
 8         for (int i = 1; i <= 100; i++)
 9         {
10             sum += i;
11         }
12         try {
13             Thread.sleep(500);
14         } catch (InterruptedException e) {
15             e.printStackTrace();
16         }
17         System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
18         System.out.println(Thread.currentThread().getName() + " End.");
19     }
20 }
21 
22 
23 private static void sharedVaribleTest() throws InterruptedException
24 {
25     MyRunner runner = new MyRunner();
26     Thread thread1 = new Thread(runner);
27     Thread thread2 = new Thread(runner);
28     thread1.setDaemon(true);
29     thread2.setDaemon(true);
30     thread1.start();
31     thread2.start();
32     thread1.join();
33     thread2.join();
34 }

  這個示例中,線程用來計算1到100的和是多少,我們知道正確結果是5050(好像是高斯小時候玩過這個?),但是上述程序返回的結果是10100,原因是兩個線程同時對sum進行操作。

  執行步驟

  我們在多個線程運行時,可能需要某些操作合在一起作為“原子操作”,即在這些操作可以看做是“單線程”的,例如我們可能希望輸出結果的樣子是這樣的:

1 線程1:步驟1
2 線程1:步驟2
3 線程1:步驟3
4 線程2:步驟1
5 線程2:步驟2
6 線程2:步驟3

  如果同步控制不好,出來的樣子可能是這樣的:

線程1:步驟1
線程2:步驟1
線程1:步驟2
線程2:步驟2
線程1:步驟3
線程2:步驟3

  這里我們也給出一個示例代碼:

執行步驟混亂帶來的同步問題
 1 class MyNonSyncRunner implements Runnable
 2 {
 3     public void run() {
 4         System.out.println(Thread.currentThread().getName() + " Start.");
 5         for(int i = 1; i <= 5; i++)
 6         {
 7             System.out.println(Thread.currentThread().getName() + " Running step " + i);
 8             try
 9             {
10                 Thread.sleep(50);
11             }
12             catch(InterruptedException ex)
13             {
14                 ex.printStackTrace();
15             }
16         }
17         System.out.println(Thread.currentThread().getName() + " End.");
18     }
19 }
20 
21 
22 private static void syncTest() throws InterruptedException
23 {
24     MyNonSyncRunner runner = new MyNonSyncRunner();
25     Thread thread1 = new Thread(runner);
26     Thread thread2 = new Thread(runner);
27     thread1.setDaemon(true);
28     thread2.setDaemon(true);
29     thread1.start();
30     thread2.start();
31     thread1.join();
32     thread2.join();
33 }

  如何控制線程同步

  既然線程同步有上述問題,那么我們應該如何去解決呢?針對不同原因造成的同步問題,我們可以采取不同的策略。

  控制共享變量

  我們可以采取3種方式來控制共享變量。

  將“單對象多線程”修改成“多對象多線程”  

  上文提及,同步問題一般發生在“單對象多線程”的場景中,那么最簡單的處理方式就是將運行模型修改成“多對象多線程”的樣子,針對上面示例中的同步問題,修改后的代碼如下:

解決共享變量問題方案一
 1 private static void sharedVaribleTest2() throws InterruptedException
 2 {
 3     Thread thread1 = new Thread(new MyRunner());
 4     Thread thread2 = new Thread(new MyRunner());
 5     thread1.setDaemon(true);
 6     thread2.setDaemon(true);
 7     thread1.start();
 8     thread2.start();
 9     thread1.join();
10     thread2.join();
11 }

  我們可以看到,上述代碼中兩個線程使用了兩個不同的Runnable實例,它們在運行過程中,就不會去訪問同一個全局變量。

  將“全局變量”降級為“局部變量”

  既然是共享變量造成的問題,那么我們可以將共享變量改為“不共享”,即將其修改為局部變量。這樣也可以解決問題,同樣針對上面的示例,這種解決方式的代碼如下:

解決共享變量問題方案二
 1 class MyRunner2 implements Runnable
 2 {
 3     public void run() 
 4     {
 5         System.out.println(Thread.currentThread().getName() + " Start.");
 6         int sum = 0;
 7         for (int i = 1; i <= 100; i++)
 8         {
 9             sum += i;
10         }
11         try {
12             Thread.sleep(500);
13         } catch (InterruptedException e) {
14             e.printStackTrace();
15         }
16         System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
17         System.out.println(Thread.currentThread().getName() + " End.");
18     }
19 }
20 
21 
22 private static void sharedVaribleTest3() throws InterruptedException
23 {
24     MyRunner2 runner = new MyRunner2();
25     Thread thread1 = new Thread(runner);
26     Thread thread2 = new Thread(runner);
27     thread1.setDaemon(true);
28     thread2.setDaemon(true);
29     thread1.start();
30     thread2.start();
31     thread1.join();
32     thread2.join();
33 }

  我們可以看出,sum變量已經由全局變量變為run方法內部的局部變量了。

  使用ThreadLocal機制

  ThreadLocal是JDK引入的一種機制,它用於解決線程間共享變量,使用ThreadLocal聲明的變量,即使在線程中屬於全局變量,針對每個線程來講,這個變量也是獨立的。

  我們可以用這種方式來改造上面的代碼,如下所示:

解決共享變量問題方案三
 1 class MyRunner3 implements Runnable
 2 {
 3     public ThreadLocal<Integer> tl = new ThreadLocal<Integer>();
 4     
 5     public void run() 
 6     {
 7         System.out.println(Thread.currentThread().getName() + " Start.");
 8         for (int i = 0; i <= 100; i++)
 9         {
10             if (tl.get() == null)
11             {
12                 tl.set(new Integer(0));
13             }
14             int sum = ((Integer)tl.get()).intValue();
15             sum+= i;
16             tl.set(new Integer(sum));
17             try {
18                 Thread.sleep(10);
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             }
22         }
23         
24         System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + ((Integer)tl.get()).intValue());
25         System.out.println(Thread.currentThread().getName() + " End.");
26     }
27 }
28 
29 
30 private static void sharedVaribleTest4() throws InterruptedException
31 {
32     MyRunner3 runner = new MyRunner3();
33     Thread thread1 = new Thread(runner);
34     Thread thread2 = new Thread(runner);
35     thread1.setDaemon(true);
36     thread2.setDaemon(true);
37     thread1.start();
38     thread2.start();
39     thread1.join();
40     thread2.join();
41 }

  綜上三種方案,第一種方案會降低多線程執行的效率,因此,我們推薦使用第二種或者第三種方案。

  控制執行步驟

  說到執行步驟,我們可以使用synchronized關鍵字來解決它。

執行步驟問題解決方案
 1 class MySyncRunner implements Runnable
 2 {
 3     public void run() {
 4         synchronized(this)
 5         {
 6             System.out.println(Thread.currentThread().getName() + " Start.");
 7             for(int i = 1; i <= 5; i++)
 8             {
 9                 System.out.println(Thread.currentThread().getName() + " Running step " + i);
10                 try
11                 {
12                     Thread.sleep(50);
13                 }
14                 catch(InterruptedException ex)
15                 {
16                     ex.printStackTrace();
17                 }
18             }
19             System.out.println(Thread.currentThread().getName() + " End.");
20         }
21     }
22 }
23 
24 
25 private static void syncTest2() throws InterruptedException
26 {
27     MySyncRunner runner = new MySyncRunner();
28     Thread thread1 = new Thread(runner);
29     Thread thread2 = new Thread(runner);
30     thread1.setDaemon(true);
31     thread2.setDaemon(true);
32     thread1.start();
33     thread2.start();
34     thread1.join();
35     thread2.join();
36 }

  在線程同步的話題上,synchronized是一個非常重要的關鍵字。它的原理和數據庫中事務鎖的原理類似。我們在使用過程中,應該盡量縮減synchronized覆蓋的范圍,原因有二:1)被它覆蓋的范圍是串行的,效率低;2)容易產生死鎖。我們來看下面的示例:

synchronized示例
 1 private static void syncTest3() throws InterruptedException
 2 {
 3     final List<Integer> list = new ArrayList<Integer>();
 4     
 5     Thread thread1 = new Thread()
 6     {
 7         public void run()
 8         {
 9             System.out.println(Thread.currentThread().getName() + " Start.");
10             Random r = new Random(100);
11             synchronized(list)
12             {
13                 for (int i = 0; i < 5; i++)
14                 {
15                     list.add(new Integer(r.nextInt()));
16                 }
17                 System.out.println("The size of list is " + list.size());
18             }
19             try
20             {
21                 Thread.sleep(500);
22             }
23             catch(InterruptedException ex)
24             {
25                 ex.printStackTrace();
26             }
27             System.out.println(Thread.currentThread().getName() + " End.");
28         }
29     };
30     
31     Thread thread2 = new Thread()
32     {
33         public void run()
34         {
35             System.out.println(Thread.currentThread().getName() + " Start.");
36             Random r = new Random(100);
37             synchronized(list)
38             {
39                 for (int i = 0; i < 5; i++)
40                 {
41                     list.add(new Integer(r.nextInt()));
42                 }
43                 System.out.println("The size of list is " + list.size());
44             }
45             try
46             {
47                 Thread.sleep(500);
48             }
49             catch(InterruptedException ex)
50             {
51                 ex.printStackTrace();
52             }
53             System.out.println(Thread.currentThread().getName() + " End.");
54         }
55     };
56     
57     thread1.start();
58     thread2.start();
59     thread1.join();
60     thread2.join();
61 }

  我們應該把需要同步的內容集中在一起,盡量不包含其他不相關的、消耗大量資源的操作,示例中線程休眠的操作顯然不應該包括在里面。

  構造線程池

  我們在Java回顧之網絡通信中,已經構建了一個Socket連接池,這里我們在此基礎上,構建一個線程池,完成基本的啟動、休眠、喚醒、停止操作。

  基本思路還是以數組的形式保持一系列線程,通過Socket通信,客戶端向服務器端發送命令,當服務器端接收到命令后,根據收到的命令對線程數組中的線程進行操作。

  Socket客戶端的代碼保持不變,依然采用構建Socket連接池時的代碼,我們主要針對服務器端進行改造。

  首先,我們需要定義一個線程對象,它用來執行我們的業務操作,這里簡化起見,只讓線程進行休眠。

定義線程對象
 1 enum ThreadStatus
 2 {
 3     Initial,
 4     Running,
 5     Sleeping,
 6     Stopped
 7 }
 8 
 9 enum ThreadTask
10 {
11     Start,
12     Stop,
13     Sleep,
14     Wakeup
15 }
16 
17 
18 class MyThread extends Thread
19 {
20     public ThreadStatus status = ThreadStatus.Initial;
21     public ThreadTask task;
22     public void run()
23     {
24         status = ThreadStatus.Running;
25         while(true)
26         {
27             try {
28                 Thread.sleep(3000);
29                 if (status == ThreadStatus.Sleeping)
30                 {
31                     System.out.println(Thread.currentThread().getName() + " 進入休眠狀態。");
32                     this.wait();
33                 }
34             } catch (InterruptedException e) {
35                 System.out.println(Thread.currentThread().getName() + " 運行過程中出現錯誤。");
36                 status = ThreadStatus.Stopped;
37             }
38         }
39     }
40 }

  然后,我們需要定義一個線程管理器,它用來對線程池中的線程進行管理,代碼如下:

定義線程池管理對象
 1 class MyThreadManager
 2 {
 3     public static void manageThread(MyThread[] threads, ThreadTask task)
 4     {
 5         for (int i = 0; i < threads.length; i++)
 6         {
 7             synchronized(threads[i])
 8             {
 9                 manageThread(threads[i], task);
10             }
11         }
12         System.out.println(getThreadStatus(threads));
13     }
14     
15     public static void manageThread(MyThread thread, ThreadTask task)
16     {
17         if (task == ThreadTask.Start)
18         {
19             if (thread.status == ThreadStatus.Running)
20             {
21                 return;
22             }
23             if (thread.status == ThreadStatus.Stopped)
24             {
25                 thread = new MyThread();
26             }
27             thread.status = ThreadStatus.Running;
28             thread.start();
29             
30         }
31         else if (task == ThreadTask.Stop)
32         {
33             if (thread.status != ThreadStatus.Stopped)
34             {
35                 thread.interrupt();
36                 thread.status = ThreadStatus.Stopped;
37             }
38         }
39         else if (task == ThreadTask.Sleep)
40         {
41             thread.status = ThreadStatus.Sleeping;
42         }
43         else if (task == ThreadTask.Wakeup)
44         {
45             thread.notify();
46             thread.status = ThreadStatus.Running;
47         }
48     }
49     
50     public static String getThreadStatus(MyThread[] threads)
51     {
52         StringBuffer sb = new StringBuffer();
53         for (int i = 0; i < threads.length; i++)
54         {
55             sb.append(threads[i].getName() + "的狀態:" + threads[i].status).append("\r\n");
56         }
57         return sb.toString();
58     }
59 }

  最后,是我們的服務器端,它不斷接受客戶端的請求,每收到一個連接請求,服務器端會新開一個線程,來處理后續客戶端發來的各種操作指令。

定義服務器端線程池對象
 1 public class MyThreadPool {
 2 
 3     public static void main(String[] args) throws IOException
 4     {
 5         MyThreadPool pool = new MyThreadPool(5);
 6     }
 7     
 8     private int threadCount;
 9     private MyThread[] threads = null;
10     
11     
12     public MyThreadPool(int count) throws IOException
13     {
14         this.threadCount = count;
15         threads = new MyThread[count];
16         for (int i = 0; i < threads.length; i++)
17         {
18             threads[i] = new MyThread();
19             threads[i].start();
20         }
21         Init();
22     }
23     
24     private void Init() throws IOException
25     {
26         ServerSocket serverSocket = new ServerSocket(5678);
27         while(true)
28         {
29             final Socket socket = serverSocket.accept();
30             Thread thread = new Thread()
31             {
32                 public void run()
33                 {
34                     try
35                     {
36                         System.out.println("檢測到一個新的Socket連接。");
37                         BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
38                         PrintStream ps = new PrintStream(socket.getOutputStream());
39                         String line = null;
40                         while((line = br.readLine()) != null)
41                         {
42                             System.out.println(line);
43                             if (line.equals("Count"))
44                             {
45                                 System.out.println("線程池中有5個線程");
46                             }
47                             else if (line.equals("Status"))
48                             {
49                                 String status = MyThreadManager.getThreadStatus(threads);
50                                 System.out.println(status);
51                             }
52                             else if (line.equals("StartAll"))
53                             {
54                                 MyThreadManager.manageThread(threads, ThreadTask.Start);
55                             }
56                             else if (line.equals("StopAll"))
57                             {
58                                 MyThreadManager.manageThread(threads, ThreadTask.Stop);
59                             }
60                             else if (line.equals("SleepAll"))
61                             {
62                                 MyThreadManager.manageThread(threads, ThreadTask.Sleep);
63                             }
64                             else if (line.equals("WakeupAll"))
65                             {
66                                 MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
67                             }
68                             else if (line.equals("End"))
69                             {
70                                 break;
71                             }
72                             else
73                             {
74                                 System.out.println("Command:" + line);
75                             }
76                             ps.println("OK");
77                             ps.flush();
78                         }
79                     }
80                     catch(Exception ex)
81                     {
82                         ex.printStackTrace();
83                     }
84                 }
85             };
86             thread.start();
87         }
88     }
89 }

  探索JDK中的concurrent工具包

  為了簡化開發人員在進行多線程開發時的工作量,並減少程序中的bug,JDK提供了一套concurrent工具包,我們可以用它來方便的開發多線程程序。

  線程池  

  我們在上面實現了一個非常“簡陋”的線程池,concurrent工具包中也提供了線程池,而且使用非常方便。

  concurrent工具包中的線程池分為3類:ScheduledThreadPool、FixedThreadPool和CachedThreadPool。

  首先我們來定義一個Runnable的對象

定義Runnable對象
 1 class MyRunner implements Runnable
 2 {
 3     public void run() {
 4         System.out.println(Thread.currentThread().getName() + "運行開始");
 5         for(int i = 0; i < 1; i++)
 6         {
 7             try
 8             {
 9                 System.out.println(Thread.currentThread().getName() + "正在運行");
10                 Thread.sleep(200);
11             }
12             catch(Exception ex)
13             {
14                 ex.printStackTrace();
15             }
16         }
17         System.out.println(Thread.currentThread().getName() + "運行結束");
18     }
19 }

  可以看出,它的功能非常簡單,只是輸出了線程的執行過程。

  ScheduledThreadPool

  這和我們平時使用的ScheduledTask比較類似,或者說很像Timer,它可以使得一個線程在指定的一段時間內開始運行,並且在間隔另外一段時間后再次運行,直到線程池關閉。

  示例代碼如下:

ScheduledThreadPool示例
 1 private static void scheduledThreadPoolTest()
 2 {
 3     final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
 4     
 5     MyRunner runner = new MyRunner();
 6     
 7     final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
 8     final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
 9     
10     scheduler.schedule(new Runnable()
11     {
12         public void run()
13         {
14             handler1.cancel(true);
15             handler2.cancel(true);
16             scheduler.shutdown();
17         }
18     }, 30, TimeUnit.SECONDS
19     );
20 }

  FixedThreadPool

  這是一個指定容量的線程池,即我們可以指定在同一時間,線程池中最多有多個線程在運行,超出的線程,需要等線程池中有空閑線程時,才能有機會運行。

  來看下面的代碼:

FixedThreadPool示例
 1 private static void fixedThreadPoolTest()
 2 {
 3     ExecutorService exec = Executors.newFixedThreadPool(3);
 4     for(int i = 0; i < 5; i++)
 5     {
 6         MyRunner runner = new MyRunner();
 7         exec.execute(runner);
 8     }
 9     exec.shutdown();
10 }

  注意它的輸出結果:

pool-1-thread-1運行開始
pool-1-thread-1正在運行
pool-1-thread-2運行開始
pool-1-thread-2正在運行
pool-1-thread-3運行開始
pool-1-thread-3正在運行
pool-1-thread-1運行結束
pool-1-thread-1運行開始
pool-1-thread-1正在運行
pool-1-thread-2運行結束
pool-1-thread-2運行開始
pool-1-thread-2正在運行
pool-1-thread-3運行結束
pool-1-thread-1運行結束
pool-1-thread-2運行結束

  可以看到從始至終,最多有3個線程在同時運行。

  CachedThreadPool

  這是另外一種線程池,它不需要指定容量,只要有需要,它就會創建新的線程。

  它的使用方式和FixedThreadPool非常像,來看下面的代碼:

CachedThreadPool示例
 1 private static void cachedThreadPoolTest()
 2 {
 3     ExecutorService exec = Executors.newCachedThreadPool();
 4     for(int i = 0; i < 5; i++)
 5     {
 6         MyRunner runner = new MyRunner();
 7         exec.execute(runner);
 8     }
 9     exec.shutdown();
10 }

  它的執行結果如下:

pool-1-thread-1運行開始
pool-1-thread-1正在運行
pool-1-thread-2運行開始
pool-1-thread-2正在運行
pool-1-thread-3運行開始
pool-1-thread-3正在運行
pool-1-thread-4運行開始
pool-1-thread-4正在運行
pool-1-thread-5運行開始
pool-1-thread-5正在運行
pool-1-thread-1運行結束
pool-1-thread-2運行結束
pool-1-thread-3運行結束
pool-1-thread-4運行結束
pool-1-thread-5運行結束

  可以看到,它創建了5個線程。

  處理線程返回值

  在有些情況下,我們需要使用線程的返回值,在上述的所有代碼中,線程這是執行了某些操作,沒有任何返回值。

  如何做到這一點呢?我們可以使用JDK中的Callable<T>和CompletionService<T>,前者返回單個線程的結果,后者返回一組線程的結果。

  返回單個線程的結果

  還是直接看代碼吧:

Callable示例
 1 private static void callableTest() throws InterruptedException, ExecutionException
 2 {
 3     ExecutorService exec = Executors.newFixedThreadPool(1);
 4     Callable<String> call = new Callable<String>()
 5     {
 6         public String call()
 7         {
 8             return "Hello World.";
 9         }
10     };
11     Future<String> result = exec.submit(call);
12     System.out.println("線程的返回值是" + result.get());
13     exec.shutdown();
14 }

  執行結果如下:

線程的返回值是Hello World.

  返回線程池中每個線程的結果

  這里需要使用CompletionService<T>,代碼如下:

CompletionService示例
 1 private static void completionServiceTest() throws InterruptedException, ExecutionException
 2 {
 3     ExecutorService exec = Executors.newFixedThreadPool(10);
 4     CompletionService<String> service = new ExecutorCompletionService<String>(exec);
 5     for (int i = 0; i < 10; i++)
 6     {
 7         Callable<String> call = new Callable<String>()
 8         {
 9             public String call() throws InterruptedException
10             {
11                 return Thread.currentThread().getName();
12             }
13         };
14         service.submit(call);
15     }
16     
17     Thread.sleep(1000);
18     for(int i = 0; i < 10; i++)
19     {
20         Future<String> result = service.take();
21         System.out.println("線程的返回值是" + result.get());
22     }
23     exec.shutdown();
24 }

  執行結果如下:

線程的返回值是pool-2-thread-1
線程的返回值是pool-2-thread-2
線程的返回值是pool-2-thread-3
線程的返回值是pool-2-thread-5
線程的返回值是pool-2-thread-4
線程的返回值是pool-2-thread-6
線程的返回值是pool-2-thread-8
線程的返回值是pool-2-thread-7
線程的返回值是pool-2-thread-9
線程的返回值是pool-2-thread-10

  實現生產者-消費者模型

  對於生產者-消費者模型來說,我們應該都不會陌生,通常我們都會使用某種數據結構來實現它。在concurrent工具包中,我們可以使用BlockingQueue來實現生產者-消費者模型,如下:

BlockingQueue示例
 1 public class BlockingQueueSample {
 2 
 3     public static void main(String[] args)
 4     {
 5         blockingQueueTest();
 6     }
 7     
 8     private static void blockingQueueTest()
 9     {
10         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
11         final int maxSleepTimeForSetter = 10;
12         final int maxSleepTimerForGetter = 10;
13         
14         Runnable setter = new Runnable()
15         {
16             public void run()
17             {
18                 Random r = new Random();
19                 while(true)
20                 {
21                     int value = r.nextInt(100);
22                     try
23                     {
24                         queue.put(new Integer(value));
25                         System.out.println(Thread.currentThread().getName() + "---向隊列中插入值" + value);
26                         Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
27                     }
28                     catch(Exception ex)
29                     {
30                         ex.printStackTrace();
31                     }
32                 }
33             }
34         };
35         
36         Runnable getter = new Runnable()
37         {
38             public void run()
39             {
40                 Random r = new Random();
41                 while(true)
42                 {
43                     try
44                     {
45                         if (queue.size() == 0)
46                         {
47                             System.out.println(Thread.currentThread().getName() + "---隊列為空");
48                         }
49                         else
50                         {
51                             int value = queue.take().intValue();
52                             System.out.println(Thread.currentThread().getName() + "---從隊列中獲取值" + value);
53                         }
54                         Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
55                     }
56                     catch(Exception ex)
57                     {
58                         ex.printStackTrace();
59                     }
60                 }
61             }
62         };
63         
64         ExecutorService exec = Executors.newFixedThreadPool(2);
65         exec.execute(setter);
66         exec.execute(getter);
67     }
68 }

  我們定義了兩個線程,一個線程向Queue中添加數據,一個線程從Queue中取數據。我們可以通過控制maxSleepTimeForSetter和maxSleepTimerForGetter的值,來使得程序得出不同的結果。

  可能的執行結果如下:

pool-1-thread-1---向隊列中插入值88
pool-1-thread-2---從隊列中獲取值88
pool-1-thread-1---向隊列中插入值75
pool-1-thread-2---從隊列中獲取值75
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-1---向隊列中插入值50
pool-1-thread-2---從隊列中獲取值50
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-1---向隊列中插入值51
pool-1-thread-1---向隊列中插入值92
pool-1-thread-2---從隊列中獲取值51
pool-1-thread-2---從隊列中獲取值92

  因為Queue中的值和Thread的休眠時間都是隨機的,所以執行結果也不是固定的。

  使用信號量來控制線程

  JDK提供了Semaphore來實現“信號量”的功能,它提供了兩個方法分別用於獲取和釋放信號量:acquire和release,示例代碼如下:

SemaPhore示例
 1 private static void semaphoreTest()
 2 {
 3     ExecutorService exec = Executors.newFixedThreadPool(10);
 4     final Semaphore semp = new Semaphore(2);
 5     
 6     for (int i = 0; i < 10; i++)
 7     {
 8         Runnable runner = new Runnable()
 9         {
10             public void run()
11             {
12                 try
13                 {
14                     semp.acquire();
15                     System.out.println(new Date() + " " + Thread.currentThread().getName() + "正在執行。");
16                     Thread.sleep(5000);
17                     semp.release();
18                 }
19                 catch(Exception ex)
20                 {
21                     ex.printStackTrace();
22                 }
23             }
24         };
25         exec.execute(runner);
26     }
27     
28     exec.shutdown();
29 }

  執行結果如下:

Tue May 07 11:22:11 CST 2013 pool-1-thread-1正在執行。
Tue May 07 11:22:11 CST 2013 pool-1-thread-2正在執行。
Tue May 07 11:22:17 CST 2013 pool-1-thread-3正在執行。
Tue May 07 11:22:17 CST 2013 pool-1-thread-4正在執行。
Tue May 07 11:22:22 CST 2013 pool-1-thread-5正在執行。
Tue May 07 11:22:22 CST 2013 pool-1-thread-6正在執行。
Tue May 07 11:22:27 CST 2013 pool-1-thread-7正在執行。
Tue May 07 11:22:27 CST 2013 pool-1-thread-8正在執行。
Tue May 07 11:22:32 CST 2013 pool-1-thread-10正在執行。
Tue May 07 11:22:32 CST 2013 pool-1-thread-9正在執行。

  可以看出,盡管線程池中創建了10個線程,但是同時運行的,只有2個線程。

  控制線程池中所有線程的執行步驟

  在前面,我們已經提到,可以用synchronized關鍵字來控制單個線程中的執行步驟,那么如果我們想要對線程池中的所有線程的執行步驟進行控制的話,應該如何實現呢?

  我們有兩種方式,一種是使用CyclicBarrier,一種是使用CountDownLatch。

  CyclicBarrier使用了類似於Object.wait的機制,它的構造函數中需要接收一個整型數字,用來說明它需要控制的線程數目,當在線程的run方法中調用它的await方法時,它會保證所有的線程都執行到這一步,才會繼續執行后面的步驟。

  示例代碼如下:

CyclicBarrier示例
 1 class MyRunner2 implements Runnable
 2 {
 3     private CyclicBarrier barrier = null;
 4     public MyRunner2(CyclicBarrier barrier)
 5     {
 6         this.barrier = barrier;
 7     }
 8     
 9     public void run() {
10         Random r = new Random();
11         try
12         {
13             for (int i = 0; i < 3; i++)
14             {
15                 Thread.sleep(r.nextInt(10) * 1000);
16                 System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--第" + (i + 1) + "次等待。");
17                 barrier.await();
18             }
19         }
20         catch(Exception ex)
21         {
22             ex.printStackTrace();
23         }
24     }
25     
26 }
27 
28 private static void cyclicBarrierTest()
29 {
30     CyclicBarrier barrier = new CyclicBarrier(3);
31     
32     ExecutorService exec = Executors.newFixedThreadPool(3);
33     for (int i = 0; i < 3; i++)
34     {
35         exec.execute(new MyRunner2(barrier));
36     }
37     exec.shutdown();
38 }

  執行結果如下:

Tue May 07 11:31:20 CST 2013--pool-1-thread-2--第1次等待。
Tue May 07 11:31:21 CST 2013--pool-1-thread-3--第1次等待。
Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第1次等待。
Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第2次等待。
Tue May 07 11:31:26 CST 2013--pool-1-thread-3--第2次等待。
Tue May 07 11:31:30 CST 2013--pool-1-thread-2--第2次等待。
Tue May 07 11:31:32 CST 2013--pool-1-thread-1--第3次等待。
Tue May 07 11:31:33 CST 2013--pool-1-thread-3--第3次等待。
Tue May 07 11:31:33 CST 2013--pool-1-thread-2--第3次等待。

  可以看出,thread-2到第1次等待點時,一直等到thread-1到達后才繼續執行。

  CountDownLatch則是采取類似”倒計時計數器”的機制來控制線程池中的線程,它有CountDown和Await兩個方法。示例代碼如下:

CountDownLatch示例
 1 private static void countdownLatchTest() throws InterruptedException
 2 {
 3     final CountDownLatch begin = new CountDownLatch(1);
 4     final CountDownLatch end = new CountDownLatch(5);
 5     ExecutorService exec = Executors.newFixedThreadPool(5);
 6     for (int i = 0; i < 5; i++)
 7     {
 8         Runnable runner = new Runnable()
 9         {
10             public void run()
11             {
12                 Random r = new Random();
13                 try
14                 {
15                     begin.await();
16                     System.out.println(Thread.currentThread().getName() + "運行開始");
17                     Thread.sleep(r.nextInt(10)*1000);
18                     System.out.println(Thread.currentThread().getName() + "運行結束");
19                 }
20                 catch(Exception ex)
21                 {
22                     ex.printStackTrace();
23                 }
24                 finally
25                 {
26                     end.countDown();
27                 }
28             }
29         };
30         exec.execute(runner);
31     }
32     begin.countDown();
33     end.await();
34     System.out.println(Thread.currentThread().getName() + "運行結束");
35     exec.shutdown();
36 }

  執行結果如下:

pool-1-thread-1運行開始
pool-1-thread-5運行開始
pool-1-thread-2運行開始
pool-1-thread-3運行開始
pool-1-thread-4運行開始
pool-1-thread-2運行結束
pool-1-thread-1運行結束
pool-1-thread-3運行結束
pool-1-thread-5運行結束
pool-1-thread-4運行結束
main運行結束

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM