java中並發Queue種類與各自API特點以及使用場景!


一 先說下隊列

隊列是一種數據結構.它有兩個基本操作:在隊列尾部加入一個元素,和從隊列頭部移除一個元素(注意不要弄混隊列的頭部和尾部)

就是說,隊列以一種先進先出的方式管理數據,如果你試圖向一個 已經滿了的阻塞隊列中添加一個元素或者是從一個空的阻塞隊列中移除一個元索,將導致線程阻塞.

在多線程進行合作時,阻塞隊列是很有用的工具。工作者線程可以定期地把中間結果存到阻塞隊列中而其他工作者線程把中間結果取出並在將來修改它們。隊列會自動平衡負載。

如果第一個線程集運行得比第二個慢,則第二個 線程集在等待結果時就會阻塞。如果第一個線程集運行得快,那么它將等待第二個線程集趕上來.

說白了,就是先進先出,線程安全!

java中並發隊列都是在java.util.concurrent並發包下的,Queue接口與List、Set同一級別,都是繼承了Collection接口,最近學習了java中的並發Queue的所有子類應用場景,這里記錄分享一下:

1.1 這里可以先用wait與notify(腦忒fai) 模擬一下隊列的增刪數據,簡單了解一下隊列:

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 模擬隊列增刪數據
 * @author houzheng
 */
public class MyQueue {
	//元素集合
	private LinkedList<Object> list=new LinkedList<Object>();
	//計數器(同步),判斷集合元素數量
	private AtomicInteger count=new AtomicInteger();
	//集合上限與下限,final必須指定初值
	private final int minSize=0;
	private final int maxSize;
	//構造器指定最大值
	public MyQueue(int maxSize) {
		this.maxSize = maxSize;
	}
	
	//初始化對象,用於加鎖,也可直接用this
	private Object lock=new Object();
	//put方法:往集合中添加元素,如果集合元素已滿,則此線程阻塞,直到有空間再繼續
	public void put(Object obj){
		synchronized (lock) {
			while(count.get()==this.maxSize){
				try {
					lock.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();}
			}
			list.add(obj);
			//計數器加一
			count.incrementAndGet();
			System.out.println("放入元素:"+obj);
			//喚醒另一個線程,(處理極端情況:集合一開始就是空,此時take線程會一直等待)
			lock.notify();
		}
	}
	//take方法:從元素中取數據,如果集合為空,則線程阻塞,直到集合不為空再繼續
	public Object take(){
		Object result=null;
		synchronized(lock){
			while(count.get()==this.minSize){
				try {
					lock.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();}
			}
			//移除第一個
			result=list.removeFirst();
			//計數器減一
			count.decrementAndGet();
			System.out.println("拿走元素:"+result);
			//喚醒另一個線程,(處理極端情況:集合一開始就是滿的,此時put線程會一直等待)
			lock.notify();
		}
		return result;
	}
	public int getSize(){
		return this.count.get();
	}
	public static void main(String[] args) {
		//創建集合容器
		MyQueue queue=new MyQueue(5);
		queue.put("1");
		queue.put("2");
		queue.put("3");
		queue.put("4");
		queue.put("5");
		System.out.println("當前容器長度為:"+queue.getSize());
		Thread t1=new Thread(()->{
			queue.put("6");
			queue.put("7");
		},"t1");
		Thread t2=new Thread(()->{
			Object take1 = queue.take();
			Object take2 = queue.take();
		},"t2");
		//測試極端情況,兩秒鍾后再執行另一個線程
		t1.start();
		try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		t2.start();
	}
}

  這里用線程通信的方式簡單模擬了隊列的進出,那么接下來就正式進入java的並發隊列:

二 並發Queue

JDK中並發隊列提供了兩種實現,一種是高性能隊列ConcurrentLinkedQueue,一種是阻塞隊列BlockingQueue,兩種都繼承自Queue:

1 ConcurrentLinkedQueue

這是一個使用於高並發場景的隊列(額,各位看這塊博客的小朋友,最好對線程基礎比較熟悉再來看,當然我也在拼命學習啦,哈哈哈),主要是無鎖的方式,他的想能要比BlockingQueue好

,是基於鏈接節點的無界線程安全隊列,先進先出,不允許有null元素,廢話不多說,上demo:

這種queue比較簡單,沒什么好說的,和ArrayList一樣用就可以,關鍵是BlockingQUeue

2 BlockingQueue

blockingQueue主要有5中實現,我感覺都挺有意思的,其中幾種還比較常用就都學習了下,這里都介紹下:

2.1  ArrayBlockingQueue

@Test
public void test02() throws Exception{
	//必須指定隊列長度
	ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(2);
	abq.add("a");
	//add :添加元素,如果BlockingQueue可以容納,則返回true,否則拋異常,支持添加集合
	System.out.println(abq.offer("b"));//容量如果不夠,返回false
	//offer: 如果可能的話,添加元素,即如果BlockingQueue可以容納,則返回true,否則返回false,支持設置超時時間
	//設置超時,如果超過時間就不添加,返回false,  
	abq.offer("d", 2, TimeUnit.SECONDS);// 添加的元素,時長,單位
	//put 添加元素,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續.
	abq.put("d");//會一直等待
	//poll 取走頭部元素,若不能立即取出,則可以等time參數規定的時間,取不到時返回null,支持設置超時時間
	abq.poll();
	abq.poll(2,TimeUnit.SECONDS);//兩秒取不到返回null
	//take()  取走頭部元素,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止
	abq.take();
	//取出頭部元素,但不刪除
	abq.element();
	//drainTo()
	//一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
	List list=new ArrayList();
	abq.drainTo(list,2);//將隊列中兩個元素取到list中,取走后隊列中就沒有取走的元素
	System.out.println(list); //[a,b]
	System.out.println(abq);  //[]
}

  

2.2 LinkedBlockingQueue

@Test
public void test03(){
	LinkedBlockingQueue lbq=new LinkedBlockingQueue();//可指定容量,也可不指定
	lbq.add("a");
	lbq.add("b");
	lbq.add("c");
	//API與ArrayBlockingQueue相同
	//是否包含
	System.out.println(lbq.contains("a"));
	//移除頭部元素或者指定元素  remove("a")
	System.out.println(lbq.remove());
	//轉數組
	Object[] array = lbq.toArray();
	//element 取出頭部元素,但不刪除
	System.out.println(lbq.element());
	System.out.println(lbq.element());
	System.out.println(lbq.element());
}

  

2.3 SynchronousQueue

public static void main(String[] args) {
	SynchronousQueue<String> sq=new SynchronousQueue<String>();
	// iterator() 永遠返回空,因為里面沒東西。 
    // peek() 永遠返回null
	/**
	 * isEmpty()永遠是true。 
	 * remainingCapacity() 永遠是0。 
	 * remove()和removeAll() 永遠是false。
	 */
	new Thread(()->{
		try {
			//取出並且remove掉queue里的element(認為是在queue里的。。。),取不到東西他會一直等。 
			System.out.println(sq.take());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
	}).start();
	new Thread(()->{
		try {
			//offer() 往queue里放一個element后立即返回,
			//如果碰巧這個element被另一個thread取走了,offer方法返回true,認為offer成功;否則返回false
			//true ,上面take線程一直在等,
			////下面剛offer進去就被拿走了,返回true,如果offer線程先執行,則返回false
			System.out.println(sq.offer("b"));
			
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}).start();
	new Thread(()->{
		try {
			//往queue放進去一個element以后就一直wait直到有其他thread進來把這個element取走
			sq.put("a");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}).start();
}

  

2.4 PriorityBlockingQueue

@Test
public void test04() throws Exception{
	//隊列里元素必須實現Comparable接口,用來決定優先級
	PriorityBlockingQueue<String> pbq=new PriorityBlockingQueue<String>();
	pbq.add("b");
	pbq.add("g");
	pbq.add("a");
	pbq.add("c");
	//獲取的時候會根據優先級取元素,插入的時候不會排序,節省性能
	//System.out.println(pbq.take());//a,獲取時會排序,按優先級獲取
	System.out.println(pbq.toString());//如果前面沒有取值,直接syso也不會排序
	Iterator<String> iterator = pbq.iterator();
	while(iterator.hasNext()){
		System.out.println(iterator.next());
	}
}
@Test
public void test05(){
	PriorityBlockingQueue<Person> pbq=new PriorityBlockingQueue<Person>();
	Person p2=new Person("姚振",20);
	Person p1=new Person("侯征",24);
	Person p3=new Person("何毅",18);
	Person p4=new Person("李世彪",22);
	pbq.add(p1);
	pbq.add(p2);
	pbq.add(p3);
	pbq.add(p4);
	System.out.println(pbq);//沒有按優先級排序
	try {
		//只要take獲取元素就會按照優先級排序,獲取一次就全部排好序了,后面就會按優先級迭代
		pbq.take();
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	//按年齡排好了序
	for (Iterator iterator = pbq.iterator(); iterator.hasNext();) {
		Person person = (Person) iterator.next();
		System.out.println(person);
	}
}

  

2.5 最后說一下DelayQueue ,這里用個網上很經典的例子,網吧上網計時

網民實體queue中元素

//網民
public class Netizen implements Delayed {
	//身份證
private String ID;
//名字
private String name;
//上網截止時間
private long playTime;
//比較優先級,時間最短的優先
@Override
public int compareTo(Delayed o) {
	Netizen netizen=(Netizen) o;
	return this.getDelay(TimeUnit.SECONDS)-o.getDelay(TimeUnit.SECONDS)>0?1:0;
}
public Netizen(String iD, String name, long playTime) {
	ID = iD;
	this.name = name;
	this.playTime = playTime;
}
//獲取上網時長,即延時時長
@Override
public long getDelay(TimeUnit unit) {
	//上網截止時間減去現在當前時間=時長
	return this.playTime-System.currentTimeMillis();
}

 網吧類:

//網吧
public class InternetBar implements Runnable {
	//網民隊列,使用延時隊列
private DelayQueue<Netizen> dq=new DelayQueue<Netizen>();
//上網
public void startPlay(String id,String name,Integer money){
	//截止時間= 錢數*時間+當前時間(1塊錢1秒)
	Netizen netizen=new Netizen(id,name,1000*money+System.currentTimeMillis());
	System.out.println(name+"開始上網計費......");
	dq.add(netizen);
}
//時間到下機
public void endTime(Netizen netizen){
	System.out.println(netizen.getName()+"余額用完,下機");
}
@Override
public void run() {
	//線程,監控每個網民上網時長
	while(true){
        try {
        	//除非時間到.否則會一直等待,直到取出這個元素為止
            Netizen netizen=dq.take();
            endTime(netizen);
        } 
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
public static void main(String[] args) {
	//新建一個網吧
	InternetBar internetBar=new InternetBar();
	//來了三個網民上網
	internetBar.startPlay("001","侯征",3);
	internetBar.startPlay("002","姚振",7);
	internetBar.startPlay("003","何毅",5);
		Thread t1=new Thread(internetBar);
		t1.start();
	}
}

  這樣就可以完美實現業務需求了,

結果,

這塊東西比較深,還需要不斷加強學習實踐才行!!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM