Java並發包——線程安全的Collection相關類


Java並發包——線程安全的Collection相關類

摘要:本文主要學習了Java並發包下線程安全的Collection相關的類。

部分內容來自以下博客:

https://www.cnblogs.com/skywang12345/p/3498483.html

https://www.cnblogs.com/skywang12345/p/3498652.html

https://www.cnblogs.com/skywang12345/p/3503458.html

https://www.cnblogs.com/skywang12345/p/3498995.html

分類

參照之前在學習集合時候的分類,可以將JUC下有關Collection相關的類進行分類。

CopyOnWriteArrayList:實現了List接口,相當於線程安全的ArrayList。

CopyOnWriteArraySet:繼承於AbstractSet類,相當於線程安全的HashSet。CopyOnWriteArraySet內部包含一個CopyOnWriteArrayList對象,它是通過CopyOnWriteArrayList實現的。

ConcurrentSkipListSet:繼承於AbstractSet類,相當於線程安全的TreeSet。ConcurrentSkipListSet是通過ConcurrentSkipListMap實現的。

ArrayBlockingQueue:繼承於AbstractQueue類,是數組實現的線程安全的有界的阻塞隊列。

LinkedBlockingQueue:繼承於AbstractQueue類,是單向鏈表實現的(指定大小)阻塞隊列,該隊列按FIFO(先進先出)排序元素。

LinkedBlockingDeque:繼承於AbstractQueue類,是雙向鏈表實現的(指定大小)雙向並發阻塞隊列,該阻塞隊列同時支持FIFO和FILO兩種操作方式。

ConcurrentLinkedQueue:繼承於AbstractQueue類,是單向鏈表實現的無界隊列,該隊列按FIFO(先進先出)排序元素。

ConcurrentLinkedDeque:繼承於AbstractQueue類,是雙向鏈表實現的無界隊列,該隊列同時支持FIFO和FILO兩種操作方式。

CopyOnWriteArrayList

說明

CopyOnWriteArrayList的內部有個“volatile數組”來保持數據。在“添加/修改/刪除”數據時,都會新建一個數組,並將更新后的數據拷貝到新建的數組中,最后再將該數組賦值給“volatile數組”,這就是它叫做CopyOnWriteArrayList的原因。CopyOnWriteArrayList就是通過這種方式實現的動態數組,不過正由於它在“添加/修改/刪除”數據時,都會新建數組,所以涉及到修改數據的操作,CopyOnWriteArrayList效率很低,但是單單只是進行遍歷查找的話,效率比較高。

CopyOnWriteArrayList是通過“volatile數組”來保存數據的。一個線程讀取volatile數組時,總能看到其它線程對該volatile變量最后的寫入,就這樣,通過volatile提供了“讀取到的數據總是最新的”這個機制的
保證。

CopyOnWriteArrayList通過互斥鎖來保護數據。在“添加/修改/刪除”數據時,會先“獲取互斥鎖”,再修改完畢之后,先將數據更新到“volatile數組”中,然后再“釋放互斥鎖”,這樣,就達到了保護數據的目的。

使用迭代器進行遍歷的速度很快,並且不會與其他線程發生沖突。在構造迭代器時,迭代器依賴於不變的數組快照。迭代器支持hasNext()、next()等不可變操作,但不支持add()、remove()等可變操作。

構造方法:

 1 public CopyOnWriteArrayList() {
 2     setArray(new Object[0]);
 3 }
 4 
 5 public CopyOnWriteArrayList(E[] toCopyIn) {
 6     setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
 7 }
 8 
 9 public CopyOnWriteArrayList(Collection<? extends E> c) {
10     Object[] elements;
11     if (c.getClass() == CopyOnWriteArrayList.class)
12         elements = ((CopyOnWriteArrayList<?>)c).getArray();
13     else {
14         elements = c.toArray();
15         // c.toArray might (incorrectly) not return Object[] (see 6260652)
16         if (elements.getClass() != Object[].class)
17             elements = Arrays.copyOf(elements, elements.length, Object[].class);
18     }
19     setArray(elements);
20 }

獲取和設置array的方法

array是被volatile和transient修飾的一個數組。

關於volatile關鍵字,我們知道“volatile能讓變量變得可見”,即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入。正在由於這種特性,每次更新了“volatile數組”之后,其它線程都能看到對它所做的更新。

關於transient關鍵字,它是在序列化中才起作用,transient變量不會被自動序列化。

1 private transient volatile Object[] array;
2 
3 final Object[] getArray() {
4     return array;
5 }
6 
7 final void setArray(Object[] a) {
8     array = a;
9 }

添加元素

因為array數組是volatile修飾的,不能保證線程安全,所以在添加元素時使用鎖來保證線程安全。

又因為array數組是volatile修飾的,所以在調用了setArray()方法后,能保證其它線程都能看到新添加的元素。

 1 public void add(int index, E element) {
 2     // 使用鎖來保證線程安全。
 3     final ReentrantLock lock = this.lock;
 4     lock.lock();
 5     try {
 6         // 獲得array指向的引用地址。
 7         Object[] elements = getArray();
 8         int len = elements.length;
 9         // 如果指定位置越界,則拋出異常。
10         if (index > len || index < 0)
11             throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len);
12         Object[] newElements;
13         // 如果插入位置是末尾。
14         int numMoved = len - index;
15         if (numMoved == 0)
16             // 將原數組進行拷貝並擴大一個容量。
17             newElements = Arrays.copyOf(elements, len + 1);
18         else {
19             // 如果不是插入到末尾,則創建擴大一個容量的數組。
20             newElements = new Object[len + 1];
21             // 分段復制原數組,並空出指定位置。
22             System.arraycopy(elements, 0, newElements, 0, index);
23             System.arraycopy(elements, index, newElements, index + 1, numMoved);
24         }
25         // 設置指定位置的指定元素。
26         newElements[index] = element;
27         // 將array引用的地址指向新的數組。
28         setArray(newElements);
29     } finally {
30         lock.unlock();
31     }
32 }

刪除元素

刪除元素就是將array數組中指定位置的元素刪除。

它的實現方式是,如果被刪除的是最后一個元素,則直接通過Arrays.copyOf()進行處理,而不需要新建數組。否則,新建數組,然后將array數組中被刪除元素之外的其它元素拷貝到新數組中。最后,將新數組賦值給array數組。

 1 public E remove(int index) {
 2     // 使用鎖來保證線程安全。
 3     final ReentrantLock lock = this.lock;
 4     lock.lock();
 5     try {
 6         // 獲得array指向的引用地址。
 7         Object[] elements = getArray();
 8         int len = elements.length;
 9         // 根據指定的位置獲取元素。
10         E oldValue = get(elements, index);
11         // 如果指定的元素是最后一個元素。
12         int numMoved = len - index - 1;
13         if (numMoved == 0)
14             // 將原數組進行拷貝截取並將array的引用地址指向新的數組。
15             setArray(Arrays.copyOf(elements, len - 1));
16         else {
17             // 如果不是最后一個元素,則創建減少一個容量的數組。
18             Object[] newElements = new Object[len - 1];
19             // 分段復制原數組,並空出指定位置。
20             System.arraycopy(elements, 0, newElements, 0, index);
21             System.arraycopy(elements, index + 1, newElements, index, numMoved);
22             // 將array的引用地址指向新的數組。
23             setArray(newElements);
24         }
25         // 返回該位置上的元素。
26         return oldValue;
27     } finally {
28         lock.unlock();
29     }
30 }

獲取元素

獲取元素很簡單,就是返回array數組的指定位置的元素。

1 public E get(int index) {
2     return get(getArray(), index);
3 }
4 
5 private E get(Object[] a, int index) {
6     return (E) a[index];
7 }

設置元素

在設置元素之前判斷指定位置的舊元素是否和新元素相等,如果相等則不進行替換,但仍然要調用setArray()方法。

 1 public E set(int index, E element) {
 2     // 使用鎖來保證線程安全。
 3     final ReentrantLock lock = this.lock;
 4     lock.lock();
 5     try {
 6         // 獲得array指向的引用地址。
 7         Object[] elements = getArray();
 8         // 獲取指定位置的舊元素。
 9         E oldValue = get(elements, index);
10         // 如果舊元素的引用和新元素的引用不同。
11         if (oldValue != element) {
12             // 創建新的數組並拷貝array數組的值,替換新數組指定位置的元素。
13             int len = elements.length;
14             Object[] newElements = Arrays.copyOf(elements, len);
15             newElements[index] = element;
16             // 將array的引用地址指向新的數組
17             setArray(newElements);
18         } else {
19             // 為了確保voliatile的語義,任何一個讀操作都應該是寫操作的結構,所以盡管寫操作沒有改變數據,還是調用set方法,當然這僅僅是語義的說明,去掉也是可以的。
20             setArray(elements);
21         }
22         return oldValue;
23     } finally {
24         lock.unlock();
25     }
26 }

遍歷

CopyOnWriteArrayList類的迭代方法返回的是一個COWIterator類的對象。

1 public Iterator<E> iterator() {
2     return new COWIterator<E>(getArray(), 0);
3 }

CopyOnWriteArrayList在類里維護了一個用於遍歷的COWIterator類,COWIterator類實現了ListIterator接口。

 1 static final class COWIterator<E> implements ListIterator<E> {
 2     // 數組的快照。
 3     private final Object[] snapshot;
 4     // 指定下標。
 5     private int cursor;
 6 
 7     // 構造方法。
 8     private COWIterator(Object[] elements, int initialCursor) {
 9         cursor = initialCursor;
10         snapshot = elements;
11     }
12 
13     // 判斷是否存在下一個元素。
14     public boolean hasNext() {
15         return cursor < snapshot.length;
16     }
17 
18     // 判斷是否存在上一個元素。
19     public boolean hasPrevious() {
20         return cursor > 0;
21     }
22 
23     // 獲取下一個元素。
24     @SuppressWarnings("unchecked")
25     public E next() {
26         if (! hasNext())
27             throw new NoSuchElementException();
28         return (E) snapshot[cursor++];
29     }
30 
31     // 獲取上一個元素。
32     @SuppressWarnings("unchecked")
33     public E previous() {
34         if (! hasPrevious())
35             throw new NoSuchElementException();
36         return (E) snapshot[--cursor];
37     }
38 
39     // 獲取下一個元素的位置。
40     public int nextIndex() {
41         return cursor;
42     }
43 
44     // 獲取上一個元素的位置。
45     public int previousIndex() {
46         return cursor-1;
47     }
48 
49     // 不支持刪除元素。
50     public void remove() {
51         throw new UnsupportedOperationException();
52     }
53 
54     // 不支持修改元素。
55     public void set(E e) {
56         throw new UnsupportedOperationException();
57     }
58 
59     // 不支持添加元素。
60     public void add(E e) {
61         throw new UnsupportedOperationException();
62     }
63 
64     // JDK1.8新增的方法,使用迭代器Iterator的所有元素,並且第二次調用它將不會做任何事情。
65     @Override
66     public void forEachRemaining(Consumer<? super E> action) {
67         Objects.requireNonNull(action);
68         Object[] elements = snapshot;
69         final int size = elements.length;
70         for (int i = cursor; i < size; i++) {
71             @SuppressWarnings("unchecked") E e = (E) elements[i];
72             action.accept(e);
73         }
74         cursor = size;
75     }
76 }

ArrayBlockingQueue

說明

ArrayBlockingQueue內部是通過Object[]數組保存數據的,也就是說ArrayBlockingQueue本質上是通過數組實現的。ArrayBlockingQueue的大小,即數組的容量是創建ArrayBlockingQueue時指定的。

ArrayBlockingQueue與ReentrantLock是組合關系,ArrayBlockingQueue中包含一個ReentrantLock對象。ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據該互斥鎖實現“多線程對競爭資源的互斥訪問”。而且,ReentrantLock分為公平鎖和非公平鎖,關於具體使用公平鎖還是非公平鎖,在創建ArrayBlockingQueue時可以指定,ArrayBlockingQueue默認會使用非公平鎖。

ArrayBlockingQueue與Condition是組合關系,ArrayBlockingQueue中包含兩個Condition對象。而且,Condition又依賴於ArrayBlockingQueue而存在,通過Condition可以實現對ArrayBlockingQueue的更精確的訪問。

構造方法

 1 public ArrayBlockingQueue(int capacity) {
 2     this(capacity, false);
 3 }
 4 
 5 public ArrayBlockingQueue(int capacity, boolean fair) {
 6     if (capacity <= 0)
 7         throw new IllegalArgumentException();
 8     this.items = new Object[capacity];
 9     lock = new ReentrantLock(fair);
10     notEmpty = lock.newCondition();
11     notFull =  lock.newCondition();
12 }
13 
14 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
15     this(capacity, fair);
16     // 加鎖是為了保證可見性,因為可能存在其他線程在初始化之后修改集合。
17     final ReentrantLock lock = this.lock;
18     lock.lock();
19     try {
20         int i = 0;
21         try {
22             for (E e : c) {
23                 checkNotNull(e);
24                 items[i++] = e;
25             }
26         } catch (ArrayIndexOutOfBoundsException ex) {
27             throw new IllegalArgumentException();
28         }
29         count = i;
30         putIndex = (i == capacity) ? 0 : i;
31     } finally {
32         lock.unlock();
33     }
34 }

添加元素

ArrayBlockingQueue提供了offer()方法和put()方法兩種方式添加元素。

offer()方法添加失敗會立即返回false,並且添加過程中不允許被其他線程中斷。

put()方法添加失敗會等待,並且在添加過程中可以被其他線程中斷,拋出InterruptedException異常。

 1 // 不允許被其他線程中斷,添加失敗則立即返回false。
 2 public boolean offer(E e) {
 3     checkNotNull(e);
 4     final ReentrantLock lock = this.lock;
 5     lock.lock();
 6     try {
 7         if (count == items.length)
 8             return false;
 9         else {
10             enqueue(e);
11             return true;
12         }
13     } finally {
14         lock.unlock();
15     }
16 }
17 
18 // 允許被其他線程中斷,拋出InterruptedException,並且添加失敗會等待。
19 public void put(E e) throws InterruptedException {
20     checkNotNull(e);
21     final ReentrantLock lock = this.lock;
22     lock.lockInterruptibly();
23     try {
24         while (count == items.length)
25             notFull.await();
26         enqueue(e);
27     } finally {
28         lock.unlock();
29     }
30 }
31 
32 // 實際上的添加方法,添加成功后會喚醒一個等待刪除元素的線程。
33 private void enqueue(E x) {
34     final Object[] items = this.items;
35     items[putIndex] = x;
36     if (++putIndex == items.length)
37         putIndex = 0;
38     count++;
39     notEmpty.signal();
40 }

刪除元素

ArrayBlockingQueue提供了poll()方法和take()方法兩種方式刪除元素。

poll()方法刪除失敗會立即返回false,並且添加過程中不允許被其他線程中斷。

take()方法刪除失敗會等待,並且在刪除過程中可以被其他線程中斷,拋出InterruptedException異常。

 1 // 不允許被其他線程中斷,刪除失敗則立即返回null。
 2 public E poll() {
 3     final ReentrantLock lock = this.lock;
 4     lock.lock();
 5     try {
 6         return (count == 0) ? null : dequeue();
 7     } finally {
 8         lock.unlock();
 9     }
10 }
11 
12 // 允許被其他線程中斷,拋出InterruptedException,並且刪除失敗會等待。
13 public E take() throws InterruptedException {
14     final ReentrantLock lock = this.lock;
15     lock.lockInterruptibly();
16     try {
17         while (count == 0)
18             notEmpty.await();
19         return dequeue();
20     } finally {
21         lock.unlock();
22     }
23 }
24 
25 // 實際上的刪除方法,刪除成功后會喚醒一個等待添加元素的線程。
26 private E dequeue() {
27     final Object[] items = this.items;
28     @SuppressWarnings("unchecked")
29     E x = (E) items[takeIndex];
30     items[takeIndex] = null;
31     if (++takeIndex == items.length)
32         takeIndex = 0;
33     count--;
34     if (itrs != null)
35         itrs.elementDequeued();
36     notFull.signal();
37     return x;
38 }

LinkedBlockingQueue

說明

LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列。該隊列按FIFO(先進先出)排序元素,新元素插入到隊列的尾部,並且隊列獲取操作會獲得位於隊列頭部的元素。鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。

LinkedBlockingQueue是可選容量的(防止過度膨脹),即可以指定隊列的容量。如果不指定,默認容量大小等於Integer.MAX_VALUE。

LinkedBlockingQueue實現了BlockingQueue接口,它支持多線程並發。當多線程競爭同一個資源時,某線程獲取到該資源之后,其它線程需要阻塞等待。

LinkedBlockingQueue在實現多線程對競爭資源的互斥訪問時,對於插入和取出操作分別使用了不同的鎖。此外,插入鎖putLock和非滿條件notFull相關聯,取出鎖takeLock和非空條件notEmpty相關聯。通過notFull和notEmpty更細膩的控制鎖。

屬性

1 head是鏈表的表頭。取出數據時,都是從表頭head處插入。
2 last是鏈表的表尾。新增數據時,都是從表尾last處插入。
3 count是鏈表的實際大小,即當前鏈表中包含的節點個數。
4 capacity是列表的容量,它是在創建鏈表時指定的。
5 putLock是插入鎖。
6 takeLock是取出鎖。
7 notEmpty是非空條件。
8 notFull是非滿條件。

構造方法

1 // 創建一個容量為Integer.MAX_VALUE的LinkedBlockingQueue。
2 LinkedBlockingQueue()
3 // 創建一個指定容量的LinkedBlockingQueue。
4 LinkedBlockingQueue(int capacity)
5 // 創建一個容量是Integer.MAX_VALUE的LinkedBlockingQueue,最初包含給定collection的元素,元素按該collection迭代器的遍歷順序添加。
6 LinkedBlockingQueue(Collection<? extends E> c)

其他方法

 1 // 將指定元素插入到此隊列的尾部,如果隊列已滿,則等待。
 2 void put(E e)
 3 // 將指定元素插入到此隊列的尾部,如果隊列已滿,則返回false。
 4 boolean offer(E e)
 5 // 將指定元素插入到此隊列的尾部,如果隊列已滿,則等待指定的時間。
 6 boolean offer(E e, long timeout, TimeUnit unit)
 7 // 獲取並移除此隊列的頭部,如果隊列為空,則等待。
 8 E take()
 9 // 獲取並移除此隊列的頭部,如果隊列為空,則返回null。
10 E poll()
11 // 獲取並移除此隊列的頭部,如果隊列為空,則等待指定的時間。
12 E poll(long timeout, TimeUnit unit)
13 // 獲取但不移除此隊列的頭,如果此隊列為空,則返回null。
14 E peek()
15 // 返回在隊列中的元素上按適當順序進行迭代的迭代器。
16 Iterator<E> iterator()

ConcurrentLinkedQueue

說明

ConcurrentLinkedQueue是線程安全的隊列,它適用於“高並發”的場景。ConcurrentLinkedQueue使用CAS來保證更新的線程安全,是一個非阻塞隊列。

ConcurrentLinkedQueue是一個基於鏈表的無界線程安全隊列,按照FIFO(先進先出)原則對元素進行排序。隊列元素中不可以放置null元素(內部實現的特殊節點除外)。

構造方法

1 // 創建一個最初為空的ConcurrentLinkedQueue。
2 ConcurrentLinkedQueue()
3 // 創建一個最初包含給定collection元素的ConcurrentLinkedQueue,按照此collection迭代器的遍歷順序來添加元素。
4 ConcurrentLinkedQueue(Collection<? extends E> c)

其他方法

 1 // 將指定元素插入此隊列的尾部。
 2 boolean offer(E e)
 3 // 獲取並移除此隊列的頭,如果隊列為空,則返回null。
 4 E poll()
 5 // 獲取但不移除此隊列的頭,如果隊列為空,則返回null。
 6 E peek()
 7 // 返回在此隊列元素上以恰當順序進行迭代的迭代器。
 8 Iterator<E> iterator()
 9 // 返回此隊列中的元素數量。
10 int size()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM