【1】生產者-消費者模型的三種實現方式


(手寫生產者消費者模型,寫BlockingQueue較簡便 )

1、背景                                                                    

生產者生產數據到緩沖區中,消費者從緩沖區中取數據。

如果緩沖區已經滿了,則生產者線程阻塞;

如果緩沖區為空,那么消費者線程阻塞。

2、方式一:synchronized、wait和notify

定義Resouce資源類,類中定義資源池大小。資源類的add()和remove()方法是synchronized 的。生產者/消費者線程持有一個資源類Resouce的成員變量,Main方法中通過構造函數將Resouce類傳入,線程run方法中操作Resouce類的add,remove方法

 

package producerConsumer;
//wait 和 notify
public class ProducerConsumerWithWaitNofity {
    public static void main(String[] args) {
        Resource resource = new Resource();
        //生產者線程
        ProducerThread p1 = new ProducerThread(resource);
        ProducerThread p2 = new ProducerThread(resource);
        ProducerThread p3 = new ProducerThread(resource);
        //消費者線程
        ConsumerThread c1 = new ConsumerThread(resource);
        //ConsumerThread c2 = new ConsumerThread(resource);
        //ConsumerThread c3 = new ConsumerThread(resource);
    
        p1.start();
        p2.start();
        p3.start();
        c1.start();
        //c2.start();
        //c3.start();
    }
    
    
    
}
/**
 * 公共資源類
 * @author 
 *
 */
class Resource{//重要
    //當前資源數量
    private int num = 0;
    //資源池中允許存放的資源數目
    private int size = 10;

    /**
     * 從資源池中取走資源
     */
    public synchronized void remove(){
        if(num > 0){
            num--;
            System.out.println("消費者" + Thread.currentThread().getName() +
                    "消耗一件資源," + "當前線程池有" + num + "個");
            notifyAll();//通知生產者生產資源
        }else{
            try {
                //如果沒有資源,則消費者進入等待狀態
                wait();
                System.out.println("消費者" + Thread.currentThread().getName() + "線程進入等待狀態");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 向資源池中添加資源
     */
    public synchronized void add(){
        if(num < size){
            num++;
            System.out.println(Thread.currentThread().getName() + "生產一件資源,當前資源池有" 
            + num + "個");
            //通知等待的消費者
            notifyAll();
        }else{
            //如果當前資源池中有10件資源
            try{
                wait();//生產者進入等待狀態,並釋放鎖
                System.out.println(Thread.currentThread().getName()+"線程進入等待");
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}
/**
 * 消費者線程
 */
class ConsumerThread extends Thread{
    private Resource resource;
    public ConsumerThread(Resource resource){
        this.resource = resource;
    }
    @Override
    public void run() {
        while(true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}
/**
 * 生產者線程
 */
class ProducerThread extends Thread{
    private Resource resource;
    public ProducerThread(Resource resource){
        this.resource = resource;
    }
    @Override
    public void run() {
        //不斷地生產資源
        while(true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.add();
        }
    }
    
}
View Code

 

3、方式二:lock和condition的await、signalAll     

package producerConsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 使用Lock 和 Condition解決生產者消費者問題
 * @author tangzhijing
 *
 */
public class LockCondition {
        public static void main(String[] args) {
            Lock lock = new ReentrantLock();
            Condition producerCondition = lock.newCondition();
            Condition consumerCondition = lock.newCondition();
            Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);
            
            //生產者線程
            ProducerThread2 producer1 = new ProducerThread2(resource);
            
            //消費者線程
            ConsumerThread2 consumer1 = new ConsumerThread2(resource);
            ConsumerThread2 consumer2 = new ConsumerThread2(resource);
            ConsumerThread2 consumer3 = new ConsumerThread2(resource);
            
            producer1.start();
            consumer1.start();
            consumer2.start();
            consumer3.start();
        }
}
/**
 * 消費者線程
 */
class ConsumerThread2 extends Thread{
    private Resource2 resource;
    public ConsumerThread2(Resource2 resource){
        this.resource = resource;
        //setName("消費者");
    }
    public void run(){
        while(true){
            try {
                Thread.sleep((long) (1000 * Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}
/**
 * 生產者線程
 * @author tangzhijing
 *
 */
class ProducerThread2 extends Thread{
    private Resource2 resource;
    public ProducerThread2(Resource2 resource){
        this.resource = resource;
        setName("生產者");
    }
    public void run(){
        while(true){
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.add();
        }
    }
}
/**
 * 公共資源類
 * @author tangzhijing
 *
 */
class Resource2{
    private int num = 0;//當前資源數量
    private int size = 10;//資源池中允許存放的資源數目
    private Lock lock;
    private Condition producerCondition;
    private Condition consumerCondition;
    public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {
        this.lock = lock;
        this.producerCondition = producerCondition;
        this.consumerCondition = consumerCondition;
 
    }
    /**
     * 向資源池中添加資源
     */
    public void add(){
        lock.lock();
        try{
            if(num < size){
                num++;
                System.out.println(Thread.currentThread().getName() + 
                        "生產一件資源,當前資源池有" + num + "個");
                //喚醒等待的消費者
                consumerCondition.signalAll();
            }else{
                //讓生產者線程等待
                try {
                    producerCondition.await();
                    System.out.println(Thread.currentThread().getName() + "線程進入等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }finally{
            lock.unlock();
        }
    }
    /**
     * 從資源池中取走資源
     */
    public void remove(){
        lock.lock();
        try{
            if(num > 0){
                num--;
                System.out.println("消費者" + Thread.currentThread().getName() 
                        + "消耗一件資源," + "當前資源池有" + num + "個");
                producerCondition.signalAll();//喚醒等待的生產者
            }else{
                try {
                    consumerCondition.await();
                    System.out.println(Thread.currentThread().getName() + "線程進入等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }//讓消費者等待
            }
        }finally{
            lock.unlock();
        }
    }
    
}
View Code

 

4、方式三:BlockingQueue       

定義Resouce資源類,資源類持有一個BlockingQueue。生產者/消費者線程持有一個資源類Resouce的成員變量,Main方法中通過構造函數將Resouce類傳入,線程run方法中操作Resouce類的add,remove方法,add,remove調用Queue的put()take()

package producerConsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

//使用阻塞隊列BlockingQueue解決生產者消費者
public class BlockingQueueConsumerProducer {
    public static void main(String[] args) {
        Resource3 resource = new Resource3();
        //生產者線程
        ProducerThread3 p = new ProducerThread3(resource);
        //多個消費者
        ConsumerThread3 c1 = new ConsumerThread3(resource);
        ConsumerThread3 c2 = new ConsumerThread3(resource);
        ConsumerThread3 c3 = new ConsumerThread3(resource);
 
        p.start();
        c1.start();
        c2.start();
        c3.start();
    }
}
/**
 * 消費者線程
 * @author tangzhijing
 *
 */
class ConsumerThread3 extends Thread {
    private Resource3 resource3;
 
    public ConsumerThread3(Resource3 resource) {
        this.resource3 = resource;
        //setName("消費者");
    }
 
    public void run() {
        while (true) {
            try {
                Thread.sleep((long) (1000 * Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource3.remove();
        }
    }
}
/**
 * 生產者線程
 * @author tangzhijing
 *
 */
class ProducerThread3 extends Thread{
    private Resource3 resource3;
    public ProducerThread3(Resource3 resource) {
        this.resource3 = resource;
        //setName("生產者");
    }
 
    public void run() {
        while (true) {
            try {
                Thread.sleep((long) (1000 * Math.random()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource3.add();
        }
    }
}
class Resource3{
    private BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(10);
    /**
     * 向資源池中添加資源
     */
    public void add(){
        try {
            resourceQueue.put(1); //1當做生產和消費的Integer資源
            System.out.println("生產者" + Thread.currentThread().getName()
                    + "生產一件資源," + "當前資源池有" + resourceQueue.size() + 
                    "個資源");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    /**
     * 向資源池中移除資源
     */
    public void remove(){
        try {
            resourceQueue.take();
            System.out.println("消費者" + Thread.currentThread().getName() + 
                    "消耗一件資源," + "當前資源池有" + resourceQueue.size() 
                    + "個資源");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
View Code

 為什么用put和take:

 

 

為什么用put和take:https://blog.csdn.net/qiuchaoxi/article/details/80359462


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM