生產者和消費者並發模式的幾種簡單寫法


1、使用synchronized

package cn.luxh.app.test;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 *    synchronized實現的生產者和消費者
 */
public class ProducerCustomerWithSynchronized {
    
    Executor pool = Executors.newFixedThreadPool(10);
    
    //倉庫
    private List<String> storageList = new LinkedList<String>();
    
    //倉庫容量
    private int MAX_SIZE = 3;
    
    //倉庫為空
    private int ZERO = 0;
    
    //生產者線程
    private class Producer implements Runnable{
        
        //生產方法,需同步
        private void produce(){
            synchronized (storageList) {
                System.out.println(Thread.currentThread().getName()+"進入倉庫,准備生產!");
                try {
                    if(storageList.size()==MAX_SIZE) {
                        System.out.println("倉庫已滿!等待消費者消費");
                        Thread.sleep(1000);
                        storageList.wait();//當前線程放棄鎖,處於等待狀態,讓其他線程執行
                    }
                    if(storageList.size()<MAX_SIZE) {
                        String name = "產品"+new Random().nextInt();
                        storageList.add(name);
                        System.out.println(Thread.currentThread().getName()+"往倉庫中生產了一個產品!");
                    }
                    Thread.sleep(1000);
                    storageList.notifyAll();//當前線程放棄鎖,喚醒其他線程
                        
                }catch(InterruptedException ie) {
                    System.out.println("中斷異常");
                    ie.printStackTrace();
                }
                
            }
        }

        @Override
        public void run() {
            while(true) {
                produce();
            }
        }
    }
    
    //消費者線程
    private class Customer implements Runnable{
        
        //消費方法,需同步
        private void consume() {
            synchronized (storageList) {
                System.out.println(Thread.currentThread().getName()+"進入倉庫,准備消費!");
                try {
                    if(storageList.size()==ZERO) {
                        System.out.println("倉庫已空!等待生產者生產");
                        Thread.sleep(1000);
                        storageList.wait();//當前線程放棄鎖,處於等待狀態,讓其他線程執行
                    }
                    if(storageList.size()!=ZERO) {
                        System.out.println(Thread.currentThread().getName()+"從倉庫取得產品:"+storageList.remove(0));
                    }
                    Thread.sleep(1000);
                    storageList.notifyAll();//當前線程放棄鎖,喚醒其他線程
                }catch(InterruptedException ie) {
                    System.out.println("中斷異常");
                    ie.printStackTrace();
                }
                
            }
        }

        @Override
        public void run() {
            while(true) {
                consume();
            }
        }
        
    }
    
    //啟動生產者和消費者線程
    public void start() {
        for(int i=1;i<5;i++) {
            //new Thread(new Producer()).start();
            //new Thread(new Customer()).start();
            pool.execute(new Producer());
            pool.execute(new Customer());
        }
        
    }
    
    public static void main(String[] args) {
        ProducerCustomerWithSynchronized pc = new ProducerCustomerWithSynchronized();
        pc.start();
    }
}
View Code

 

2、使用Lock

package cn.luxh.app.test;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
 * Lock實現的生產者和消費者
 *
 */
public class ProducerCustomerWithLock {
    
    Executor pool = Executors.newFixedThreadPool(10);
    
    //倉庫
    private List<String> storageList = new LinkedList<String>();
    
    //倉庫容量
    private int MAX_SIZE = 3;
    
    //倉庫為空
    private int ZERO = 0;
    
    //獲取鎖對象
    private Lock lock = new ReentrantLock();
    
    //倉庫滿了,綁定生產者線程
    private Condition full = lock.newCondition();
    
    //倉庫為空,綁定消費者線程
    private Condition empty = lock.newCondition();
    
    //生產者線程
    private class Producer implements Runnable{
        
        //生產方法,需同步
        private void produce(){
            if(lock.tryLock()) {
                System.out.println(Thread.currentThread().getName()+"進入倉庫,准備生產!");
                try {
                    if(storageList.size()==MAX_SIZE) {
                        System.out.println("倉庫已滿!等待消費者消費");
                        Thread.sleep(1000);
                        full.await();//生產者線程加入線程等待池
                    }
                    if(storageList.size()<MAX_SIZE){
                        String name = "產品"+new Random().nextInt();
                        storageList.add(name);
                        System.out.println(Thread.currentThread().getName()+"往倉庫中生產了一個產品!");
                    }
                    Thread.sleep(1000);
                    empty.signalAll();//喚醒消費者線程
                        
                }catch(InterruptedException ie) {
                    System.out.println("中斷異常");
                    ie.printStackTrace();
                }finally{
                    lock.unlock();
                }
            }
        }

        @Override
        public void run() {
            while(true) {
                produce();
            }
        }
    }
    
    //消費者線程
    private class Customer implements Runnable{
        
        //消費方法,需同步
        private void consume() {
            if(lock.tryLock()) {
                System.out.println(Thread.currentThread().getName()+"進入倉庫,准備消費!");
                try {
                    if(storageList.size()==ZERO) {
                        System.out.println("倉庫已空!等待生產者生產");
                        Thread.sleep(1000);
                        empty.await();//消費者線程加入線程等待池
                    }
                    if(storageList.size()!=ZERO) {
                        System.out.println(Thread.currentThread().getName()+"從倉庫取得產品:"+storageList.remove(0));
                    }
                    
                    Thread.sleep(1000);
                    full.signalAll();//喚醒生產者線程
                }catch(InterruptedException ie) {
                    System.out.println("中斷異常");
                    ie.printStackTrace();
                }finally{
                    lock.unlock();
                }
            }
        }

        @Override
        public void run() {
            while(true) {
                consume();
            }
        }
        
    }
    
    //啟動生產者和消費者線程
    public void start() {
        for(int i=1;i<5;i++) {
            //new Thread(new Producer()).start();
            //new Thread(new Customer()).start();
            pool.execute(new Producer());
            pool.execute(new Customer());
        }
        
    }
    
    public static void main(String[] args) {
        ProducerCustomerWithLock pc = new ProducerCustomerWithLock();
        pc.start();
    }
}
View Code

 

3、使用BlockingQueue

package cn.luxh.app.test;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;


/**
 * BlockingQueue實現的生產者和消費者
 *
 */
public class ProducerCustomerWithBlockingQueue {
    
    Executor pool = Executors.newFixedThreadPool(10);
    
    //倉庫
    private BlockingQueue<String> storageQueue = new LinkedBlockingQueue<String>(5);
    
    //倉庫容量
    private int MAX_SIZE = 3;
    
    //倉庫為空
    private int ZERO = 0;
    
    
    
    //生產者線程
    private class Producer implements Runnable{
        
        //生產方法,需同步
        private void produce(){
            try {
                System.out.println(Thread.currentThread().getName()+"進入倉庫,准備生產!");
                if(storageQueue.size()==MAX_SIZE) {
                    System.out.println("倉庫已滿!等待消費者消費");
                    Thread.sleep(1000);
                }
                if(storageQueue.size()<=MAX_SIZE) {
                    String product = "產品"+new Random().nextInt();
                    storageQueue.put(product);
                    System.out.println(Thread.currentThread().getName()+"往倉庫中生產了一個產品!");
                }
                Thread.sleep(1000);
            }catch(InterruptedException ie) {
                System.out.println("中斷異常");
                ie.printStackTrace();
            }
        }

        @Override
        public void run() {
            while(true) {
                produce();
            }
        }
    }
    
    //消費者線程
    private class Customer implements Runnable{
        
        //消費方法,需同步
        private void consume() {
            try {
                System.out.println(Thread.currentThread().getName()+"進入倉庫,准備消費!");
                if(storageQueue.size()==ZERO) {
                    System.out.println("倉庫已空!等待生產者生產");
                    Thread.sleep(1000);
                }
                if(storageQueue.size()!=ZERO) {
                    System.out.println(Thread.currentThread().getName()+"從倉庫取得產品:"+storageQueue.take());
                }
                Thread.sleep(1000);
            }catch(InterruptedException ie) {
                System.out.println("中斷異常");
                ie.printStackTrace();
            }
        }

        @Override
        public void run() {
            while(true) {
                consume();
            }
        }
        
    }
    
    //啟動生產者和消費者線程
    public void start() {
        for(int i=1;i<5;i++) {
            //new Thread(new Producer()).start();
            ///new Thread(new Customer()).start();
            pool.execute(new Producer());
            pool.execute(new Customer());
        }
        
    }
    
    public static void main(String[] args) {
        ProducerCustomerWithBlockingQueue pc = new ProducerCustomerWithBlockingQueue();
        pc.start();
    }
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM