java多線程總結-同步容器與並發容器的對比與介紹


1 容器集簡單介紹

java.util包下面的容器集主要有兩種,一種是Collection接口下面的List和Set,一種是Map,
大致結構如下:

  • Collection

    • List
      • LinkedList
      • ArrayList
      • Vector
        • Stack
    • Set
      • HashSet
      • TreeSet
      • LinkedSet
  • Map

    • Hashtable
    • HashMap
    • WeakHashMap

2 同步容器

同步容器也叫線程安全容器,是通過syncrhoized關鍵字對線程不安全的操作進行加鎖來保證線程安全的
其中同步容器主要包括:
1.Vector、Stack、HashTable
2.Collections 工具類中提供的同步集合類
Collections類是一個工具類,相當於Arrays類對於Array的支持,Collections類中提供了大量對集合或者容器進行排序、查找的方法。它還提供了幾個靜態方法來創建同步容器類:

3 並發容器

java.util.concurrent提供了多種線程安全容器,大多數是使用系統底層技術實現的線程安全,也叫並發容器,類似native。Java8中使用CAS。

4 案例講解

這里主要介紹一些常見的同步容器和並發容器,通過案例輸出結果對比進行介紹
我大致分為了三類Map/Set,List,Queue來進行講解,但一個Map/Set,只介紹了Map,因為在java的設計中,Set就是Map,說白了就是只有Key沒有Value的Map,好了,現在開始進入正題

4.1 Map/Set

代碼中new了三個Map,HashTable,ConcurrentHashMap,ConcurrentSkipListMap比較每個map的運行效率,起100個線程向map中存放10000條隨機數,並通過門閂CountDownLatch控制運行狀態,輸出運行時間

/**
 * 並發容器 - ConcurrentMap
 */
package com.bernardlowe.concurrent.t06;

import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;

public class Test_01_ConcurrentMap {
	
	public static void main(String[] args) {
		final Map<String, String> map = new Hashtable<>();
		// final Map<String, String> map = new ConcurrentHashMap<>();
		// final Map<String, String> map = new ConcurrentSkipListMap<>();
		final Random r = new Random();
		Thread[] array = new Thread[100];
		final CountDownLatch latch = new CountDownLatch(array.length);
		
		long begin = System.currentTimeMillis();
		for(int i = 0; i < array.length; i++){
			array[i] = new Thread(new Runnable() {
				@Override
				public void run() {
					for(int j = 0; j < 10000; j++){
						map.put("key"+r.nextInt(100000000), "value"+r.nextInt(100000));
					}
					latch.countDown();
				}
			});
		}
		for(Thread t : array){
			t.start();
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		long end = System.currentTimeMillis();
		System.out.println("執行時間為 : " + (end-begin) + "毫秒!");
	}

}

Hashtable結果:

ConcurrentHashMap結果:

ConcurrentSkipListMap結果:

ConcurrentHashMap的底層是哈希實現的同步Map(Set)
ConcurrentSkipListMap內部是SkipList(跳表)結構實現的非阻塞讀/寫/刪除 的 Map,它的value是有序存儲的, 而且其內部是由縱橫鏈表組成,在JDK1.8中,ConcurrentHashMap的性能和存儲空間要優於ConcurrentSkipListMap

為了讓測試數據結果對比更加直觀,我這里故意將生成的隨機數調的比較大。這里需要注意一下,在測試的時候,如果機器性能比較好,可能結果會出現誤差,因為System.currentTimeMillis(),這個方法調用了個native方法,獲取的時間精度會依賴於操作系統的實現機制,具體為什么,可以看看這篇文章http://blog.sina.com.cn/s/blog_6b8bd9d80101fe8t.html。但我按照文檔的辦法將System.currentTimeMillis()改為System.nanoTime(),發現並沒有解決這個問題,可能是因為並沒有達到納秒級別吧。

4.2 List

下面代碼與4.1的代碼類似,也是new了三個List,ArrayList,Vector,CopyOnWriteArrayList,起100個線程向map中存放1000條隨機數,並通過門閂CountDownLatch控制運行狀態,輸出運行時間和最后list的的長度。由於ArrayList是線程不安全,在多線程執行的時候,需要try{}catch{},否則會因為數組越界而報錯,因為ArrayList底層是一個長度動態擴展的數組

/**
 * 並發容器 - CopyOnWriteList
 */
package com.bernardlowe.concurrent.t06;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class Test_02_CopyOnWriteList {
	
	public static void main(String[] args) {
		 final List<String> list = new ArrayList<String>(); // 線程不安全
//		 final List<String> list = new Vector<>(); // 線程安全
//		final List<String> list = new CopyOnWriteArrayList<>(); // 線程安全
		final Random r = new Random();
		Thread[] array = new Thread[100];
		final CountDownLatch latch = new CountDownLatch(array.length);
		
		long begin = System.currentTimeMillis();
		for(int i = 0; i < array.length; i++){
			array[i] = new Thread(new Runnable() {
				@Override
				public void run() {
					for(int j = 0; j < 1000; j++){
						try {
							list.add("value" + r.nextInt(100000));
						} catch (Exception e) {

						}
					}
					latch.countDown();
				}
			});
		}
		for(Thread t : array){
			t.start();
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		long end = System.currentTimeMillis();
		System.out.println("執行時間為 : " + (end-begin) + "毫秒!");
		System.out.println("List.size() : " + list.size());
	}

}

ArrayList結果:因為ArrayList是線程不安全的,所以在多線程環境中,可能會丟失數據

Vector結果:

CopyOnWriteArrayList結果:

CopyOnWriteArrayList是讀寫分離的,寫時復制出一個新的數組,完成插入、修改或者移除操作后將新數組賦值給array,讀取時直接讀取最新的數組,所以在寫操作時,效率非常低(雖然寫比較慢,但它在刪除數組頭和尾還是很快的)
從上面三個結果可以看出,CopyOnWriteArrayList雖然保證了線程安全,但它的寫操作效率太低了,但相比Vector,並發安全且性能比Vector好,Vector是增刪改查方法都加了synchronized,保證同步,但是每個方法執行的時候都要去獲得鎖,性能就會大大下降,而CopyOnWriteArrayList 只是在增刪改上加鎖,但是讀不加鎖,在讀方面的性能就好於Vector,CopyOnWriteArrayList支持讀多寫少的並發情況,所以CopyOnWriteArrayList是不會存在臟讀問題的

4.3 Queue

這一節主要介紹一些並發隊列的常用api

4.3.1 ConcurrentLinkedQueue

基礎鏈表同步隊列

peek() -> 查看queue中的首數據
poll() -> 獲取queue中的首數據

/**
 * 並發容器 - ConcurrentLinkedQueue
 *  隊列 - 鏈表實現的。 
 */
package com.bernardlowe.concurrent.t06;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Test_03_ConcurrentLinkedQueue {
	
	public static void main(String[] args) {
		Queue<String> queue = new ConcurrentLinkedQueue<>();

		//向隊列中增加10個數據
		for(int i = 0; i < 10; i++){
			queue.offer("value" + i);
		}
		
		System.out.println(queue);
		System.out.println(queue.size());

		// peek() -> 查看queue中的首數據,
		System.out.println("首數據 " + queue.peek());
		System.out.println("隊列長度 "+ queue.size());
		System.out.println("===================");
		// poll() -> 獲取queue中的首數據
		System.out.println("首數據 " + queue.peek());
		System.out.println("隊列長度 "+ queue.size());
	}

}


結果:

4.3.2 阻塞隊列LinkedBlockingQueue

阻塞隊列,隊列容量不足自動阻塞,隊列容量為0自動阻塞。

put & take - 自動阻塞
put自動阻塞, 隊列容量滿后,自動阻塞
take自動阻塞方法, 隊列容量為0后,自動阻塞

/**
 * 並發容器 - LinkedBlockingQueue
 *  阻塞容器。
 */
package com.bernardlowe.concurrent.t06;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test_04_LinkedBlockingQueue {
	
	final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
	final Random r = new Random();
	
	public static void main(String[] args) {
		final Test_04_LinkedBlockingQueue t = new Test_04_LinkedBlockingQueue();
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				while(true){
					try {
						t.queue.put("value"+t.r.nextInt(1000));
						TimeUnit.SECONDS.sleep(1);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}, "producer").start();
		
		for(int i = 0; i < 3; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					while(true){
						try {
							System.out.println(Thread.currentThread().getName() + 
									" - " + t.queue.take());
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			}, "consumer"+i).start();
		}
	}

}

結果:

結果就是一個簡單的生產者消費者

4.3.3 BlockingQueue

底層數組實現的有界隊列,當容量不足的時候,有阻塞能力,根據調用API(add/put/offer)不同,有不同特性
這里主要介紹三個api方法add,put,offer

  • add方法在容量不足的時候,拋出異常。
  • put方法在容量不足的時候,阻塞等待。
  • offer方法

    單參數offer方法,不阻塞。容量不足的時候,返回false。當前新增數據操作放棄。
    三參數offer方法(offer(value,times,timeunit)),容量不足的時候,阻塞times時長(單位為timeunit),如果在阻塞時長內,有容量空閑,新增數據返回true。如果阻塞時長范圍內,無容量空閑,放棄新增數據,返回false。

/**
 * 並發容器 - ArrayBlockingQueue
 *  有界容器。
 */
package com.bernardlowe.concurrent.t06;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test_05_ArrayBlockingQueue {
	
	final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
	
	public static void main(String[] args) {
		final Test_05_ArrayBlockingQueue t = new Test_05_ArrayBlockingQueue();
		
		for(int i = 0; i < 5; i++){

			// 1.add method
			System.out.println("add method : " + t.queue.add("value"+i));

			// 2.put method
//			try {
//				t.queue.put("put"+i);
//			} catch (InterruptedException e) {
//				e.printStackTrace();
//			}
//			System.out.println("put method : " + i);

			// 3.offer method
//			System.out.println("offer method : " + t.queue.offer("value"+i));
//			try {
//				System.out.println("offer method : " +
//							t.queue.offer("value"+i, 1, TimeUnit.SECONDS));
//			} catch (InterruptedException e) {
//				e.printStackTrace();
//			}
		}
		
		System.out.println(t.queue);
	}

}

add方法結果:容量不足的時候,拋出異常

put方法結果:容量不足的時候,阻塞等待

單/多參數offer方法結果:

單參數offer:容量不足,直接返回結果,不阻塞
多參數offer:容量不足,阻塞

4.3.4 延時隊列DelayQueue

延時隊列。根據比較機制,實現自定義處理順序的隊列。常用於定時任務。
如:定時關機。
具體示例代碼如下

/**
 * 並發容器 - DelayQueue
 */
package com.bernardlowe.concurrent.t06;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Test_06_DelayQueue {
	
	static BlockingQueue<MyTask_06> queue = new DelayQueue<>();
	
	public static void main(String[] args) throws InterruptedException {
		long value = System.currentTimeMillis();
		MyTask_06 task1 = new MyTask_06(value + 2000);
		MyTask_06 task2 = new MyTask_06(value + 1000);
		MyTask_06 task3 = new MyTask_06(value + 3000);
		MyTask_06 task4 = new MyTask_06(value + 2500);
		MyTask_06 task5 = new MyTask_06(value + 1500);
		
		queue.put(task1);
		queue.put(task2);
		queue.put(task3);
		queue.put(task4);
		queue.put(task5);
		
		System.out.println(queue);
		System.out.println(value);
		for(int i = 0; i < 5; i++){
			System.out.println(queue.take());
		}
	}

}

class MyTask_06 implements Delayed {
	
	private long compareValue;
	
	public MyTask_06(long compareValue){
		this.compareValue = compareValue;
	}

	/**
	 * 比較大小。自動實現升序
	 * 建議和getDelay方法配合完成。
	 * 如果在DelayQueue是需要按時間完成的計划任務,必須配合getDelay方法完成。
	 */
	@Override
	public int compareTo(Delayed o) {
		return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
	}

	/**
	 * 獲取計划時長的方法。
	 * 根據參數TimeUnit來決定,如何返回結果值。
	 */
	@Override
	public long getDelay(TimeUnit unit) {
		return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
	}
	
	@Override
	public String toString(){
		return "Task compare value is : " + this.compareValue;
	}
	
}

結果:

4.3.5 轉移隊列LinkedTransferQueue

這里主要是兩個方法的區別,add和transfer

  • add - 隊列會保存數據,不做阻塞等待。
  • transfer - 是TransferQueue的特有方法。必須有消費者(take()方法的調用者)。
/**
 * 並發容器 - LinkedTransferQueue
 *  轉移隊列
 */
package com.bernardlowe.concurrent.t06;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class Test_07_TransferQueue {
	
	TransferQueue<String> queue = new LinkedTransferQueue<>();
	
	public static void main(String[] args) {
		final Test_07_TransferQueue t = new Test_07_TransferQueue();
		
		/*new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println(Thread.currentThread().getName() + " thread begin " );
					System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}, "output thread").start();

		try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		try {
			t.queue.transfer("test string");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}*/
		
		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					t.queue.transfer("test string");
					// t.queue.add("test string");
					System.out.println("add ok");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}).start();
		
		try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println(Thread.currentThread().getName() + " thread begin " );
					System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}, "output thread").start();
		
	}

}

這里的transfer()和take()都是阻塞方法,take先請求接收數據或者transfer先發送數據,都會進行阻塞等待。
舉個例子,transfer()就相當與手機打電話,當A給B打電話,B必須接收到電話信號接聽才能進行通話,否則A會一直等待
add()就相當於A給B發短信,短信已經存到了運營商那邊,等待B接收,不管發短信時B是否在線

4.3.6 SynchronousQueue

該隊列一個容量為0的隊列,是一個特殊的TransferQueue,它和TransferQueue很像,但這個隊列必須要有消費線程才行
又兩個方法add,put
add方法,無阻塞。若沒有消費線程阻塞等待數據,則拋出異常。
put方法,有阻塞。若沒有消費線程阻塞等待數據,則阻塞。

/**
 * 並發容器 - SynchronousQueue
 */
package com.bernardlowe.concurrent.t06;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Test_08_SynchronusQueue {
	
	BlockingQueue<String> queue = new SynchronousQueue<>();
	
	public static void main(String[] args) {
		final Test_08_SynchronusQueue t = new Test_08_SynchronusQueue();
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println(Thread.currentThread().getName() + " thread begin " );
					try {
						TimeUnit.SECONDS.sleep(2);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}, "output thread").start();
		
		/*try {
			TimeUnit.SECONDS.sleep(3);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}*/
		// t.queue.add("test add");
		try {
			t.queue.put("test put");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		System.out.println(Thread.currentThread().getName() + " queue size : " + t.queue.size());
	}

}

t.queue.add("test add");的注釋打開,t.queue.put("test put");加上注釋
add方法異常結果: 因為它是一個容量為0的隊列


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM