Java線程間通信的幾種方式


文章目錄

一、使用同一個共享變量控制

二、PipedInputStream、PipedOutputStream

三、利用BlockingQueue

四、利用LockSupport

五、利用ThreadLocal

 

一、使用同一個共享變量控制

Synchronized、wait、notify

public class Demo1 {

    private final List<Integer> list =new ArrayList<>();

    public static void main(String[] args) {
        Demo1 demo =new Demo1();
        new Thread(()->{
            for (int i=0;i<10;i++){
                synchronized (demo.list){
                    if(demo.list.size()%2==1){
                        try {
                            demo.list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    demo.list.add(i);
                    System.out.print(Thread.currentThread().getName());
                    System.out.println(demo.list);
                    demo.list.notify();
                }
            }

        }).start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                synchronized (demo.list){
                    if(demo.list.size()%2==0){
                        try {
                            demo.list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    demo.list.add(i);
                    System.out.print(Thread.currentThread().getName());
                    System.out.println(demo.list);
                    demo.list.notify();
                }
            }
        }).start();
    }
}

Lock、Condition

public class Task {
    private final Lock lock = new ReentrantLock();

    private final Condition addConditon = lock.newCondition();
    private final Condition subConditon = lock.newCondition();

    private volatile int num = 0;
    private List<String> list = new ArrayList<>();

    public void add() {
        for (int i = 0; i < 10; i++) {
            lock.lock();

            try {
                if (list.size() == 10) {
                    addConditon.await();
                }
                num++;
                Thread.sleep(100);
                list.add("add " + num);
                System.out.println("The list size is " + list.size());
                System.out.println("The add thread is " + Thread.currentThread().getName());
                System.out.println("-------------");
                subConditon.signal();

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public void sub() {
        for (int i = 0; i < 10; i++) {
            lock.lock();

            try {

                if (list.size() == 0) {
                    subConditon.await();
                }
                num--;
                Thread.sleep(100);
                list.remove(0);
                System.out.println("The list size is " + list.size());
                System.out.println("The sub thread is " + Thread.currentThread().getName());
                System.out.println("-------------");
                addConditon.signal();

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }


    public static void main(String[] args) {
        Task task = new Task();
        new Thread(task::add).start();
        new Thread(task::sub).start();
    }
}

利用volatile

volatile修飾的變量值直接存在主內存里面,子線程對該變量的讀寫直接寫住內存,而不是像其它變量一樣在local thread里面產生一份copy。volatile能保證所修飾的變量對於多個線程可見性,即只要被修改,其它線程讀到的一定是最新的值。

public class Demo2 {
    private volatile List<Integer> list =new ArrayList<>();
    public static void main(String[] args) {
        Demo2 demo =new Demo2();
        new Thread(()->{
            for (int i=0;i<10;i++){
                    demo.list.add(i);
                    System.out.print(Thread.currentThread().getName());
                    System.out.println(demo.list);
            }

        }).start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                    demo.list.add(i);
                    System.out.print(Thread.currentThread().getName());
                    System.out.println(demo.list);
                }
        }).start();
    }
}

利用AtomicInteger

和volatile類似, 只是原子操作達到預估值非A即B

二、PipedInputStream、PipedOutputStream

這里用流在兩個線程間通信,但是Java中的Stream是單向的,所以在兩個線程中分別建了一個input和output

public class PipedDemo {

    private final PipedInputStream inputStream1;
    private final PipedOutputStream outputStream1;
    private final PipedInputStream inputStream2;
    private final PipedOutputStream outputStream2;

    public PipedDemo(){
        inputStream1 = new PipedInputStream();
        outputStream1 = new PipedOutputStream();
        inputStream2 = new PipedInputStream();
        outputStream2 = new PipedOutputStream();
        try {
            inputStream1.connect(outputStream2);
            inputStream2.connect(outputStream1);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    /**程序退出時,需要關閉stream*/
    public void shutdown() throws IOException {
        inputStream1.close();
        inputStream2.close();
        outputStream1.close();
        outputStream2.close();
    }


    public static void main(String[] args) throws IOException {
        PipedDemo demo =new PipedDemo();
        new Thread(()->{
            PipedInputStream in = demo.inputStream2;
            PipedOutputStream out = demo.outputStream2;

            for (int i = 0; i < 10; i++) {
                try {
                    byte[] inArr = new byte[2];
                    in.read(inArr);
                    System.out.print(Thread.currentThread().getName()+": "+i+" ");
                    System.out.println(new String(inArr));
                    while(true){
                        if("go".equals(new String(inArr)))
                            break;
                    }
                    out.write("ok".getBytes());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }).start();

        new Thread(()->{
            PipedInputStream in = demo.inputStream1;
            PipedOutputStream out = demo.outputStream1;

            for (int i = 0; i < 10; i++) {
                try {
                    out.write("go".getBytes());
                    byte[] inArr = new byte[2];
                    in.read(inArr);
                    System.out.print(Thread.currentThread().getName()+": "+i+" ");
                    System.out.println(new String(inArr));
                    while(true){
                        if("ok".equals(new String(inArr)))
                            break;
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }).start();
//        demo.shutdown();
    }
}

輸出:

Thread-0: 0 go
Thread-1: 0 ok
Thread-0: 1 go
Thread-1: 1 ok
Thread-0: 2 go
Thread-1: 2 ok
Thread-0: 3 go
Thread-1: 3 ok
Thread-0: 4 go
Thread-1: 4 ok
Thread-0: 5 go
Thread-1: 5 ok
Thread-0: 6 go
Thread-1: 6 ok
Thread-0: 7 go
Thread-1: 7 ok
Thread-0: 8 go
Thread-1: 8 ok
Thread-0: 9 go
Thread-1: 9 ok

三、利用BlockingQueue

BlockingQueue定義的常用方法如下:

  • add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常。
  • offer(Object):表示如果可能的話,將Object加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false。
  • put(Object):把Object加到BlockingQueue里,如果BlockingQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里有空間再繼續。
  • poll(time):獲取並刪除BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null。當不傳入time值時,立刻返回。
  • peek():立刻獲取BlockingQueue里排在首位的對象,但不從隊列里刪除,如果隊列為空,則返回null。
  • take():獲取並刪除BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的對象被加入為止。

BlockingQueue有四個具體的實現類:

  • ArrayBlockingQueue:數組阻塞隊列,規定大小,其構造函數必須帶一個int參數來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。
  • LinkedBlockingQueue:鏈阻塞隊列,大小不定,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO順序排序的。
  • PriorityBlockingQueue:類似於LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序。
  • SynchronousQueue:特殊的BlockingQueue,它的內部同時只能夠容納單個元素,對其的操作必須是放和取交替完成的。
  • DelayQueue:延遲隊列,注入其中的元素必須實現 java.util.concurrent.Delayed 接口

所有BlockingQueue的使用方式類似,以下例子一個線程寫入,一個線程讀取,操作的是同一個Queue:

public class BlockingQueueDemo {

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        //讀線程
        new Thread(() -> {
            int i =0;
            while (true) {
                try {
                    String item = queue.take();
                    System.out.print(Thread.currentThread().getName() + ": " + i + " ");
                    System.out.println(item);
                    i++;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        //寫線程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    String item = "go"+i;
                    System.out.print(Thread.currentThread().getName() + ": " + i + " ");
                    System.out.println(item);
                    queue.put(item);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

四、利用LockSupport

用LockSupport的unpark()和park()方法,實現線程間通信。

 

五、利用ThreadLocal

ThreadLocal,即線程變量,是一個以 ThreadLocal 對象為鍵、任意對象為值的存儲結構。這個結構被依附在線程上,也就是說一個線程可以根據一個 ThreadLocal 對象查詢到綁定在這個線程上的一個值。

可以通過 set(T) 方法來設置一個值,在當前線程下再通過 get() 方法獲取到原先設置的值。

public class ThreadLocalDemo {

    private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<>() {
        @Override
        protected Long initialValue() {
            return System.currentTimeMillis();
        }
    };

    public static final void begin() {
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }

    public static final long end() {
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadLocalDemo.begin();
        Thread.sleep(2000);
        System.out.println(ThreadLocalDemo.end());
    }
}
//輸出 2003

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM