多線程(六)線程間的通信和協作


  系統要實現某個全局功能必定要需要各個子模塊之間的協調和配合,就像一個團隊要完成某項任務的時候需要團隊各個成員之間密切配合一樣。而對於系統中的各個子線程來說,如果要完成一個系統功能,同樣需要各個線程的配合,這樣就少不了線程之間的通信與協作。常見的線程之間通信方式有如下幾種:

  1、wait和notify/notifyAll

   2、await和signal/signalAll

   3、sleep/yield/join

   4、CyclicBarrier 柵欄

   5、CountDownLatch 閉鎖

   6、Semaphore 信號量

 

一、wait和notify/notifyAll

在使用之前先明確 :

   wait和notify是Object的方法,任何一個對象都具有該方法。在使用的時候,首先需要設置一個全局鎖對象,通過對該鎖的釋放和持有來控制該線程的運行和等待。因此在調用wait和notify的時候,該線程必須要已經持有該鎖,然后才可調用,否則將會拋出IllegalMonitorStateException異常。
           確定要讓哪個線程等待?讓哪個線程等待就在哪個線程中調用鎖對象的wait方法。調用wait等待的是當前線程,而不是被調用線程,並不是theread.wait()就可以讓thread等待,而是讓當前線程(調用wait方法的線程,不是調用者)進行等待。盡量不要把線程對象當做全局鎖使用,以免混淆等待線程。
看一下使用方法:(代碼中省略了main方法,對sleep()和println()方法進行了封裝)

package thread.blogs.cooperation;

import scala.Console;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by PerkinsZhu on 2017/8/21 10:25.
 */
public class TestWaitAndNotify {
    public static void main(String[] args) {
        TestWaitAndNotify test = new TestWaitAndNotify();
        test.testWait();
    }

    Object obj = new Object();//創建一個全局變量,用來協調各個線程
    ThreadLocal<AtomicInteger> num = new ThreadLocal<AtomicInteger>();//設置一個線程wait和notify的觸發條件
    class MyRunner implements Runnable {
        @Override
        public void run() {
            num.set(new AtomicInteger(0));
            while (true) {
                Console.println(Thread.currentThread().getName());
                if (num.get().getAndIncrement() == 1) {
                    synchronized (obj) {//如果要想調用wait方法,則必須持有該對象。否則將會拋出IllegalMonitorStateException
                        try {
                            Console.println(Thread.currentThread().getName() + "掛起等待");
                            obj.wait();//同一個線程可以wait多次,多個線程也可以使用同一個obj調用wait
                            Console.println(Thread.currentThread().getName() + "喚醒!!!");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                sleep(1000);
            }
        }
    }

    private void testWait() {
        MyRunner runner = new MyRunner();
        new Thread(runner).start();
        new Thread(runner).start();

        AtomicInteger num03 = new AtomicInteger(0);
        Thread th03 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    synchronized (obj) {//調用notify/notifyAll和wait一樣,同樣需要持有該對象
                        if (num03.getAndIncrement() == 5) {
                            obj.notify();//喚醒最先一個掛在obj上面的線程.每次只喚醒一個。這里是按照等待的先后順序進行喚醒
                        }
                    }
                    sleep(1000);
                }
            }
        });
        th03.start();
    }

    private void sleep(int time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

   運行結果如下:

Thread-1
Thread-0
Thread-1
Thread-0
Thread-1掛起等待
Thread-0掛起等待
Thread-1喚醒!!!
Thread-1
Thread-1
Thread-1

 從執行結果只中可以看出,在執行兩次輸出之后,兩個線程被分別掛起等待。過一會之后線程1被成功喚醒。這里之所以喚醒的是Thread-1是因為Thread-1是第一個先掛起的,所以在notify()方法在喚起wait線程的時候也是公平的,依照wait的掛起順序來執行喚醒。

在使用wait的時候,同一個obj可以被多個線程調用obj.wait(),也可以被同一個線程執行多次obj.wait();

例如,修改try catch代碼代碼塊

                            Console.println(Thread.currentThread().getName() + "掛起等待");
                            obj.wait();//執行多次wait操作
                            obj.wait();
                            obj.wait();
                            Console.println(Thread.currentThread().getName() + "喚醒!!!");

然后只啟動一個線程

        new Thread(runner,"thread--01").start();
//        new Thread(runner,"thread--02").start();

 執行結果如下:

thread--01
thread--01
thread--01掛起等待

 線程一直停滯在此處,無法繼續執行,這是因為線程調用了三此wait,而如果要想成功喚醒線程,則同樣需要調用三次notify或者調用一次notifyAll()。這里就不再列出代碼。

wait方法有兩個重載方法:

public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException 
 
        

兩個方法都是wait指定時間之后,如果依舊沒有被其它線程喚醒或者被中斷則會自動停止wait。其中第二個方法指定了時間的單位。

public final void wait(long timeout,
                       int nanos)
               throws InterruptedException
Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object, or some other thread interrupts the current thread, or a certain amount of real time has elapsed.
This method is similar to the wait method of one argument, but it allows finer control over the amount of time to wait for a notification before giving up. The amount of real time, measured in nanoseconds, is given by:
       1000000*timeout+nanos
In all other respects, this method does the same thing as the method wait(long) of one argument. In particular, wait(0, 0) means the same thing as wait(0).
The current thread must own this object's monitor. The thread releases ownership of this monitor and waits until either of the following two conditions has occurred:
Another thread notifies threads waiting on this object's monitor to wake up either through a call to the notify method or the notifyAll method.
The timeout period, specified by timeout milliseconds plus nanos nanoseconds arguments, has elapsed.
The thread then waits until it can re-obtain ownership of the monitor and resumes execution.
As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:
           synchronized (obj) {
               while (<condition does not hold>)
                   obj.wait(timeout, nanos);
               ... // Perform action appropriate to condition
           }
       
This method should only be called by a thread that is the owner of this object's monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.

 

  注意這里的synchronized的目的不是加鎖控制線程的串行,而是為了持有鎖來調用wait和notify對象。
   在理解這線程調用obj.wait()的時候可以理解為"掛在obj對象上的線程",而對於線程調用obj.notify()可以理解為"喚起最后一個掛在obj上面的那個線程",而對於線程調用obj.notifyAll(),則可以理解為"喚起所有掛在obj對象上的線程"。obj對象在這里起的作用就是一個信息載體中介。各個線程通過這個中介進行通行協作,控制線程之間的暫停和執行。

二、await和signal/signalAll

  await和signal是Condition的兩個方法,其作用和wait和notify一樣,目的都是讓線程掛起等待,不同的是,這兩種方法是屬於Condition的兩個方法,而Condition對象是由ReentrantLock調用newCondition()方法得到的。Condition對象就相當於前面所說的中介,在線程中調用contiton.await()和condition.signal()可以分別使線程等待和喚醒。

如需要了解tryLock的使用可以看這里:多線程(五) java的線程鎖

 使用示例:

package thread.blogs.cooperation;

import scala.Console;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by PerkinsZhu on 2017/8/21 11:59.
 */
public class TestCondition {
    public static void main(String[] args) {
        TestCondition test = new TestCondition();
        test.testWait();
    }

    ReentrantLock lock = new ReentrantLock();
    ThreadLocal<AtomicInteger> num = new ThreadLocal<AtomicInteger>();
    Condition condition = lock.newCondition();

    private void testWait() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                num.set(new AtomicInteger(1));
                while (true) {
                    if (num.get().getAndIncrement() == 5) {
                        Console.println("signal---!!!");
                        try {
                            lock.lock();
                            condition.signal();
                        } finally {
                            lock.unlock();
                        }
                    }
                    Console.println("thread ---- 01");
                    sleep(1000);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                num.set(new AtomicInteger(1));
                while (true) {
                    if (num.get().getAndIncrement() == 2) {
                        try {
                           //lock.tryLock();
                            //lock.tryLock(5000, TimeUnit.MILLISECONDS);
                            lock.lock();//這里同樣要加鎖,否則會拋出IllegalMonitorStateException異常。注意的是這里不要使用synchronized進行加鎖,而是使用lock
                            condition.await();//注意這里不要調用wait!!!
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            lock.unlock();
                        }
                    }
                    Console.println("thread ---- 02");
                    sleep(1000);
                }
            }
        }).start();
    }
    private void sleep(int time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  在使用Condition的時候和Synchronized沒有太大的區別,只是調用的方法變為await和signal。需要注意的是這里加鎖不再使用synchronized()進行加鎖,而是使用lock和unlock進行加鎖。

 執行結果如下:

thread ---- 01
thread ---- 02
thread ---- 01
thread ---- 01
thread ---- 01
signal---!!!
thread ---- 01
thread ---- 02
thread ---- 02
thread ---- 01

  三、sleep/yield/join

對於sleep()方法應該很熟悉了,讓當前線程睡眠一段時間。期間不會釋放任何持有的鎖

public static native void sleep(long millis) throws InterruptedException;
 1     /**
 2      * Causes the currently executing thread to sleep (temporarily cease
 3      * execution) for the specified number of milliseconds, subject to
 4      * the precision and accuracy of system timers and schedulers. The thread
 5      * does not lose ownership of any monitors.
 6      *
 7      * @param  millis
 8      *         the length of time to sleep in milliseconds
 9      *
10      * @throws  IllegalArgumentException
11      *          if the value of {@code millis} is negative
12      *
13      * @throws  InterruptedException
14      *          if any thread has interrupted the current thread. The
15      *          <i>interrupted status</i> of the current thread is
16      *          cleared when this exception is thrown.
17      */

  對於yield()方法可能使用的情況少一下。其作用主要是讓當前線程從運行狀態轉變為就緒狀態,由線程調度重新選擇就緒狀態的線程分配CPU資源。至於最終會選取哪個線程分配CPU資源就由調度策略來決定了,有可能還是該線程,有可能換為其它線程。

 1   /**
 2      * A hint to the scheduler that the current thread is willing to yield
 3      * its current use of a processor. The scheduler is free to ignore this
 4      * hint.
 5      *
 6      * <p> Yield is a heuristic attempt to improve relative progression
 7      * between threads that would otherwise over-utilise a CPU. Its use
 8      * should be combined with detailed profiling and benchmarking to
 9      * ensure that it actually has the desired effect.
10      *
11      * <p> It is rarely appropriate to use this method. It may be useful
12      * for debugging or testing purposes, where it may help to reproduce
13      * bugs due to race conditions. It may also be useful when designing
14      * concurrency control constructs such as the ones in the
15      * {@link java.util.concurrent.locks} package.
16      */
17     public static native void yield();

  對於join方法,作用是暫停當前線程,等待被調用線程指向結束之后再繼續執行。

 1  /**
 2      * Waits for this thread to die.
 3      *
 4      * <p> An invocation of this method behaves in exactly the same
 5      * way as the invocation
 6      *
 7      * <blockquote>
 8      * {@linkplain #join(long) join}{@code (0)}
 9      * </blockquote>
10      *
11      * @throws  InterruptedException
12      *          if any thread has interrupted the current thread. The
13      *          <i>interrupted status</i> of the current thread is
14      *          cleared when this exception is thrown.
15      */
16     public final void join() throws InterruptedException;

  使用join的時候需要注意:

  1、調用join的時候,當前線程不會釋放掉鎖,如果調用線程也需要該鎖則就會導致死鎖!

  2、join方法不會啟動調用線程,所以,在調用join之前,該調用線程必須已經start啟動,否則不會達到想要的效果。

join的底層實際是就是使用了一個自旋等待機制,判斷調用線程是否死亡,如果沒有則一直讓當前線程wait。可以看一下底層實現源碼:

 1 public final synchronized void join(long millis) throws InterruptedException {
 2         long base = System.currentTimeMillis();
 3         long now = 0;
 4         if (millis < 0) {
 5             throw new IllegalArgumentException("timeout value is negative");
 6         }
 7         if (millis == 0) {
 8             while (isAlive()) {//如果調用者依舊沒有結束,讓當前線程進行等待
 9                 wait(0);//注意這里的wait是等待的當前線程,而不是調用者線程
10             }
11         } else {
12             while (isAlive()) {
13                 long delay = millis - now;
14                 if (delay <= 0) {
15                     break;
16                 }
17                 wait(delay);//指定等待的時間
18                 now = System.currentTimeMillis() - base;
19             }
20         }
21     }

  四、CyclicBarrier柵欄

  CyclicBarrier字面理解為線程屏障,當指定數量的線程執行到指定位置的時候,才能觸發后續動作的進行。其最終目的是讓所有線程同時開始后續的工作。

例如:三個員工來公司開會,由於三人住的地方與公司距離不同,所以到會議室的時間也不同。而會議開始必須等待三者都到達會議室之后才能進行。

代碼如下:

 1 package thread.blogs.cooperation;
 2 
 3 import scala.Console;
 4 
 5 import java.util.concurrent.CyclicBarrier;
 6 
 7 /**
 8  * Created by PerkinsZhu on 2017/8/30 10:32.
 9  */
10 public class TestCyclicBarrier {
11     public static void main(String[] args) {
12         testCyclicBarrier();
13     }
14 
15     private static void testCyclicBarrier() {
16         /**
17          * 注意這里等待的是三個線程。這就相當於一個線程計數器,當指定個數的線程執行 barrier.await();方法之后,才會執行后續的代碼,否則每個線程都會一直進行等待。
18          * 如果把3修改為4,則將永遠等待下去,不會開始會議。
19          * 如果把3修改為2,則小張到達之后就會提前開始會議,不會繼續等待小王。
20          */
21         CyclicBarrier barrier = new CyclicBarrier(3);
22 
23         Thread 小李 = new Thread(new MyRunner(barrier, "小李", 2000));
24         小李.start();
25         Thread 小張 = new Thread(new MyRunner(barrier, "小張", 4000));
26         小張.start();
27         Thread 小王 = new Thread(new MyRunner(barrier, "小王", 5000));
28         小王.start();
29     }
30 
31     static class MyRunner implements Runnable {
32         CyclicBarrier barrier;
33         String name;
34         int time;
35 
36         public MyRunner(CyclicBarrier barrier, String name, int time) {
37             this.barrier = barrier;
38             this.name = name;
39             this.time = time;
40         }
41 
42         @Override
43         public void run() {
44             Console.println(name + " 開始出發去公司。");
45             sleep(time);
46             Console.println(name + " 終於到會議室!!!");
47             try {
48                 barrier.await();
49             } catch (Exception e) {
50                 e.printStackTrace();
51             }
52             startMeeting(name);
53         }
54     }
55 
56     private static void startMeeting(String name) {
57         Console.println(name + "說:人齊了。會議開始!!");
58     }
59 
60     private static void sleep(int time) {
61         try {
62             Thread.sleep(time);
63         } catch (InterruptedException e) {
64             e.printStackTrace();
65         }
66     }
67 }

  運行結果:

1 小李 開始出發去公司。
2 小王 開始出發去公司。
3 小張 開始出發去公司。
4 小李 終於到會議室!!!
5 小張 終於到會議室!!!
6 小王 終於到會議室!!!
7 小王說:人齊了。會議開始!!
8 小李說:人齊了。會議開始!!
9 小張說:人齊了。會議開始!!

  在使用CyclicBarrier的時候,提供了一個重載的構造器。

public CyclicBarrier(int parties, Runnable barrierAction) {}
barrierAction會在一組線程中的最后一個線程到達之后(但在釋放所有線程之前)觸發。
例如修改上面的代碼21行為:
        CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                Console.println("======");
            }
        });

   運行結果:

小張 開始出發去公司。
小李 開始出發去公司。
小王 開始出發去公司。
小李 終於到會議室!!!
小張 終於到會議室!!!
小王 終於到會議室!!!
======
小王說:人齊了。會議開始!!
小李說:人齊了。會議開始!!
小張說:人齊了。會議開始!!

 五、CountDownLatch閉鎖

與CycliBarrier不同的是CountDownLatch是某一個線程等待其他線程執行到某一位置之后,該線程(調用countDownLatch.await();等待的線程)才會繼續后續工作。而CycliBarrier是各個線程執行到某位置之后,然后所有線程一齊開始后續的工作。相同的是兩者都屬於線程計數器。

使用示例如下: boss等待所有員工來開會,當所有人員都到齊之后,boss宣布開始會議!!!

 1 package thread.blogs.cooperation;
 2 
 3 import scala.Console;
 4 
 5 import java.util.concurrent.CountDownLatch;
 6 
 7 /**
 8  * Created by PerkinsZhu on 2017/8/30 10:32.
 9  */
10 public class TestCyclicBarrier {
11     public static void main(String[] args) {
12         testCyclicBarrier();
13     }
14 
15     private static void testCyclicBarrier() {
16 
17         CountDownLatch countDownLatch = new CountDownLatch(3);//注意這里的參數指定了等待的線程數量
18 
19         new Thread(new MyRunner(countDownLatch, "小李", 2000)).start();
20         new Thread(new MyRunner(countDownLatch, "小張", 4000)).start();
21         new Thread(new MyRunner(countDownLatch, "小王", 5000)).start();
22 
23         try {
24             Console.println("等待員工到來開會。。。。。。。");
25             countDownLatch.await();//注意這里是await。主線程將會一直等待在這里,當所有線程都執行 countDownLatch.countDown();之后當前線程才會繼續執行
26             startMeeting("Boss");
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         }
30     }
31 
32     static class MyRunner implements Runnable {
33         CountDownLatch countDownLatch;
34         String name;
35         int time;
36 
37         public MyRunner(CountDownLatch countDownLatch, String name, int time) {
38             this.countDownLatch = countDownLatch;
39             this.name = name;
40             this.time = time;
41         }
42 
43         @Override
44         public void run() {
45             Console.println(name + " 開始出發去公司。");
46             sleep(time);
47             Console.println(name + " 終於到會議室!!!");
48             countDownLatch.countDown();
         Console.println(name + " 准備好了!!");
49 } 50 } 51 52 private static void startMeeting(String name) { 53 Console.println(name + "說:人齊了。會議開始!!"); 54 } 55 56 private static void sleep(int time) { 57 try { 58 Thread.sleep(time); 59 } catch (InterruptedException e) { 60 e.printStackTrace(); 61 } 62 } 63 }

  執行結果如下:

等待員工到來開會。。。。。。。
小王 開始出發去公司。
小張 開始出發去公司。
小李 開始出發去公司。
小李 終於到會議室!!!
小李 准備好了!!
小張 終於到會議室!!!
小張 准備好了!!
小王 終於到會議室!!!
小王 准備好了!!
Boss說:人齊了。會議開始!!

  注意區分是某一個線程等待其他線程還是所有線程在達到某一條件之后一起執行!!!

 6、Semaphore 信號量

  Semaphore在線程協作方面主要用於控制同時訪問臨界區資源的線程個數。信號量是屬於操作系統層面的概念,jdk提供了操作接口。

 使用示例如下:

 1 package thread.blogs.cooperation;
 2 
 3 import scala.Console;
 4 
 5 import java.util.concurrent.ExecutorService;
 6 import java.util.concurrent.Executors;
 7 import java.util.concurrent.Semaphore;
 8 
 9 /**
10  * Created by PerkinsZhu on 2017/8/30 11:43.
11  */
12 public class TestSemaphore {
13     public static void main(String[] args) {
14         testSemaphore();
15     }
16 
17     private static void testSemaphore() {
18         Semaphore semaphore = new Semaphore(2, true);//指定同時訪問臨界區資源的線程數量。第二個參數指定以公平方式訪問臨界區資源
19         ExecutorService excutorService = Executors.newFixedThreadPool(10);
20         for (int i = 0; i < 6; i++) {//啟動10個線程請求資源
21             excutorService.execute(new MyRunner(semaphore));
22             sleep(0);//逐個啟動線程
23         }
24         excutorService.shutdown();
25     }
26 
27     static class MyRunner implements Runnable {
28         Semaphore semaphore;
29 
30         public MyRunner(Semaphore semaphore) {
31             this.semaphore = semaphore;
32         }
33 
34         @Override
35         public void run() {
36             String name = Thread.currentThread().getName();
37             try {
38                 Console.println(name + "  ------請求資源!!");
39                 //semaphore.acquire(2);//設置請求資源的數量。必須有足夠數量的資源才可進去臨界區。不過釋放的時候也要一起釋放,請求幾個就要調用幾次release()
40                 semaphore.acquire();//請求獲取資源,如果有空閑資源則會立即獲取,進入臨界區,否則將會等待,一直等待到獲取到臨界區資源
41                 Console.println(name + "  ======獲取資源!!");
42                 sleep(1000);
43                 //semaphore.release();
44                 semaphore.release();//釋放資源
45                 Console.println(name + "  ******釋放資源!!");
46 
47             } catch (InterruptedException e) {
48                 e.printStackTrace();
49             }
50         }
51     }
52 
53     private static void sleep(int time) {
54         try {
55             Thread.sleep(time);
56         } catch (InterruptedException e) {
57             e.printStackTrace();
58         }
59     }
60 }

  執行結果如下:

pool-1-thread-1  ------請求資源!!
pool-1-thread-2  ------請求資源!!
pool-1-thread-6  ------請求資源!!
pool-1-thread-5  ------請求資源!!
pool-1-thread-3  ------請求資源!!
pool-1-thread-4  ------請求資源!!
pool-1-thread-2  ======獲取資源!!
pool-1-thread-1  ======獲取資源!!
pool-1-thread-1  ******釋放資源!!
pool-1-thread-6  ======獲取資源!!
pool-1-thread-5  ======獲取資源!!
pool-1-thread-2  ******釋放資源!!
pool-1-thread-6  ******釋放資源!!
pool-1-thread-4  ======獲取資源!!
pool-1-thread-3  ======獲取資源!!
pool-1-thread-5  ******釋放資源!!
pool-1-thread-4  ******釋放資源!!
pool-1-thread-3  ******釋放資源!!

 根據結果可以看出只有當有線程釋放資源之后,才會有新的線程獲取到資源。即控制了同一時間訪問臨界區資源的線程數量。當Semaphore(1)設置為1的時候,此時可以當做鎖來使用。多線程(五) java的線程鎖

 

 線程之間的通信和協作方式大概就以上六種,要熟悉每種工具的使用場景和方法特性,通過靈活的組合各個工具來靈活控制各個線程的工作。在決定使用哪種工具之前必須要明確自己的目的是什么,要實現什么樣的機制,這樣才能確定選擇哪種工具協調各個線程。

 

=========================================

原文鏈接:多線程(六)線程間的通信和協作轉載請注明出處!

=========================================

---end


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM