生產者消費者模式-基於線程池


1. 為什么使用生產者消費者模式

(1)解耦合。消費者只關心隊列里面取出來的數據,不用關心數據的來源。比如,生產者服務的域名,url這些變更。

(2)支持異步。生產者生產出來數據,直接放入隊列就好了,接着生產下一個數據,不必等待。比如廚師做菜的時候,只需要把做好的菜放到傳送帶就接着做下一道菜。不需要有等有顧客過來把這個菜領走在做下一道;效率更高。

(3)流量削峰。雙十一零點那一刻,qps會飆升。如果為了這一小會的時間,增加機器不划算,因為平時的時候,這些機器足夠用。那我可以吧這些請求放到一個隊列,服務從隊列中拿出請求,運算后返回給客戶端。

 

2. 生產者消費者圖示

 

 

3. 代碼實現

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer<E> {
    private int queueSize = 5;
    private int producerNum = 2;
    private int consumerNum = 2;
    //創建一個阻塞隊列
    private LinkedBlockingQueue<E> blockingQueue = null;
    //生產者線程池
    private ExecutorService producerTheadPool = null;
    //消費者線程池
    private ExecutorService consumerTheadPool = null;

    public ProducerConsumer(){
        blockingQueue = new LinkedBlockingQueue<>(queueSize);
        producerTheadPool = Executors.newFixedThreadPool(producerNum);
        consumerTheadPool = Executors.newFixedThreadPool(consumerNum);
    }

    public ProducerConsumer(int queueSize, int producerNum, int consumerNum){
        blockingQueue = new LinkedBlockingQueue<>(queueSize);
        producerTheadPool = Executors.newFixedThreadPool(producerNum);
        consumerTheadPool = Executors.newFixedThreadPool(consumerNum);
    }

    public void produceEleAsync(E ele){
        if(!checkSuccess()){
            return;
        }
        Producer<E> producer = new Producer<E>(this.blockingQueue, ele);
        producerTheadPool.execute(producer);
    }

    //執行消費過程
    public void consumeEleAsync() {
        if(!checkSuccess()){
            return;
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        E ele = blockingQueue.take();//阻塞獲取數據
                        Consumer<E> consumer = new Consumer<E>(ele);
                        consumerTheadPool.execute(consumer);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    //判空檢查
    private boolean checkSuccess(){
        if(blockingQueue!=null
                && producerTheadPool!=null
                && consumerTheadPool!=null){
            return true;
        }
        return false;
    }

    //生產者
    private class Producer<E> implements Runnable{
        private LinkedBlockingQueue<E> blockingQueue;
        private E ele;

        public Producer(LinkedBlockingQueue<E> blockingQueue, E ele){
            this.blockingQueue = blockingQueue;
            this.ele = ele;
        }

        @Override
        public void run() {
            if(this.blockingQueue!=null && ele!=null){
                try {
                    this.blockingQueue.put(ele);
                } catch (InterruptedException e) {
                    e.getStackTrace();
                }
            }
        }
    }

    //消費者
    private class Consumer<E> implements Runnable{
        private E ele;

        public Consumer(E ele){
            this.ele = ele;
        }

        @Override
        public void run() {
            //執行消費過程
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if(ele!=null){
                System.out.println("消費--->" + ele.toString());
            }
        }
    }
    

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM