高並發編程基礎(java.util.concurrent包常見類基礎)


  JDK5中添加了新的java.util.concurrent包,相對同步容器而言,並發容器通過一些機制改進了並發性能。因為同步容器將所有對容器狀態的訪問都串行化了,這樣保證了線程的安全性,所以這種方法的代價就是嚴重降低了並發性,當多個線程競爭容器時,吞吐量嚴重降低。因此JDK5開始針對多線程並發訪問設計,提供了並發性能較好的並發容器,引入了java.util.concurrent包。與Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的並發容器主要解決了兩個問題:

1)根據具體場景進行設計,盡量避免synchronized,提供並發性。
2)定義了一些並發安全的復合操作,並且保證並發環境下的迭代操作不會出錯。

這里先粗略的介紹下 concurrrent 包下面常見並發容器的簡單應用,后續再針對性的去深入研究。

並發容器:

這些容器的關鍵方法大部分都實現了線程安全的功能,卻不使用同步關鍵字(synchronized)。
 
常見阻塞隊列:
   BlockingQueue. class,阻塞隊列接口
   DelayQueue. class,阻塞隊列,無界隊列,並且元素是Delay的子類,保證元素在達到一定時間后才可以取得到
public class T07_DelayQueue {
	// 執行定時任務
	static BlockingQueue<MyTask> tasks = new DelayQueue<>();
	static Random r = new Random();

	static class MyTask implements Delayed {

		long runningTime;

		MyTask(Long rt) {

			this.runningTime = rt;
		}
		@Override
		public int compareTo(Delayed o) {
			if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
				return -1;
			if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
				return 1;
			else
				return 0;
		}
		@Override
		public long getDelay(TimeUnit unit) {
			return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
		}
		
		@Override
		public String toString() {
			return "" + runningTime ;
		}
	}
	public static void main(String[] args) throws InterruptedException {
		long now = System.currentTimeMillis();
		
		MyTask t1 =new MyTask(now +1000);
		MyTask t2 =new MyTask(now +2000);
		MyTask t3 =new MyTask(now +1500);
		MyTask t4 =new MyTask(now +2500);
		MyTask t5 =new MyTask(now +500);
		
		tasks.put(t1);
		tasks.put(t2);
		tasks.put(t3);
		tasks.put(t4);
		tasks.put(t5);
		
		System.out.println(tasks);
		
		for(int i=0;i<5;i++) {
			System.out.println(tasks.take());
		}
		
	}
}
  
  BlockingDeque. class,雙端阻塞隊列接口
   ArrayBlockingQueue. class,阻塞隊列,數組實現。有界阻塞隊列
public class T06_ArrayBlockingQueue {
	// 阻塞隊列
	static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);
	static Random r =new Random();
	public static void main(String[] args) throws InterruptedException { 
		for(int i=0;i<10;i++) {
			strs.put("a"+i);
		}
//		strs.add("aaa");//Exception in thread "main" java.lang.IllegalStateException: Queue full
//		strs.put("aaa"); // 滿了會等待 阻塞
//		strs.offer("aaa");//不會報異常 有返回值  true or false
//		strs.offer("aaa",1,TimeUnit.SECONDS); // 按照時間段阻塞
	}
}

   LinkedBlockingDeque. class,阻塞雙端隊列,鏈表實現
   LinkedBlockingQueue. class,阻塞隊列,鏈表實現,無界,以下是該類對與生產者消費者模型的簡單實現:
public class T05_LinkedBlockingQueue {
	// 阻塞隊列
	static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
	
	static Random r =new Random();
	//生產者消費者
	public static void main(String[] args) {
		new Thread(()->{
			for(int i=0;i<100;i++) {
				try {
					strs.put("a"+i);//如果滿了就會等待阻塞
					System.err.println("a"+i);
					TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		},"p1").start();
		
		for(int i=0;i<5;i++) {
			new Thread(()->{
				while(true) {
					try {
						//take 如果空了 就會阻塞
						System.out.println(Thread.currentThread().getName()+" take - "+ strs.take());
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			},"c" +i).start();
		}
	}
}
  
   SynchronousQueue.class,同步隊列,但是隊列長度為0,生產者放入隊列的操作會被阻塞,直到消費者過來取,所以這個隊列根本不需要空間存放元素;有點像一個獨木橋,一次只能一人通過,還不能在橋上停留
public class T09_SynchronousQueue {
	// 特殊的 transferQueue ,隊列為空的
	//任何添加東西都要直接丟給消費者的,不會往隊列里加的
	public static void main(String[] args) throws InterruptedException {
		
		BlockingQueue<String> strs = new SynchronousQueue<>();
		
		new Thread(()->{
			try {
				System.out.println(strs.take());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
		strs.put("aaa");//阻塞,等待消費者消費
//		strs.add("aaa"); // 報錯
		System.out.println(strs.size());
	}		
}
 
常見非阻塞隊列:
   ConcurrentLinkedDeque.class,非阻塞雙端隊列,鏈表實現,無界隊列
   ConcurrentLinkedQueue.class,非阻塞隊列,鏈表實現
public class T04_ConcurrentQueue {

	public static void main(String[] args) {
		Queue<String> strs = new ConcurrentLinkedQueue<>();
		Queue<String> strs2 = new ConcurrentLinkedDeque<>();//雙端隊列
		
		for(int i=0;i<10;i++) {
			// 根據返回值來判斷是否添加成功
			strs.offer("a"+i);
		}
		
		System.out.println(strs);//[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
		System.out.println(strs.size());// 10
		System.out.println(strs.poll());// a0
		System.out.println(strs.size());// 9
		System.out.println(strs.peek());// a1
		System.out.println(strs.size());// 9
	}
}

 

轉移隊列:
   TransferQueue.class,轉移隊列接口,生產者要等消費者消費的隊列,生產者嘗試把元素直接轉移給消費者
   LinkedTransferQueue.class,轉移隊列的鏈表實現,它比SynchronousQueue更快,transfer 方法有客戶端准備消費,直接把消息直接傳遞給消費者,不放到隊列里,沒有消費者線程的話該線程會阻塞。但是可以調用 add put 王隊列里丟,隊列還是有容量的。
public class T08_TransferQueue {
	
	public static void main(String[] args) throws InterruptedException {
		
		LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
		
		new Thread(()->{
			try {
				System.out.println(strs.take());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
		strs.transfer("aaa");
//		new Thread(()->{
//			try {
//				System.out.println(strs.take());
//			} catch (InterruptedException e) {
//				e.printStackTrace();
//			}
//		}).start();
	}		
}

  

常見集合容器:
   ConcurrentMap.class,並發Map的接口,定義了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)這四個並發場景下特定的方法
   ConcurrentHashMap.class,並發HashMap,ConcurrentHashMap在並發中效率比 HashTable高,因為 HashTable 在往里添加東西的時候葯鎖定整個對象,而 ConcurrentHashMap 分成了16段,插入的時候只鎖定了其中的一段,其實就是把鎖細粒度化了,因此在多線程情況下回比 hashTable高 ,同樣也比 Collections.synchronizedMap(map1) 高。
   ConcurrentSkipListMap.class,跳表數據結構,它也是NavigableMap的實現類(要求元素之間可以比較),只有你確實需要快速的遍歷操作,並且可以承受額外的插入開銷的時候,在高並發中要求排序才去使用它。
   ConcurrentSkipListSet.class,和上面類似,只不過map變成了set
  以下代碼可以簡單的比較一下幾個容器再並發的情況下的效率問題
public class T01_ConcurrentMap {
	public static void main(String[] args) {
Map<String,String> map =new ConcurrentHashMap<>();//並發性比較高 // Map<String,String> map =new ConcurrentSkipListMap<>();//並發性高而且要求排序 // Map<String,String> map =new Hashtable<>();//並發性不是跟高 // Map<String,String> map1 =new HashMap<>(); // Map<String, String> map = Collections.synchronizedMap(map1);//並發性不是很高 Random r =new Random(); Thread[] ths =new Thread[100]; CountDownLatch latch =new CountDownLatch(ths.length); long start = System.currentTimeMillis(); for(int i=0;i<ths.length;i++) { ths[i]=new Thread(()->{ for(int j=0;j<10000;j++) { map.put("a"+r.nextInt(10000), "a"+r.nextInt(10000)); } latch.countDown(); }); } Arrays.asList(ths).forEach(o->o.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end-start); } }
 
   CopyOnWriteArrayList.class,copy-on-write(寫時復制)模式的array list,每當需要插入元素,不在原list上操作,而是會新建立一個list,然后將原先的引用指向副本。適合讀遠遠大於寫的場景
   CopyOnWriteArraySet.class,和上面類似,list變成set而已 
  以下代碼可以簡單的比較一下幾個容器再並發的情況下的效率問題 
  
public class T01_CopyOnWriteList {

	public static void main(String[] args) {
		List<String> lists = 
//				new Vector<String>();//並發效率相對高
//				new ArrayList<String>();//這個並發會有問題
				new CopyOnWriteArrayList<String>();//效率比較低,讀取非常快,讀的時候不加鎖
		Random r =new Random();
		Thread[] ths  =new Thread[100];
		
		for(int i=0;i<ths.length;i++) {
			ths[i]=new Thread(()->{
				for(int j=0;j<1000;j++) {
					lists.add("a"+r.nextInt(10000) );
				}
			});
		}
		long start = System.currentTimeMillis();
		
		Arrays.asList(ths).forEach(o->o.start());
		Arrays.asList(ths).forEach(o->{
			try {
				o.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});
		
		long end = System.currentTimeMillis();
		System.out.println(end-start);
		System.out.println(lists.size());
	}
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM