java_阻塞隊列(FIFO先進先出)


ArrayBlockingQueue:由數組結構組成的有界阻塞隊列;

LinkedBlockingQueue:由鏈表結構組成的有界阻塞隊列(但大小默認值為:Integer.MAX_VALUE);

PriorityBlockingQueue:支持優先級排序的無界阻塞隊列;

DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列;

SynchronousQueue:不存儲元素的阻塞隊列,也即單個元素的隊列;

LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列;

LinkedBlockingDeque:由鏈表結果組成的雙向阻塞隊列;

 

阻塞隊列核心方法

add(e):為阻塞隊列添加一個元素e,添加成功返回 true,當阻塞隊列滿時,拋出異常 java.lang.IllegalStateException: Queue full

remove():為阻塞隊列移除一個元素,移除成功時返回移除的元素,當阻塞隊列為空時,拋出異常 java.util.NoSuchElementException

element():檢查當前阻塞隊列的首個元素,成功時返回首個元素,當阻塞隊列為空時,拋出異常 java.util.NoSuchElementException

 

offer(e):為阻塞隊列添加一個元素e,添加成功返回 true,當阻塞隊列滿時返回 false

poll():為阻塞隊列移除一個元素,移除成功時返回移除的元素,當阻塞隊列為空時,返回 null

peek():檢查當前阻塞隊列的首個元素,成功時返回首個元素,當阻塞隊列為空時,返回 null

 

put(e):為阻塞隊列添加一個元素e,無返回值,當阻塞隊列滿時會阻塞線程,直到操作成功為止

take():為阻塞隊列移除一個元素,成功時返回首個元素,當阻塞隊列為空時會阻塞線程,直到操作成功為止

 

offer(e,time,unit):為阻塞隊列添加一個元素e,添加成功返回 true,當阻塞隊列滿時會阻塞隊列一段時間(time:long類型的時間,unit為時間單位),失敗返回 false

poll(time,unit):為阻塞隊列移除一個元素,移除成功時返回移除的元素,當阻塞隊列為空時,會阻塞隊列一段時間(time:long類型的時間,unit為時間單位),失敗返回  null

 

 

SynchronousQueue:不存儲元素的阻塞隊列

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue queue = new SynchronousQueue<>();

        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName()+"\t put 1");
                queue.put(1);
                System.out.println(Thread.currentThread().getName()+"\t put 2");
                queue.put(2);
                System.out.println(Thread.currentThread().getName()+"\t put 3");
                queue.put(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A").start();

        new Thread(() ->{
            try {
                try { TimeUnit.SECONDS.sleep(1); }catch (Exception e){ e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+" 第一次移除隊列元素");
                queue.take();
                try { TimeUnit.SECONDS.sleep(1); }catch (Exception e){ e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+" 第二次移除隊列元素");
                queue.take();
                try { TimeUnit.SECONDS.sleep(1); }catch (Exception e){ e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+" 第三次移除隊列元素");
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B").start();
    }
}

 

 

生產者與消費者實例:(生產一個消費一個)

第一版(使用synchronized、wait 、 notify等方法進行實現)

第二版(使用ReentrantLock、await、signal / signalAll等方法進行進行實現)

第二版示例代碼:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareData {//資源類
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception {
        lock.lock();
        try {
            //1、判斷
            while (number != 0) {
                //等待
                condition.await();
            }
            //2、干活
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            //3、喚醒
            condition.signalAll();//喚醒所有等待線程
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() throws Exception {
        lock.lock();
        try {
            //1、判斷
            while (number == 0) {
                //等待
                condition.await();
            }
            //2、干活
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            //3、喚醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class ProdConsumer_TraditionDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();

        new Thread(() ->{
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(() ->{
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
    }
}

第三版:(volatile/CAS/AtomicInteger/BlockingQueue/線程交互/原子引用)等進行實現

第三版示例代碼:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource{
    private volatile boolean flag = true;//默認開啟,進行生產+消費 volatile(可見性、不保證原子性、禁止指令重排)
    private AtomicInteger atomicInteger = new AtomicInteger();//原子整型類(保證原子性)、默認值:0
    BlockingQueue<String> queue = null;

    public MyResource(BlockingQueue<String> queue) {
        this.queue = queue;
        System.out.println("當前傳入的BlockingQueue:" + queue.getClass().getName());
    }

    public void myProd()throws Exception{//生產蛋糕
        String data = null;
        boolean retValue;//默認值為 false
        while (flag){
             //atomicInteger.incrementAndGet();// ++atomicInteger
             //atomicInteger.getAndIncrement();// atomicInteger++
            data = atomicInteger.incrementAndGet() + "";
            retValue = queue.offer(data,2L, TimeUnit.SECONDS);//添加 data,若失敗會阻塞2秒 返回false
            if (retValue){
                System.out.println(Thread.currentThread().getName()+"\t 插入隊列:"+data+" 成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"\t 插入隊列:"+data+" 失敗");
            }
            TimeUnit.SECONDS.sleep(1);//每一秒生產 1個
        }
        System.out.println(Thread.currentThread().getName()+"\t flag = false 生產結束");
    }

    public void myConsumer()throws Exception{
        String data = null;
        while (flag){
            data = queue.poll(2L,TimeUnit.SECONDS);//獲取 data,若失敗會阻塞2秒 返回 null
            if (data == null || data.equalsIgnoreCase("")){
                System.out.println(Thread.currentThread().getName()+"\t 2秒沒有取到蛋糕 消費退出");
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t 消費隊列蛋糕:"+data+" 成功");
        }
    }

    public void stop()throws Exception{
        this.flag = false;
    }
}
/**
 * volatile/CAS/AtomicInteger/BlockingQueue/線程交互/原子引用
 */
public class ProConsumer_BlockingQueueDemo {
    public static void main(String[] args) throws Exception {
        MyResource resource = new MyResource(new ArrayBlockingQueue<>(10));
        new Thread(() ->{
            try {
                resource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"prod").start();

        new Thread(() ->{
            try {
                resource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"consumer").start();

        //睡眠5秒
        try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println();
        System.out.println("5秒鍾到,活動結束");
        resource.stop();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM