生產者/消費者模式實現


  wait/notify最經典的案例就是"生產者/消費者"模式。但是此模式有一些需要注意的地方。

  生產者-消費者也有多種實現方式。

    (1)常見的就是synchronized結合wait+notify實現

    (2)用Lock類實現

    (3)使用BlockingQueue阻塞隊列實現

 一、 synchronized結合wait()、notify()實現生產者消費者模型

1. 一個簡單的生產者消費者(相當於一個生產者,兩個消費者)

  一個線程向集合中添加元素,兩個線程從集合中刪除元素,與之前等待/通知博客的最后一個案例類似。

package cn.qlq.thread.seven;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 利用之前的等待/通知實現一個簡單的生產者消費者
 * 
 * @author Administrator
 *
 */
public class Demo1 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo1.class);

    public static void main(String[] args) throws InterruptedException {
        final List<String> list = new ArrayList<String>();

        // 刪除元素線程1
        Thread sub1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                list.wait();
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub1");
        sub1.start();

        // 刪除元素線程2
        Thread sub2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                list.wait();
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub2");
        sub2.start();

        // 增加元素線程
        Thread.sleep(1 * 1000);
        Thread addThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {
                        synchronized (list) {
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", i, Thread.currentThread().getName());
                            list.notifyAll();
                            list.wait();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "B");
        addThread.start();
    }
}

 

結果:

18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->0,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->0, threadName->sub2
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->1,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->1, threadName->sub1
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->2,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->2, threadName->sub2
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->3,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->3, threadName->sub1
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->4,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->4, threadName->sub2

 

2. 多生產與多消費:操作值-假死 

  假死的現象其實就是進入waiting狀態。如果全部線程都進入waiting狀態,則程序就不再執行任何業務功能了,整個項目呈停止狀態。

  例如兩個生產者兩個消費者最后處於假死狀態的例子:

package cn.qlq.thread.seven;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/**
 * 多生產與多消費:操作值-假死( 多生產與多消費保證只有一個元素生產與消費)
 * 
 * @author Administrator
 *
 */
public class Demo2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);

    public static void main(String[] args) throws InterruptedException {
        final List<String> list = new ArrayList<String>();

        // 刪除元素線程1
        Thread sub1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub1");
        sub1.start();

        // 刪除元素線程2
        Thread sub2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub2");
        sub2.start();

        // 增加元素線程
        Thread.sleep(1 * 1000);
        Thread addThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", i, Thread.currentThread().getName());
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add1");
        addThread.start();

        // 增加元素線程
        Thread.sleep(1 * 1000);
        Thread addThread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", Thread.currentThread().getName());
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add2");
        addThread2.start();

        Thread.sleep(10 * 1000);
        LOGGER.info("sub1 state->{}", sub1.getState());
        LOGGER.info("sub2 state->{}", sub2.getState());
        LOGGER.info("add1 state->{}", addThread.getState());
        LOGGER.info("add2 state->{}", addThread2.getState());
    }
}

 

結果:

18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub1
18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub2
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 添加元素->0,threadName->add1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->add1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] list.remove ->0, threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub2
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub2
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 添加元素->add2,threadName->{}
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->add1
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->add1
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->add2
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub1 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub2 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add1 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add2 state->WAITING

  

解釋一下上面的線程假死的原因:

  由於喚醒線程調用的是notify()喚醒單個線程,所以有可能喚醒的是同類的線程,也就是生產者喚醒的是生產者,消費者喚醒的是消費者。導致最后四個線程都處於waiting狀態。

 

解決辦法:

  喚醒的時候采用notifyAll()喚醒所有的線程喚醒所有的線程,避免只喚醒同類線程。

 

3. 多生產與多消費:操作值

  為了避免上面的假死線下,喚醒的時候采用notifyAll()喚醒所有的線程喚醒所有的線程,避免只喚醒同類線程。

  喚醒的時候也喚醒異類,這樣就不會出現假死的狀態了,程序會一直運行下去。

package cn.qlq.thread.seven;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 多生產與多消費:操作值
 * 
 * @author Administrator
 *
 */
public class Demo3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class);

    public static void main(String[] args) throws InterruptedException {
        final List<String> list = new ArrayList<String>();

        // 刪除元素線程1
        Thread sub1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub1");
        sub1.start();

        // 刪除元素線程2
        Thread sub2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub2");
        sub2.start();

        // 增加元素線程
        Thread.sleep(1 * 1000);
        Thread addThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", i, Thread.currentThread().getName());
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add1");
        addThread.start();

        // 增加元素線程
        Thread.sleep(1 * 1000);
        Thread addThread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", Thread.currentThread().getName());
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add2");
        addThread2.start();

        Thread.sleep(10 * 1000);
        LOGGER.info("sub1 state->{}", sub1.getState());
        LOGGER.info("sub2 state->{}", sub2.getState());
        LOGGER.info("add1 state->{}", addThread.getState());
        LOGGER.info("add2 state->{}", addThread2.getState());
    }
}

 

 

4.  多個生產者一個消費者

  附上一個多生產與一個消費者的例子。大體的思路是對資源加鎖,在線程中通過while(true)不停的調用資源的add()方法和remove()方法,兩個方法都是同步方法,需要同步是對當前對象加鎖。每個對象都有一個阻塞隊列,一個就緒隊列。

  在學習完wait/notify之后再次重寫下面代碼就清晰多了。

package cn.qlq.thread.seven;

public class Demo4 {
    public static void main(String[] args) {
        MyResource myResource = new MyResource();
        // 多個生產者一個消費者
        MyConsumerThread myConsumerThread = new MyConsumerThread(myResource);
        // MyConsumerThread myConsumerThread1 = new
        // MyConsumerThread(myResource);
        // MyConsumerThread myConsumerThread2 = new
        // MyConsumerThread(myResource);
        MyProducerThread myProducerThread = new MyProducerThread(myResource);
        MyProducerThread myProducerThread1 = new MyProducerThread(myResource);
        MyProducerThread myProducerThread2 = new MyProducerThread(myResource);
        myProducerThread.start();
        myProducerThread1.start();
        myProducerThread2.start();
        myConsumerThread.start();
        // myConsumerThread1.start();
        // myConsumerThread2.start();
    }
}

/**
 * 資源類 一個加一個減(都有同步鎖)
 * 
 * @author: qlq
 * @date : 2018年6月15日上午11:38:37
 */
class MyResource {
    private int num;// 資源數量
    private int capacity = 10;// 資源容量

    /**
     * 同步方法增加資源 如果數量大於容量,線程進入阻塞狀態 否則通知消費者進行消費
     */
    public synchronized void add() {
        if (num >= capacity) {// 大於等於的話進入阻塞狀態
            try {
                wait();
                System.out.println(Thread.currentThread().getName() + "進入線程等待。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            num++;// 生產一件資源
            System.out.println(Thread.currentThread().getName() + "生產一件資源,目前剩余資源" + num + "件");
            notifyAll();// 通知消費者進行消費
        }
    }

    /**
     * 同步方法移除資源 如果num>0,消費資源,通知生產者進行生產 否則的話進入阻塞隊列
     */
    public synchronized void remove() {
        if (num > 0) {
            num--;
            System.out.println(Thread.currentThread().getName() + "消費一件資源,目前剩余" + num + "件");
            notifyAll();// 喚醒生產者進行生產
        } else {
            try {
                wait();
                System.out.println(Thread.currentThread().getName() + "進入線程等待。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 生產者類
 * 
 * @author: qlq
 * @date : 2018年6月16日上午11:08:05
 */
class MyProducerThread extends Thread {
    private MyResource resource;

    protected MyProducerThread(MyResource resource) {
        super();
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.add();
        }
    }
}

/**
 * 消費者線程
 * 
 * @author: qlq
 * @date : 2018年6月16日上午11:09:06
 */
class MyConsumerThread extends Thread {
    private MyResource resource;

    protected MyConsumerThread(MyResource resource) {
        super();
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}

  

總結:   

  每個對象都有一個阻塞隊列,一個就緒隊列。同步方法中才可以使用wait()/notify(),而且獲得哪個鎖才能調用對應鎖的對應方法,否則會報非法監視器異常。

 

二、 ReentrantLock結合Condition的await()、sianalAll()實現生產者消費者模式

1.單個生產線程,多個消費線程的例子

  生產者和消費者分別阻塞在兩個條件下面的例子

package cn.qlq.thread.eleven;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ReentrantLock結合Condition實現生產者-消費者模型(單生產-多消費)
 * 
 * @author Administrator
 *
 */
public class Demo4 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo4.class);
    private Lock lock = new ReentrantLock();
    private Condition producerCon = lock.newCondition();
    private Condition consumerCon = lock.newCondition();
    private volatile List<String> list = new ArrayList<String>();// 模擬是一個容器
    private volatile int capacity = 3;// 最多3個

    public void removeEle() {
        try {
            lock.lock();
            for (int i = 0; i < 5; i++) {
                while (list.size() == 0) {
                    consumerCon.await();// 阻塞消費者
                }
                LOGGER.info("threadName - > {} 消費元素 {}", Thread.currentThread().getName(), list.get(0));
                list.remove(0);
                producerCon.signal();// 喚醒生產者,由於是單個生產者,所以可以用signal()
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void addEle() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                // 超過容量阻塞生產者
                while (list.size() >= capacity) {
                    producerCon.await();
                }
                LOGGER.info("threadName - > {} 生產元素 {}", Thread.currentThread().getName(), i);
                list.add(i + "");
                consumerCon.signalAll();// 喚醒消費者,由於是多個消費者,所以可以用signalAll()
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final Demo4 demo4 = new Demo4();

        // 兩個消費者
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.removeEle();
            }
        }, "con1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.removeEle();
            }
        }, "con2").start();

        // 單個生產者
        Thread.sleep(1 * 1000);
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.addEle();
            }
        }, "pro1").start();
    }
}

結果:

13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 0
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 1
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 2
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消費元素 0
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消費元素 1
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消費元素 2
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 3
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 4
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 5
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消費元素 3
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消費元素 4
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消費元素 5
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 6
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 7
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 8
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消費元素 6
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消費元素 7
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消費元素 8
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生產元素 9
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消費元素 9

 

2.多個生產線程,多個消費線程的例子

package cn.qlq.thread.eleven;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ReentrantLock結合Condition實現生產者-消費者模型(單生產-多消費)
 * 
 * @author Administrator
 *
 */
public class Demo5 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo5.class);
    private Lock lock = new ReentrantLock();
    private Condition producerCon = lock.newCondition();
    private Condition consumerCon = lock.newCondition();
    private volatile List<String> list = new ArrayList<String>();// 模擬是一個容器
    private volatile int capacity = 3;// 最多十個

    public void removeEle() {
        try {
            lock.lock();
            for (int i = 0; i < 5; i++) {
                while (list.size() == 0) {
                    consumerCon.await();// 阻塞消費者
                }
                LOGGER.info("threadName - > {} 消費元素 {}", Thread.currentThread().getName(), list.get(0));
                list.remove(0);
                producerCon.signalAll();// 喚醒生產者,由於是單個生產者,所以可以用signal()
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void addEle() {
        try {
            lock.lock();
            for (int i = 0; i < 5; i++) {
                // 超過容量阻塞生產者
                while (list.size() >= capacity) {
                    producerCon.await();
                }
                LOGGER.info("threadName - > {} 生產元素 {}", Thread.currentThread().getName(), i);
                list.add(i + "");
                consumerCon.signalAll();// 喚醒消費者,由於是多個消費者,所以可以用signalAll()
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final Demo5 demo4 = new Demo5();

        // 兩個消費者
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.removeEle();
            }
        }, "con1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.removeEle();
            }
        }, "con2").start();

        // 單個生產者
        Thread.sleep(1 * 1000);
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.addEle();
            }
        }, "pro1").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.addEle();
            }
        }, "pro2").start();
    }
}

結果:

13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生產元素 0
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生產元素 1
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生產元素 2
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消費元素 0
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消費元素 1
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消費元素 2
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生產元素 3
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生產元素 4
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生產元素 0
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消費元素 3
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消費元素 4
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消費元素 0
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生產元素 1
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生產元素 2
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生產元素 3
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消費元素 1
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消費元素 2
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消費元素 3
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生產元素 4
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消費元素 4

 

三、 使用BlockingQueue阻塞隊列實現生產者消費者

  參考我的另一篇博客:https://www.cnblogs.com/qlqwjy/p/10175201.html 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM