Java並發包——線程安全的Collection相關類
摘要:本文主要學習了Java並發包下線程安全的Collection相關的類。
部分內容來自以下博客:
https://www.cnblogs.com/skywang12345/p/3498483.html
https://www.cnblogs.com/skywang12345/p/3498652.html
https://www.cnblogs.com/skywang12345/p/3503458.html
https://www.cnblogs.com/skywang12345/p/3498995.html
分類
參照之前在學習集合時候的分類,可以將JUC下有關Collection相關的類進行分類。
CopyOnWriteArrayList:實現了List接口,相當於線程安全的ArrayList。
CopyOnWriteArraySet:繼承於AbstractSet類,相當於線程安全的HashSet。CopyOnWriteArraySet內部包含一個CopyOnWriteArrayList對象,它是通過CopyOnWriteArrayList實現的。
ConcurrentSkipListSet:繼承於AbstractSet類,相當於線程安全的TreeSet。ConcurrentSkipListSet是通過ConcurrentSkipListMap實現的。
ArrayBlockingQueue:繼承於AbstractQueue類,是數組實現的線程安全的有界的阻塞隊列。
LinkedBlockingQueue:繼承於AbstractQueue類,是單向鏈表實現的(指定大小)阻塞隊列,該隊列按FIFO(先進先出)排序元素。
LinkedBlockingDeque:繼承於AbstractQueue類,是雙向鏈表實現的(指定大小)雙向並發阻塞隊列,該阻塞隊列同時支持FIFO和FILO兩種操作方式。
ConcurrentLinkedQueue:繼承於AbstractQueue類,是單向鏈表實現的無界隊列,該隊列按FIFO(先進先出)排序元素。
ConcurrentLinkedDeque:繼承於AbstractQueue類,是雙向鏈表實現的無界隊列,該隊列同時支持FIFO和FILO兩種操作方式。
CopyOnWriteArrayList
說明
CopyOnWriteArrayList的內部有個“volatile數組”來保持數據。在“添加/修改/刪除”數據時,都會新建一個數組,並將更新后的數據拷貝到新建的數組中,最后再將該數組賦值給“volatile數組”,這就是它叫做CopyOnWriteArrayList的原因。CopyOnWriteArrayList就是通過這種方式實現的動態數組,不過正由於它在“添加/修改/刪除”數據時,都會新建數組,所以涉及到修改數據的操作,CopyOnWriteArrayList效率很低,但是單單只是進行遍歷查找的話,效率比較高。
CopyOnWriteArrayList是通過“volatile數組”來保存數據的。一個線程讀取volatile數組時,總能看到其它線程對該volatile變量最后的寫入,就這樣,通過volatile提供了“讀取到的數據總是最新的”這個機制的
保證。
CopyOnWriteArrayList通過互斥鎖來保護數據。在“添加/修改/刪除”數據時,會先“獲取互斥鎖”,再修改完畢之后,先將數據更新到“volatile數組”中,然后再“釋放互斥鎖”,這樣,就達到了保護數據的目的。
使用迭代器進行遍歷的速度很快,並且不會與其他線程發生沖突。在構造迭代器時,迭代器依賴於不變的數組快照。迭代器支持hasNext()、next()等不可變操作,但不支持add()、remove()等可變操作。
構造方法:
1 public CopyOnWriteArrayList() { 2 setArray(new Object[0]); 3 } 4 5 public CopyOnWriteArrayList(E[] toCopyIn) { 6 setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); 7 } 8 9 public CopyOnWriteArrayList(Collection<? extends E> c) { 10 Object[] elements; 11 if (c.getClass() == CopyOnWriteArrayList.class) 12 elements = ((CopyOnWriteArrayList<?>)c).getArray(); 13 else { 14 elements = c.toArray(); 15 // c.toArray might (incorrectly) not return Object[] (see 6260652) 16 if (elements.getClass() != Object[].class) 17 elements = Arrays.copyOf(elements, elements.length, Object[].class); 18 } 19 setArray(elements); 20 }
獲取和設置array的方法
array是被volatile和transient修飾的一個數組。
關於volatile關鍵字,我們知道“volatile能讓變量變得可見”,即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入。正在由於這種特性,每次更新了“volatile數組”之后,其它線程都能看到對它所做的更新。
關於transient關鍵字,它是在序列化中才起作用,transient變量不會被自動序列化。
1 private transient volatile Object[] array; 2 3 final Object[] getArray() { 4 return array; 5 } 6 7 final void setArray(Object[] a) { 8 array = a; 9 }
添加元素
因為array數組是volatile修飾的,不能保證線程安全,所以在添加元素時使用鎖來保證線程安全。
又因為array數組是volatile修飾的,所以在調用了setArray()方法后,能保證其它線程都能看到新添加的元素。
1 public void add(int index, E element) { 2 // 使用鎖來保證線程安全。 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 // 獲得array指向的引用地址。 7 Object[] elements = getArray(); 8 int len = elements.length; 9 // 如果指定位置越界,則拋出異常。 10 if (index > len || index < 0) 11 throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len); 12 Object[] newElements; 13 // 如果插入位置是末尾。 14 int numMoved = len - index; 15 if (numMoved == 0) 16 // 將原數組進行拷貝並擴大一個容量。 17 newElements = Arrays.copyOf(elements, len + 1); 18 else { 19 // 如果不是插入到末尾,則創建擴大一個容量的數組。 20 newElements = new Object[len + 1]; 21 // 分段復制原數組,並空出指定位置。 22 System.arraycopy(elements, 0, newElements, 0, index); 23 System.arraycopy(elements, index, newElements, index + 1, numMoved); 24 } 25 // 設置指定位置的指定元素。 26 newElements[index] = element; 27 // 將array引用的地址指向新的數組。 28 setArray(newElements); 29 } finally { 30 lock.unlock(); 31 } 32 }
刪除元素
刪除元素就是將array數組中指定位置的元素刪除。
它的實現方式是,如果被刪除的是最后一個元素,則直接通過Arrays.copyOf()進行處理,而不需要新建數組。否則,新建數組,然后將array數組中被刪除元素之外的其它元素拷貝到新數組中。最后,將新數組賦值給array數組。
1 public E remove(int index) { 2 // 使用鎖來保證線程安全。 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 // 獲得array指向的引用地址。 7 Object[] elements = getArray(); 8 int len = elements.length; 9 // 根據指定的位置獲取元素。 10 E oldValue = get(elements, index); 11 // 如果指定的元素是最后一個元素。 12 int numMoved = len - index - 1; 13 if (numMoved == 0) 14 // 將原數組進行拷貝截取並將array的引用地址指向新的數組。 15 setArray(Arrays.copyOf(elements, len - 1)); 16 else { 17 // 如果不是最后一個元素,則創建減少一個容量的數組。 18 Object[] newElements = new Object[len - 1]; 19 // 分段復制原數組,並空出指定位置。 20 System.arraycopy(elements, 0, newElements, 0, index); 21 System.arraycopy(elements, index + 1, newElements, index, numMoved); 22 // 將array的引用地址指向新的數組。 23 setArray(newElements); 24 } 25 // 返回該位置上的元素。 26 return oldValue; 27 } finally { 28 lock.unlock(); 29 } 30 }
獲取元素
獲取元素很簡單,就是返回array數組的指定位置的元素。
1 public E get(int index) { 2 return get(getArray(), index); 3 } 4 5 private E get(Object[] a, int index) { 6 return (E) a[index]; 7 }
設置元素
在設置元素之前判斷指定位置的舊元素是否和新元素相等,如果相等則不進行替換,但仍然要調用setArray()方法。
1 public E set(int index, E element) { 2 // 使用鎖來保證線程安全。 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 // 獲得array指向的引用地址。 7 Object[] elements = getArray(); 8 // 獲取指定位置的舊元素。 9 E oldValue = get(elements, index); 10 // 如果舊元素的引用和新元素的引用不同。 11 if (oldValue != element) { 12 // 創建新的數組並拷貝array數組的值,替換新數組指定位置的元素。 13 int len = elements.length; 14 Object[] newElements = Arrays.copyOf(elements, len); 15 newElements[index] = element; 16 // 將array的引用地址指向新的數組 17 setArray(newElements); 18 } else { 19 // 為了確保voliatile的語義,任何一個讀操作都應該是寫操作的結構,所以盡管寫操作沒有改變數據,還是調用set方法,當然這僅僅是語義的說明,去掉也是可以的。 20 setArray(elements); 21 } 22 return oldValue; 23 } finally { 24 lock.unlock(); 25 } 26 }
遍歷
CopyOnWriteArrayList類的迭代方法返回的是一個COWIterator類的對象。
1 public Iterator<E> iterator() { 2 return new COWIterator<E>(getArray(), 0); 3 }
CopyOnWriteArrayList在類里維護了一個用於遍歷的COWIterator類,COWIterator類實現了ListIterator接口。
1 static final class COWIterator<E> implements ListIterator<E> { 2 // 數組的快照。 3 private final Object[] snapshot; 4 // 指定下標。 5 private int cursor; 6 7 // 構造方法。 8 private COWIterator(Object[] elements, int initialCursor) { 9 cursor = initialCursor; 10 snapshot = elements; 11 } 12 13 // 判斷是否存在下一個元素。 14 public boolean hasNext() { 15 return cursor < snapshot.length; 16 } 17 18 // 判斷是否存在上一個元素。 19 public boolean hasPrevious() { 20 return cursor > 0; 21 } 22 23 // 獲取下一個元素。 24 @SuppressWarnings("unchecked") 25 public E next() { 26 if (! hasNext()) 27 throw new NoSuchElementException(); 28 return (E) snapshot[cursor++]; 29 } 30 31 // 獲取上一個元素。 32 @SuppressWarnings("unchecked") 33 public E previous() { 34 if (! hasPrevious()) 35 throw new NoSuchElementException(); 36 return (E) snapshot[--cursor]; 37 } 38 39 // 獲取下一個元素的位置。 40 public int nextIndex() { 41 return cursor; 42 } 43 44 // 獲取上一個元素的位置。 45 public int previousIndex() { 46 return cursor-1; 47 } 48 49 // 不支持刪除元素。 50 public void remove() { 51 throw new UnsupportedOperationException(); 52 } 53 54 // 不支持修改元素。 55 public void set(E e) { 56 throw new UnsupportedOperationException(); 57 } 58 59 // 不支持添加元素。 60 public void add(E e) { 61 throw new UnsupportedOperationException(); 62 } 63 64 // JDK1.8新增的方法,使用迭代器Iterator的所有元素,並且第二次調用它將不會做任何事情。 65 @Override 66 public void forEachRemaining(Consumer<? super E> action) { 67 Objects.requireNonNull(action); 68 Object[] elements = snapshot; 69 final int size = elements.length; 70 for (int i = cursor; i < size; i++) { 71 @SuppressWarnings("unchecked") E e = (E) elements[i]; 72 action.accept(e); 73 } 74 cursor = size; 75 } 76 }
ArrayBlockingQueue
說明
ArrayBlockingQueue內部是通過Object[]數組保存數據的,也就是說ArrayBlockingQueue本質上是通過數組實現的。ArrayBlockingQueue的大小,即數組的容量是創建ArrayBlockingQueue時指定的。
ArrayBlockingQueue與ReentrantLock是組合關系,ArrayBlockingQueue中包含一個ReentrantLock對象。ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據該互斥鎖實現“多線程對競爭資源的互斥訪問”。而且,ReentrantLock分為公平鎖和非公平鎖,關於具體使用公平鎖還是非公平鎖,在創建ArrayBlockingQueue時可以指定,ArrayBlockingQueue默認會使用非公平鎖。
ArrayBlockingQueue與Condition是組合關系,ArrayBlockingQueue中包含兩個Condition對象。而且,Condition又依賴於ArrayBlockingQueue而存在,通過Condition可以實現對ArrayBlockingQueue的更精確的訪問。
構造方法
1 public ArrayBlockingQueue(int capacity) { 2 this(capacity, false); 3 } 4 5 public ArrayBlockingQueue(int capacity, boolean fair) { 6 if (capacity <= 0) 7 throw new IllegalArgumentException(); 8 this.items = new Object[capacity]; 9 lock = new ReentrantLock(fair); 10 notEmpty = lock.newCondition(); 11 notFull = lock.newCondition(); 12 } 13 14 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { 15 this(capacity, fair); 16 // 加鎖是為了保證可見性,因為可能存在其他線程在初始化之后修改集合。 17 final ReentrantLock lock = this.lock; 18 lock.lock(); 19 try { 20 int i = 0; 21 try { 22 for (E e : c) { 23 checkNotNull(e); 24 items[i++] = e; 25 } 26 } catch (ArrayIndexOutOfBoundsException ex) { 27 throw new IllegalArgumentException(); 28 } 29 count = i; 30 putIndex = (i == capacity) ? 0 : i; 31 } finally { 32 lock.unlock(); 33 } 34 }
添加元素
ArrayBlockingQueue提供了offer()方法和put()方法兩種方式添加元素。
offer()方法添加失敗會立即返回false,並且添加過程中不允許被其他線程中斷。
put()方法添加失敗會等待,並且在添加過程中可以被其他線程中斷,拋出InterruptedException異常。
1 // 不允許被其他線程中斷,添加失敗則立即返回false。 2 public boolean offer(E e) { 3 checkNotNull(e); 4 final ReentrantLock lock = this.lock; 5 lock.lock(); 6 try { 7 if (count == items.length) 8 return false; 9 else { 10 enqueue(e); 11 return true; 12 } 13 } finally { 14 lock.unlock(); 15 } 16 } 17 18 // 允許被其他線程中斷,拋出InterruptedException,並且添加失敗會等待。 19 public void put(E e) throws InterruptedException { 20 checkNotNull(e); 21 final ReentrantLock lock = this.lock; 22 lock.lockInterruptibly(); 23 try { 24 while (count == items.length) 25 notFull.await(); 26 enqueue(e); 27 } finally { 28 lock.unlock(); 29 } 30 } 31 32 // 實際上的添加方法,添加成功后會喚醒一個等待刪除元素的線程。 33 private void enqueue(E x) { 34 final Object[] items = this.items; 35 items[putIndex] = x; 36 if (++putIndex == items.length) 37 putIndex = 0; 38 count++; 39 notEmpty.signal(); 40 }
刪除元素
ArrayBlockingQueue提供了poll()方法和take()方法兩種方式刪除元素。
poll()方法刪除失敗會立即返回false,並且添加過程中不允許被其他線程中斷。
take()方法刪除失敗會等待,並且在刪除過程中可以被其他線程中斷,拋出InterruptedException異常。
1 // 不允許被其他線程中斷,刪除失敗則立即返回null。 2 public E poll() { 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 return (count == 0) ? null : dequeue(); 7 } finally { 8 lock.unlock(); 9 } 10 } 11 12 // 允許被其他線程中斷,拋出InterruptedException,並且刪除失敗會等待。 13 public E take() throws InterruptedException { 14 final ReentrantLock lock = this.lock; 15 lock.lockInterruptibly(); 16 try { 17 while (count == 0) 18 notEmpty.await(); 19 return dequeue(); 20 } finally { 21 lock.unlock(); 22 } 23 } 24 25 // 實際上的刪除方法,刪除成功后會喚醒一個等待添加元素的線程。 26 private E dequeue() { 27 final Object[] items = this.items; 28 @SuppressWarnings("unchecked") 29 E x = (E) items[takeIndex]; 30 items[takeIndex] = null; 31 if (++takeIndex == items.length) 32 takeIndex = 0; 33 count--; 34 if (itrs != null) 35 itrs.elementDequeued(); 36 notFull.signal(); 37 return x; 38 }
LinkedBlockingQueue
說明
LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列。該隊列按FIFO(先進先出)排序元素,新元素插入到隊列的尾部,並且隊列獲取操作會獲得位於隊列頭部的元素。鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。
LinkedBlockingQueue是可選容量的(防止過度膨脹),即可以指定隊列的容量。如果不指定,默認容量大小等於Integer.MAX_VALUE。
LinkedBlockingQueue實現了BlockingQueue接口,它支持多線程並發。當多線程競爭同一個資源時,某線程獲取到該資源之后,其它線程需要阻塞等待。
LinkedBlockingQueue在實現多線程對競爭資源的互斥訪問時,對於插入和取出操作分別使用了不同的鎖。此外,插入鎖putLock和非滿條件notFull相關聯,取出鎖takeLock和非空條件notEmpty相關聯。通過notFull和notEmpty更細膩的控制鎖。
屬性
1 head是鏈表的表頭。取出數據時,都是從表頭head處插入。 2 last是鏈表的表尾。新增數據時,都是從表尾last處插入。 3 count是鏈表的實際大小,即當前鏈表中包含的節點個數。 4 capacity是列表的容量,它是在創建鏈表時指定的。 5 putLock是插入鎖。 6 takeLock是取出鎖。 7 notEmpty是非空條件。 8 notFull是非滿條件。
構造方法
1 // 創建一個容量為Integer.MAX_VALUE的LinkedBlockingQueue。 2 LinkedBlockingQueue() 3 // 創建一個指定容量的LinkedBlockingQueue。 4 LinkedBlockingQueue(int capacity) 5 // 創建一個容量是Integer.MAX_VALUE的LinkedBlockingQueue,最初包含給定collection的元素,元素按該collection迭代器的遍歷順序添加。 6 LinkedBlockingQueue(Collection<? extends E> c)
其他方法
1 // 將指定元素插入到此隊列的尾部,如果隊列已滿,則等待。 2 void put(E e) 3 // 將指定元素插入到此隊列的尾部,如果隊列已滿,則返回false。 4 boolean offer(E e) 5 // 將指定元素插入到此隊列的尾部,如果隊列已滿,則等待指定的時間。 6 boolean offer(E e, long timeout, TimeUnit unit) 7 // 獲取並移除此隊列的頭部,如果隊列為空,則等待。 8 E take() 9 // 獲取並移除此隊列的頭部,如果隊列為空,則返回null。 10 E poll() 11 // 獲取並移除此隊列的頭部,如果隊列為空,則等待指定的時間。 12 E poll(long timeout, TimeUnit unit) 13 // 獲取但不移除此隊列的頭,如果此隊列為空,則返回null。 14 E peek() 15 // 返回在隊列中的元素上按適當順序進行迭代的迭代器。 16 Iterator<E> iterator()
ConcurrentLinkedQueue
說明
ConcurrentLinkedQueue是線程安全的隊列,它適用於“高並發”的場景。ConcurrentLinkedQueue使用CAS來保證更新的線程安全,是一個非阻塞隊列。
ConcurrentLinkedQueue是一個基於鏈表的無界線程安全隊列,按照FIFO(先進先出)原則對元素進行排序。隊列元素中不可以放置null元素(內部實現的特殊節點除外)。
構造方法
1 // 創建一個最初為空的ConcurrentLinkedQueue。 2 ConcurrentLinkedQueue() 3 // 創建一個最初包含給定collection元素的ConcurrentLinkedQueue,按照此collection迭代器的遍歷順序來添加元素。 4 ConcurrentLinkedQueue(Collection<? extends E> c)
其他方法
1 // 將指定元素插入此隊列的尾部。 2 boolean offer(E e) 3 // 獲取並移除此隊列的頭,如果隊列為空,則返回null。 4 E poll() 5 // 獲取但不移除此隊列的頭,如果隊列為空,則返回null。 6 E peek() 7 // 返回在此隊列元素上以恰當順序進行迭代的迭代器。 8 Iterator<E> iterator() 9 // 返回此隊列中的元素數量。 10 int size()