多線程——生產者消費者模式三種實現方式


  生產者消費者模式通過一個阻塞隊列來解決兩者之間的強耦合問題。阻塞隊列相當於一個緩沖區,平衡消費者和生產者的處理能力。

  阻塞隊列有數據——生產者不生產,阻塞隊列沒數據——消費者不消費

一、synchronized+wait+notifyAll

 生產

package com.ProductCusromer.method2;

import java.util.List;
import java.util.Random;

/**
 * @author Millet
 * @date 2020/3/30 17:59
 */
public class Product2 implements Runnable {
    private List list;

    public Product2(List list) {
        this.list = list;
    }
    //多個生產者對應多個消費者
    @Override
    public void run() {
        Random random = new Random(50);//相當於商品
        while (true){
            synchronized (list){
                while(list.size()>0){
                    //集合中有數據就停止寫入,等待消費者消費完
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //執行到這說明集合沒有數據了
                list.add((random.nextInt(50)));
                //提示
                System.out.println(Thread.currentThread().getName()+"線程生產了:"+ list.get(0));
                list.notifyAll();
            }
        }
    }
}

 消費者

package com.ProductCusromer.method2;

import java.util.List;

/**
 * @author Millet
 * @date 2020/3/30 18:07
 */
public class Consumer2 implements Runnable {
    private List list;

    public Consumer2(List list) {
        this.list = list;
    }

    @Override
    public void run() {
        while(true){
            synchronized (list){
                while(list.size()<=0){//適合多生產者多消費者
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                 try {
                     Thread.sleep(500);
                  } catch (InterruptedException e) {
                       e.printStackTrace();
                   }

                 //提示
                  System.err.println(Thread.currentThread().getName()+"線程消費了:"+ list.remove(0));
                  list.notifyAll();
                }
            }
    }
}

 

二、ReetrantLock+Condition

生產者

package com.ProductCusromer.method3;

import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author Millet
 * @date 2020/3/30 17:59
 */
public class Product3 implements Runnable {
    private List list;
    private Lock mLock;
    private Condition mCondition;
    public Product3(List list, Lock lock, Condition condition) {
        this.list = list;
        this.mLock = lock;
        this.mCondition = condition;
    }
    //一個生產者對應一個消費者
    @Override
    public void run() {
        Random random = new Random(50);//相當於商品
        while (true){
            mLock.lock();
            if(list.size()>0){
                //集合中有數據就停止寫入,等待消費者消費完
                try {
                    mCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //執行到這說明集合沒有數據了
            list.add((random.nextInt(50)));
            //提示
            System.out.println(Thread.currentThread().getName()+"線程生產了:"+ list.get(0));
            mCondition.signalAll();
            mLock.unlock();
        }
    }
}

 

 消費者

package com.ProductCusromer.method2;

import java.util.List;

/**
 * @author Millet
 * @date 2020/3/30 18:07
 */
public class Consumer2 implements Runnable {
    private List list;

    public Consumer2(List list) {
        this.list = list;
    }

    @Override
    public void run() {
        while(true){
            synchronized (list){
                while(list.size()<=0){//適合多生產者多消費者
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                 try {
                     Thread.sleep(500);
                  } catch (InterruptedException e) {
                       e.printStackTrace();
                   }

                 //提示
                  System.err.println(Thread.currentThread().getName()+"線程消費了:"+ list.remove(0));
                  list.notifyAll();
                }
            }
    }
}

 

三、BlockingQueue實現

生產者

 

package com.ProductCusromer.method3;

import java.util.concurrent.BlockingQueue;

/**
 * @author Millet
 * @date 2020/3/30 21:28
 */
public class Product4 implements Runnable {
    private BlockingQueue queue;
    private String name;
    public Product4(String name, BlockingQueue queue) {
        this.queue = queue;
        this.name = name;
    }
    @Override
    public void run() {
        while(true){
            try {
                int val = (int) Math.random();
                queue.put(val);
                System.out.println(name + "生產:"+val+".當前隊列長度:"+ queue.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

消費者

package com.ProductCusromer.method3;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author Millet
 * @date 2020/3/30 21:28
 */
public class Consumer4 implements Runnable{
    private BlockingQueue queue;
    private String name;
    public Consumer4(String name, BlockingQueue queue) {
        this.queue = queue;
        this.name = name;
    }
    @Override
    public void run() {
        try {
            while (true){
                int val = (int) queue.take();
                System.out.println(name + "消費:"+val+".當前隊列長度:"+ queue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

測試

package com.ProductCusromer.method3;

import com.ProductCusromer.method1.Consumer1;
import com.ProductCusromer.method1.Product1;

import javax.persistence.criteria.CriteriaBuilder;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author Millet
 * @date 2020/3/30 20:33
 */
public class Main {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(20);
        Product4 producer1 = new Product4("生產1號", queue);
        Product4 producer2 = new Product4("生產2號", queue);
        Product4 producer3 = new Product4("生產3號", queue);

        Consumer4 consumer1 = new Consumer4("消費1號", queue);
        Consumer4 consumer2 = new Consumer4("消費2號", queue);

// 開始producer線程進行生產
        new Thread(producer1).start();
        new Thread(producer2).start();
        new Thread(producer3).start();

// 開始consumer線程進行消費。
        new Thread(consumer1).start();
        new Thread(consumer2).start();
    }
}

 

結果

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM