並發容器梳理


 

記錄一個問題先:

AndroidS Studio打包APK時出現問題:org.gradle.api.tasks.TaskExecutionException: Execution failed for task ':app:lintVitalRelease'.

什么意思?任務':app:lintVitalRelease的執行失敗。

關於lint是個什么東西看官網:https://developer.android.com/studio/write/lint?hl=zh-CN

或者:https://blog.csdn.net/u011240877/article/details/54141714#commentBox

總之,這是一個保證代碼的可維護性與助力代碼優化的一個工具,可以檢查出代碼當中不規范或者結構混亂等威脅代碼質量的地方予以警告。

但是Lint的警告當中有的問題是不必要一定解決的,所以過度依賴這個工具,也可能造成代碼編寫時效率低下的問題。

所以在打包的過程當中,由於沒有解決掉lint中警告的問題,沒有辦法通過編譯進行打包。

所以可以在lint配置當中選擇忽略它的警告,例如在app下的build.gradle下android閉包中添加:

lintOptions {
        checkReleaseBuilds false
        abortOnError false
    }

就能成功打包了。

 

 

 

並發容器

ConcurrentHashMap、CopyOnWriteArrayList、阻塞隊列


 

並發容器概覽

  • ConcurrentHashMap:線程安全的HashMap

  • CopyOnWriteArrayList:線程安全的List

  • BlockingQueue:這是一個接口,表示阻塞隊列,非常適合用於作為數據共享的通道。

  • ConcurrentLinkedQueue:高效的非阻塞並發隊列,使用鏈表實現。可以看做是一個線程安全的LinkedList。

  • ConcurrentSkipListMap:是一個Map,使用跳表的數據結構進行快速查找。(不常用)

 

集合類的相關歷史

  • Vector和Hashtable

    這是早期JDK中線程安全的ArrayList和HashMap,並發性能差,所有方法都加上synchronized,那么意味着並發競爭大的時候性能不會太好。

  • ArrayList和HashMap

    雖然這兩個類不是線程安全的,但是可以用Collections.synchronizedList(new ArrayList<E>())Collections.synchronizedMap(new HashMap<K,V>())使之變成安全的。這種方式也是根據傳入的集合類的類型,比如是否RandomAccess類型,實現了RandomAccess接口,來返回一個對應的線程安全的SynchronizedRandomAccessList或者SynchronizedList,而在這些線程安全的list當中,使用synchronized方式並沒有比Vector和Hashtable當中的方法高明到哪兒去,只是沒有加在方法上而已。

  • ConcurrentHashMap、CopyOnWriteArrayList

    這兩種就到了比較不錯的實現了,它們取代同步的HashMap和ArrayList。絕大多數並發情況下,ConcurrentHashMap和CopyOnWriteArrayList的性能都更好。除非是一個List經常被修改,那么用Collections.synchronizedList(new ArrayList<E>())會比使用CopyOnWriteArrayList性能更好,因為CopyOnWriteArrayList更適合讀多寫少的場景,每次寫入都會完整復制整個鏈表,比較耗費資源。

 

 


ConcurrentHashMap

  • Map

    Map是一個接口,有這些實現:

    • HashMap

    • Hashtable(性能太低,如果不需要並發就直接使用HashMap,而如果在並發場景下就使用ConcurrentHashMap)

    • LinkedHashMap:HashMap的一個子類,會保存鍵值對的插入順序,在遍歷的時候就有用了,順序和插入順序一致。

    • TreeMap:由於實現了SortedMap接口,所以也就具有排序的功能,也可以自定義排序的規則。所以遍歷的時候也是排過序的。

    常用方法:

    int size();
    boolean isEmpty();
    boolean containsKey(Object key);
    boolean containsValue(Object value);
    V get(Object key);
    V put(K key, V value);
    V remove(Object key);
    Set<K> keySet();
    //等等
  • 為什么需要ConcurrentHashMap

    為什么不用Collections.synchronizedMap(new HashMap<K,V>())?

    為什么HashMap是線程不安全的?

    • 同時put碰撞導致數據丟失

    • 同時put擴容導致數據丟失(擴容之后的數組只有一個會被保存下來)

    • 死循環造成的CPU100%

      這個問題主要是出現JDK1.7及之前存在。

      核心原因就是在多線程同時擴容的時候會造成鏈表的死循環。但是這個問題吧,都說了HashMap不支持並發了,並發場景下使用自然會出現一些問題

      CoolShell

  • HashMap分析

    • JDK1.7拉鏈法

    • JDK1.8拉鏈法升級為紅黑樹

      關於紅黑樹:紅黑樹是對二叉查找樹BST的一種平衡策略,O(logN)vsO(N),會自動平衡,防止極端不平衡從而影響查找效率 的情況發生。紅黑樹的每個節點要么是紅色,要么是黑色,但是根節點永遠是黑色的;而且紅色節點不能連續(也就是,紅色節點的孩子和父親都不能是紅色);從任一節點到其子樹中每個葉子節點的路徑都有相同數量的黑色結點;所有葉結點都是黑色的。

    • HashMap關於並發的特點:

      1. 非線程安全

      2. 迭代的時候不允許修改內容

      3. 只讀的並發是安全的

      4. 如果一定要把HashMap用在並發環境下,

        用Collections.synchronizedMap(new HashMap<K,V>())。

  • JDK1.7中的ConcurrentHashMap的實現分析

    Java7中的ConcurrentHashMap最外層是多個segment,每個segment的底層數據結構和HashMap類似,仍然是數組和鏈表組成的拉鏈法。每個segment獨立上ReentrantLock鎖,每個segment之間互不影響,提高了並發效率。ConcurrentHashMap默認有16個segment,所以最多可以同時支持16個線程並發寫(操作分別分布在不同的segment上),這個默認值可以在初始化的時候設置為其他的值,但是一旦初始化以后,是不可以擴容的。

  • JDK1.8中的ConcurrentHashMap實現和分析

    Java8中的ConcurrentHashMap是把代碼完全的重寫了,代碼量也從一千多行漲到了六千多行。

    putVal流程:

    1. 判斷key不為空

    2. 計算hash值

    3. 根據對應位置的節點類型,來賦值,或者helpTransfer,或者增長鏈表,或者給紅黑樹增加節點。

    4. 檢查滿足閾值就“紅黑樹化”

    5. 返回oldVal。

    get流程:

    1. 計算hash值

    2. 找到對應的位置,根據情況進行:

    3. 直接取值:

    4. 紅黑樹里取值

    5. 遍歷鏈表取值

    6. 返回找到的結果

    圖解ConcurrentHashMap

    源碼分析:

    public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
       implements ConcurrentMap<K,V>, Serializable {
       private static final long serialVersionUID = 7249069246763182397L;
       
       //省略。。。
       
       //初始化,可見沒有任何內容,真正的初始化要等到putval方法中調用initTable方法
       public ConcurrentHashMap() {
           
      }
       
       /**
        * Initializes table, using the size recorded in sizeCtl.
        * 使用sizeCtl中記錄的大小初始化表。
        */
       //初始化 table,通過對 sizeCtl 的變量賦值來保證數組只能被初始化一次。
       private final Node<K, V>[] initTable() {
           Node<K, V>[] tab;
           int sc;
           //通過自旋保證初始化成功
           while ((tab = table) == null || tab.length == 0) {
               // 小於 0 代表有線程正在初始化,釋放當前 CPU 的調度權,重新發起鎖的競爭
               if ((sc = sizeCtl) < 0)
                   Thread.yield(); // lost initialization race; just spin
                   // CAS 賦值保證當前只有一個線程在初始化,-1 代表當前只有一個線程能初始化
                   // 保證了數組的初始化的安全性
               else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                   try {
                       // 很有可能執行到這里的時候,table 已經不為空了,這里是雙重 check
                       if ((tab = table) == null || tab.length == 0) {
                           // 進行初始化
                           int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                           @SuppressWarnings("unchecked")
                           Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                           table = tab = nt;
                           sc = n - (n >>> 2);
                      }
                  } finally {
                       sizeCtl = sc;
                  }
                   break;
              }
          }
           return tab;
      }
       
       //新增
       public V put(K key, V value) {
           return putVal(key, value, false);
      }
       
       final V putVal(K key, V value, boolean onlyIfAbsent) {
           if (key == null || value == null) throw new NullPointerException();
           //計算hash
           int hash = spread(key.hashCode());
           int binCount = 0;
           for (Node<K, V>[] tab = table; ; ) {
               Node<K, V> f;
               int n, i, fh;
               //table是空的,進行初始化
               if (tab == null || (n = tab.length) == 0)
                   tab = initTable();//哈希表的初始化
                   //如果當前索引位置沒有值,直接創建
               else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                   //cas 在 i 位置創建新的元素,當 i 位置是空時,即能創建成功,結束for自循,
                   //否則繼續自旋
                   if (casTabAt(tab, i, null,
                           new Node<K, V>(hash, key, value, null)))
                       break;                   // no lock when adding to empty bin
              }
               //如果當前槽點是轉移節點,表示該槽點正在擴容,就會一直等待擴容完成
               //轉移節點的 hash 值是固定的,都是 MOVED
               else if ((fh = f.hash) == MOVED)
                   tab = helpTransfer(tab, f);
                   //槽點上有值的
               else {
                   V oldVal = null;
                   //鎖定當前槽點,其余線程不能操作,保證了安全
                   synchronized (f) {
                       //這里再次判斷 i 索引位置的數據沒有被修改
                       //binCount 被賦值的話,說明走到了修改表的過程里面
                       if (tabAt(tab, i) == f) {
                           //鏈表
                           if (fh >= 0) {
                               binCount = 1;
                               for (Node<K, V> e = f; ; ++binCount) {
                                   K ek;
                                   //值有的話,直接返回
                                   if (e.hash == hash &&
                                          ((ek = e.key) == key ||
                                                  (ek != null && key.equals(ek)))) {
                                       oldVal = e.val;
                                       if (!onlyIfAbsent)
                                           e.val = value;
                                       break;
                                  }
                                   Node<K, V> pred = e;
                                   //把新增的元素賦值到鏈表的最后,退出自旋
                                   if ((e = e.next) == null) {
                                       pred.next = new Node<K, V>(hash, key,
                                               value, null);
                                       break;
                                  }
                              }
                          }
                           //紅黑樹,這里沒有使用 TreeNode,使用的是 TreeBin,
                           //TreeNode 只是紅黑樹的一個節點
                           //TreeBin 持有紅黑樹的引用,並且會對其加鎖,保證其操作的線程安全
                           else if (f instanceof TreeBin) {
                               Node<K, V> p;
                               binCount = 2;
                               //滿足if的話,把老的值給oldVal
                               //在putTreeVal方法里面,在給紅黑樹重新着色旋轉的時候
                               //會鎖住紅黑樹的根節點
                               if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
                                       value)) != null) {
                                   oldVal = p.val;
                                   if (!onlyIfAbsent)
                                       p.val = value;
                              }
                          }
                      }
                  }
                   //binCount不為空,並且 oldVal 有值的情況,說明已經新增成功了
                   if (binCount != 0) {
                       // 鏈表是否需要轉化成紅黑樹
                       if (binCount >= TREEIFY_THRESHOLD)
                           treeifyBin(tab, i);
                       if (oldVal != null)
                           return oldVal;
                       //這一步幾乎走不到。槽點已經上鎖,只有在紅黑樹或者鏈表新增失敗的時候
                       //才會走到這里,這兩者新增都是自旋的,幾乎不會失敗
                       break;
                  }
              }
          }
           //check 容器是否需要擴容,如果需要去擴容,調用 transfer 方法去擴容
           //如果已經在擴容中了,check有無完成
           addCount(1L, binCount);
           return null;
      }
       /* put方法的大致思路:
       1.如果數組為空,初始化,初始化完成之后,走 2;
       2.計算當前槽點有沒有值,沒有值的話,cas 創建,失敗繼續自旋(for 死循環),直到成功,
       槽點有值的話,走 3;
       3.如果槽點是轉移節點(正在擴容),就會一直自旋等待擴容完成之后再新增,不是轉移節點走 4;
       4.槽點有值的,先鎖定當前槽點,保證其余線程不能操作,如果是鏈表,新增值到鏈表的尾部,
       如果是紅黑樹,使用紅黑樹新增的方法新增;
       5.新增完成之后 check 需不需要擴容,需要的話去擴容。*/
       
       
       private final void addCount(long x, int check) {
           CounterCell[] as; long b, s;
           if ((as = counterCells) != null ||
               !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
               CounterCell a; long v; int m;
               boolean uncontended = true;
               if (as == null || (m = as.length - 1) < 0 ||
                  (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                   !(uncontended =
                     U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                   fullAddCount(x, uncontended);
                   return;
              }
               if (check <= 1)
                   return;
               s = sumCount();
          }
           if (check >= 0) {
               Node<K,V>[] tab, nt; int n, sc;
               while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                      (n = tab.length) < MAXIMUM_CAPACITY) {
                   int rs = resizeStamp(n);
                   if (sc < 0) {
                       if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                           sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                           transferIndex <= 0)
                           break;
                       if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                           transfer(tab, nt);
                  }
                   else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                (rs << RESIZE_STAMP_SHIFT) + 2))
                       transfer(tab, null);
                   s = sumCount();
              }
          }
      }
       
       
       // 擴容主要分 2 步,第一新建新的空數組,第二移動拷貝每個元素到新數組中去
    // tab:原數組,nextTab:新數組
       private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
           // 老數組的長度
           int n = tab.length, stride;
           if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
               stride = MIN_TRANSFER_STRIDE; // subdivide range
           // 如果新數組為空,初始化,大小為原數組的兩倍,n << 1
           if (nextTab == null) {            // initiating
               try {
                   @SuppressWarnings("unchecked")
                   Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
                   nextTab = nt;
              } catch (Throwable ex) {      // try to cope with OOME
                   sizeCtl = Integer.MAX_VALUE;
                   return;
              }
               nextTable = nextTab;
               transferIndex = n;
          }
           // 新數組的長度
           int nextn = nextTab.length;
           // 代表轉移節點,如果原數組上是轉移節點,說明該節點正在被擴容
           ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
           boolean advance = true;
           boolean finishing = false; // to ensure sweep before committing nextTab
           // 無限自旋,i 的值會從原數組的最大值開始,慢慢遞減到 0
           for (int i = 0, bound = 0; ; ) {
               Node<K, V> f;
               int fh;
               while (advance) {
                   int nextIndex, nextBound;
                   // 結束循環的標志
                   if (--i >= bound || finishing)
                       advance = false;
                       // 已經拷貝完成
                   else if ((nextIndex = transferIndex) <= 0) {
                       i = -1;
                       advance = false;
                  }
                   // 每次減少 i 的值
                   else if (U.compareAndSwapInt
                          (this, TRANSFERINDEX, nextIndex,
                                   nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                       bound = nextBound;
                       i = nextIndex - 1;
                       advance = false;
                  }
              }
               // if 任意條件滿足說明拷貝結束了
               if (i < 0 || i >= n || i + n >= nextn) {
                   int sc;
                   // 拷貝結束,直接賦值,因為每次拷貝完一個節點,都在原數組上放轉移節點,所以拷貝完成的節點的數據一定不會再發生變化。
                   // 原數組發現是轉移節點,是不會操作的,會一直等待轉移節點消失之后在進行操作。
                   // 也就是說數組節點一旦被標記為轉移節點,是不會再發生任何變動的,所以不會有任何線程安全的問題
                   // 所以此處直接賦值,沒有任何問題。
                   if (finishing) {
                       nextTable = null;
                       table = nextTab;
                       sizeCtl = (n << 1) - (n >>> 1);
                       return;
                  }
                   if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                       if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                           return;
                       finishing = advance = true;
                       i = n; // recheck before commit
                  }
              } else if ((f = tabAt(tab, i)) == null)
                   advance = casTabAt(tab, i, null, fwd);
               else if ((fh = f.hash) == MOVED)
                   advance = true; // already processed
               else {
                   synchronized (f) {
                       // 進行節點的拷貝
                       if (tabAt(tab, i) == f) {
                           Node<K, V> ln, hn;
                           if (fh >= 0) {
                               int runBit = fh & n;
                               Node<K, V> lastRun = f;
                               for (Node<K, V> p = f.next; p != null; p = p.next) {
                                   int b = p.hash & n;
                                   if (b != runBit) {
                                       runBit = b;
                                       lastRun = p;
                                  }
                              }
                               if (runBit == 0) {
                                   ln = lastRun;
                                   hn = null;
                              } else {
                                   hn = lastRun;
                                   ln = null;
                              }
                               // 如果節點只有單個數據,直接拷貝,如果是鏈表,循環多次組成鏈表拷貝
                               for (Node<K, V> p = f; p != lastRun; p = p.next) {
                                   int ph = p.hash;
                                   K pk = p.key;
                                   V pv = p.val;
                                   if ((ph & n) == 0)
                                       ln = new Node<K, V>(ph, pk, pv, ln);
                                   else
                                       hn = new Node<K, V>(ph, pk, pv, hn);
                              }
                               // 在新數組位置上放置拷貝的值
                               setTabAt(nextTab, i, ln);
                               setTabAt(nextTab, i + n, hn);
                               // 在老數組位置上放上 ForwardingNode 節點
                               // put 時,發現是 ForwardingNode 節點,就不會再動這個節點的數據了
                               setTabAt(tab, i, fwd);
                               advance = true;
                          }
                           // 紅黑樹的拷貝
                           else if (f instanceof TreeBin) {
                               // 紅黑樹的拷貝工作,同 HashMap 的內容,代碼忽略
                           …………
                               // 在老數組位置上放上 ForwardingNode 節點
                               setTabAt(tab, i, fwd);
                               advance = true;
                          }
                      }
                  }
              }
          }
      }
       /*ConcurrentHashMap 的擴容時機和 HashMap 相同,
       都是在 put 方法的最后一步檢查是否需要擴容,
       如果需要則進行擴容,但兩者擴容的過程完全不同,
       ConcurrentHashMap 擴容的方法叫做 transfer,
       從 put 方法的 addCount 方法進去,就能找到。
       
       1.首先需要把老數組的值全部拷貝到擴容之后的新數組上,先從數組的隊尾開始拷貝;
       2.拷貝數組的槽點時,先把原數組槽點鎖住,保證原數組槽點不能操作,
       成功拷貝到新數組時,把原數組槽點賦值為轉移節點;
       3.這時如果有新數據正好需要 put 到此槽點時,發現槽點為轉移節點,就會一直等待,
       所以在擴容完成之前,該槽點對應的數據是不會發生變化的;
       4.從數組的尾部拷貝到頭部,每拷貝成功一次,就把原數組中的節點設置成轉移節點;
       5.直到所有數組數據都拷貝到新數組時,直接把新數組整個賦值給數組容器,拷貝完成。*/
       
       
       //獲取
       public V get(Object key) {
           Node<K, V>[] tab;
           Node<K, V> e, p;
           int n, eh;
           K ek;
           //計算hashcode
           int h = spread(key.hashCode());
           //不是空的數組 && 並且當前索引的槽點數據不是空的
           //否則該key對應的值不存在,返回null
           if ((tab = table) != null && (n = tab.length) > 0 &&
                  (e = tabAt(tab, (n - 1) & h)) != null) {
               //槽點第一個值和key相等,直接返回
               if ((eh = e.hash) == h) {
                   if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                       return e.val;
              }
               //如果是紅黑樹或者轉移節點,使用對應的find方法
               else if (eh < 0)
                   return (p = e.find(h, key)) != null ? p.val : null;
               //如果是鏈表,遍歷查找
               while ((e = e.next) != null) {
                   if (e.hash == h &&
                          ((ek = e.key) == key || (ek != null && key.equals(ek))))
                       return e.val;
              }
          }
           return null;
      }
       /*ConcurrentHashMap 的獲取,先獲取數組的下標,
       然后通過判斷數組下標的 key 是否和我們的 key 相等,相等的話直接返回,
       如果下標的槽點是鏈表或紅黑樹的話,分別調用相應的查找數據的方法,
       整體思路和 HashMap 相似。*/
    }
    • 數組初始化時的線程安全

      數組初始化時,首先通過自旋來保證一定可以初始化成功,然后通過 CAS 設置 SIZECTL 變量的值,來保證同一時刻只能有一個線程對數組進行初始化,CAS 成功之后,還會再次判斷當前數組是否已經初始化完成,如果已經初始化完成,就不會再次初始化,通過自旋 + CAS + 雙重 check 等手段保證了數組初始化時的線程安全。sizeCtl 默認值為0,當一個線程初始化數組時,會將 sizeCtl 改成 -1,由於被 volatile 修飾,對於其他線程來說這個變化是可見的,上面代碼看到后續線程判斷 sizeCtl 小於0 就會讓出執行權。

    • 新增槽點值時的線程安全

      保證了各種情況下的新增(不考慮擴容的情況下)線程安全,通過自旋 + CAS + 鎖

      1. 通過自旋死循環保證一定可以新增成功。

        在新增之前,通過 for (Node[] tab = table;;) 這樣的死循環來保證新增一定可以成功,一旦新增成功,就可以退出當前死循環,新增失敗的話,會重復新增的步驟,直到新增成功為止。

      2. 當前槽點為空時,通過 CAS 新增。

        這里的寫法非常嚴謹,沒有在判斷槽點為空的情況下直接賦值,因為在判斷槽點為空和賦值的瞬間,很有可能槽點已經被其他線程賦值了,所以采用 CAS 算法,能夠保證槽點為空的情況下賦值成功,如果恰好槽點已經被其他線程賦值,當前 CAS 操作失敗,會再次執行 for 自旋,再走槽點有值的 put 流程,這里就是自旋 + CAS 的結合。

      3. 當前槽點有值,鎖住當前槽點。

        put 時,如果當前槽點有值,就是 key 的 hash 沖突的情況,此時槽點上可能是鏈表或紅黑樹,我們通過synchronized (f)鎖住槽點,來保證同一時刻只會有一個線程能對槽點進行修改。

      4. 紅黑樹旋轉時,鎖住紅黑樹的根節點,保證同一時刻,當前紅黑樹只能被一個線程旋轉。

    • 擴容時的線程安全

      Java8 ConcunrrentHashMap 支持並發擴容,之前擴容總是由一個線程將舊數組中的鍵值對轉移到新的數組中,支持並發的話,轉移所需要的時間就可以縮短了,當然相應的並發處理控制邏輯也就更復雜了,擴容轉移通過 transfer 方法完成。

      擴容方法通過在原數組上設置轉移節點,put 時碰到轉移節點時會等待擴容成功之后才能 put 的策略,來保證了整個擴容過程中肯定是線程安全的,因為數組的槽點一旦被設置成轉移節點,在沒有擴容完成之前,是無法進行操作的。

      1. 拷貝槽點時,會把原數組的槽點鎖住;

      2. 拷貝成功之后,會把原數組的槽點設置成轉移節點,這樣如果有數據需要 put 到該節點時,發現該槽點是轉移節點,會一直等待直到擴容成功,才能繼續 put,可參考 put 方法中的 helpTransfer 方法;

      3. 從尾到頭進行拷貝,拷貝成功就把原數組的槽點設置成轉移節點。

      4. 等擴容拷貝都完成之后,直接把新數組的值賦值給數組容器,之前等待 put 的數據才能繼續 put。

  • 對於JDK1.7和1.8的缺點,為什么要把1.7的結構改成1.8的結構?

    • 數據結構

    • Hash碰撞

    • 保證並發安全

    • 查詢復雜度

    • 為什么超過8要轉為紅黑樹?

      紅黑樹的每個結點占用的空間是鏈表的兩倍,空間損耗是要大些的比起鏈表,所以開始的時候是默認使用占用空間更少的鏈表,8的這個取值是通過泊松分布得出的,鏈表長度增加到8也就是沖突達到8次的時候概率已經相當小,所以一般情況下鏈表的長度不會達到8,如果遇到了這種達到8情況,只能說明hash算法出現了問題。所以為了保證這種極端情況下,ConcurrentHashMap仍然有較高的查詢效率,所以就從這個情況開始轉化為紅黑樹。

  • 組合操作:ConcurrentHashMap也不是線程安全的?

    錯誤使用可能會造成線程不安全的情況,而不是說一個工具是線程安全的,使用起來就一定是線程安全的

    所以ConcurrentHashMap提供了一些組合操作:

    replace方法把一個值根據當前的值決定是否進行替換。

    putIfAbsent如果當前key不存在值的時候才放入。

  • 實際生產案例

 


CopyOnWriteArrayList

  • 誕生歷史和原因

    從JDK1.5就存在。是List當中的重要並發工具,代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一樣。Vector和SynchronizedList鎖粒度太大,並發效率相對較低,並且迭代時無法編輯。除了CopyOnWriteArrayList以外,Copy-On-Write並發容器還包括了CopyOnWriteArraySet,用來替代同步Set。

  • 適用場景

    讀操作可以盡可能快,而寫操作慢一些也沒有太大關系。

    讀多寫少:黑名單,每日更新;監聽器,迭代操作遠多於修改操作。

  • 讀寫規則

    讀寫鎖當中是讀讀共享,其他互斥(寫寫、讀寫、寫讀)

    在CopyOnWriteArrayList中讀寫鎖規則升級:讀取完全不用加鎖,並且寫入也不會阻塞讀取操作。只有寫入和寫入之間才需要進行同步等待。

    但是在迭代過程中進行修改以后,讀過程對寫過程是沒有感知的,讀出的數據是修改之前的數據。

  • 實現原理

    CopyOnWrite創建新副本,讀寫分離。底層通過新建一個新的數組,修改的時候對原有數據進行一次復制,把修改的內容寫入新的副本中,最后再替換回去,所以是一種讀寫分離的思想,讀和寫使用完全不同的容器。因為對數據的實時性要求不高的情況下使用。舊容器具有不可變的特性,完全可以並發讀沒有問題。迭代的時候讀的是舊數據,迭代時修改不報錯,只是說迭代的數據可能是過期的。

    public class ArrayList<E> extends AbstractList<E>
           implements List<E>, RandomAccess, Cloneable, java.io.Serializable
    {
       private class Itr implements Iterator<E> {
           //修改的次數在創建迭代器的時候就記錄下來了。
           int expectedModCount = modCount;
           
           @SuppressWarnings("unchecked")
           public E next() {
               //這個方法里面會去檢查修改的次數
               checkForComodification();
               int i = cursor;
               if (i >= size)
                   throw new NoSuchElementException();
               Object[] elementData = ArrayList.this.elementData;
               if (i >= elementData.length)
                   throw new ConcurrentModificationException();
               cursor = i + 1;
               return (E) elementData[lastRet = i];
          }
           
           final void checkForComodification() {
               //可以看到當修改的次數和記錄的次數不一樣的時候就會拋出一個異常
               if (modCount != expectedModCount)
                   throw new ConcurrentModificationException();
          }
      }
    }
  • 缺點

    數據一致性問題:CopyOnWrite容器只能保證數據的最終一致性,不能保證數據的實時一致性。所以如果希望寫入的數據馬上能夠讀到,就不應該使用CopyOnWrite容器

    內存占用問題:因為CopyOnWrite的寫時復制機制,所以在進行寫操作的時候,內存里會同時駐扎兩個對象的內存。

  • 源碼分析

    數據結構:

    //JDK1.8源碼
    public class CopyOnWriteArrayList<E>
       implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
       /** The lock protecting all mutators */
       final transient ReentrantLock lock = new ReentrantLock();
       /** The array, accessed only via getArray/setArray. */
       private transient volatile Object[] array;
       
       /**
        * Creates an empty list.
        */
       public CopyOnWriteArrayList() {
           setArray(new Object[0]);
      }
       
       public boolean add(E e) {
           final ReentrantLock lock = this.lock;
           lock.lock();
           try {
               Object[] elements = getArray();
               int len = elements.length;
               //關鍵部分,copy數組
               Object[] newElements = Arrays.copyOf(elements, len + 1);
               newElements[len] = e;
               setArray(newElements);
               return true;
          } finally {
               lock.unlock();
          }
      }
       
       //沒有看到get操作有進行過加鎖
       public E get(int index) {
           return get(getArray(), index);
      }
       
       private E get(Object[] a, int index) {
           return (E) a[index];
      }
    }

     

並發隊列Queue(阻塞隊列與非阻塞隊列)

  • 為什么使用隊列

    用隊列可以在線程間傳遞數據:生產者消費者模式、銀行轉賬。

    考慮鎖等線程安全問題的重任從使用者轉移到了隊列上了。

  • 並發隊列

    Queue

    BlockingQueue

  • 各個並發隊列關系

  • 阻塞隊列BlockingQueue

    什么是阻塞隊列:阻塞隊列是具有阻塞功能的隊列,所以首先它是一個隊列,其次是具有阻塞功能。通常,阻塞隊列的一端是給生產者放數據用,另一端給消費者拿數據用。阻塞隊列是線程安全的,所以生產者和消費者都可以是多線程的。

    主要方法:最有特色的帶有兩個阻塞功能的方法是take()和put()。take()方法:獲取並且移除隊列的頭結點,一旦如果執行take的時候,隊列里無數據,就阻塞,直到隊列里有數據。put()方法:插入元素。但是如果隊列已滿,那么就無法繼續插入,則阻塞,直到隊列有了空閑空間。

    • put、take(阻塞)

    • add、remove、element

    • offer、poll、peek

    是否有界(容量多大):這是一個非常重要的屬性,無界隊列意味着里面可以容納非常多(Integer.MAX_VALUE,約為2的31次,是一個非常大的數,可以近似認為是無限容量)

    阻塞隊列和線程池的關系:阻塞隊列是線程池的重要組成部分

    • ArrayBlockingQueue

      有界,可以指定容量

      公平:可以指定是否需要保證公平,如果需要保證公平的話,等待了最長時間的線程會被優先處理,不過這同時會帶來一定的性能損耗。

      //Java10
      public class ArrayBlockingQueue<E> extends AbstractQueue<E>
             implements BlockingQueue<E>, java.io.Serializable {
         //省略
         public void put(E e) throws InterruptedException {
             Objects.requireNonNull(e);
             final ReentrantLock lock = this.lock;
             //意味着put方法在這期間是可以被中斷的
             lock.lockInterruptibly();
             try {
                 //如果當前隊列滿了,就會開始等待
                 while (count == items.length)
                     notFull.await();
                 //否則,入隊
                 enqueue(e);
            } finally {
                 lock.unlock();
            }
        }
      }
    • LinkedBlockingQueue

      無界,容量Integer.MAX_VALUE

      內部結構:Node、兩把鎖。

      public class LinkedBlockingQueue<E> extends AbstractQueue<E>
             implements BlockingQueue<E>, java.io.Serializable {
         /**
          * Linked list node class.
          */
         static class Node<E> {
             E item;
             Node<E> next;
             Node(E x) { item = x; }
        }
         
         /** Lock held by take, poll, etc */
         private final ReentrantLock takeLock = new ReentrantLock();
         /** Lock held by put, offer, etc */
         private final ReentrantLock putLock = new ReentrantLock();
         //兩把鎖,都是ReentrantLock,就可以讓take和put互不干擾
         
         public void put(E e) throws InterruptedException {
             //不允許放入空
             if (e == null) throw new NullPointerException();
             // Note: convention in all put/take/etc is to preset local var
             // holding count negative to indicate failure unless set.
             int c = -1;
             Node<E> node = new Node<E>(e);
             final ReentrantLock putLock = this.putLock;
             final AtomicInteger count = this.count;
             //上鎖
             putLock.lockInterruptibly();
             try {
                 /*
                  * Note that count is used in wait guard even though it is
                  * not protected by lock. This works because count can
                  * only decrease at this point (all other puts are shut
                  * out by lock), and we (or some other waiting put) are
                  * signalled if it ever changes from capacity. Similarly
                  * for all other uses of count in other wait guards.
                  */
                 //滿了,進入阻塞
                 while (count.get() == capacity) {
                     notFull.await();
                }
                 //沒滿,放入隊列
                 enqueue(node);
                 c = count.getAndIncrement();
                 //如果放入以后還沒有滿,喚醒一個線程
                 if (c + 1 < capacity)
                     notFull.signal();
            } finally {
                 //釋放鎖
                 putLock.unlock();
            }
             if (c == 0)
                 signalNotEmpty();
        }
      }
    • PriorityBlockingQueue

      支持優先級

      自然排序(而不是先進先出)(可以自己指定排序規則)

      無界隊列(容量不夠會進行擴容,所以put不會阻塞,take可能阻塞)

      PriorityQueue的線程安全版本(不能插入null值,並且插入的值一定是可以比較的)

    • SynchronousQueue

      容量為0,它內部是不存儲的。

      需要注意的是,SynchronousQueue的容量不是1而是0,因為SynchronousQueue不需要去持有元素,它要做的就是直接傳遞,中間不做存儲,所以效率很高。是一個極好的用來直接傳遞的並發數據結構。

      SynchronousQueue是線程池Executors.newCachedThreadPool()使用的阻塞隊列

      SynchronousQueue沒有peek等函數,因為peek的含義是取出頭結點,但是SynchronousQueue的容量是0,所以連頭結點都沒有,也就沒有peek方法。同理也沒有iterate方法。

    • DelayQueue

      無界延遲隊列,根據延遲時間排序。元素必須實現Delayed接口,規定排序規則。

  • 非阻塞隊列

    並發包中的非阻塞隊列只有ConcurrentLinkedQueue這一種,顧名思義就是使用鏈表作為數據結構,使用CAS非阻塞算法來實現線程安全(不具備阻塞功能),適合用在對性能要求較高的並發場景。用的相對較少些。

    //Java8
    class ConcurrentLinkedQueue{
       //省略。。。
       
       public boolean offer(E e) {
           checkNotNull(e);
           final Node<E> newNode = new Node<E>(e);

           for (Node<E> t = tail, p = t;;) {
               Node<E> q = p.next;
               if (q == null) {
                   // p is last node
                   //CAS操作
                   if (p.casNext(null, newNode)) {
                       // Successful CAS is the linearization point
                       // for e to become an element of this queue,
                       // and for newNode to become "live".
                       if (p != t) // hop two nodes at a time
                           casTail(t, newNode);  // Failure is OK.
                       return true;
                  }
                   // Lost CAS race to another thread; re-read next
              }
               else if (p == q)
                   // We have fallen off list. If tail is unchanged, it
                   // will also be off-list, in which case we need to
                   // jump to head, from which all live nodes are always
                   // reachable. Else the new tail is a better bet.
                   p = (t != (t = tail)) ? t : head;
               else
                   // Check for tail updates after two hops.
                   p = (p != t && t != (t = tail)) ? t : q;
          }
      }
       
       
       private static class Node<E> {
           //省略。。。
           boolean casNext(Node<E> cmp, Node<E> val) {
               return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
          }
      }
    }
  • 如何選擇適合的隊列

    • 邊界以及擴容

    • 空間

    • 吞吐量,LinkedBlockingQueue的鎖粒度比ArrayBlockingQueue更小,兩把鎖。等等

       

各並發容器總結

juc包中提供的容器分為三類:Concurrent*、CopyOnWrite*、Blocking*

Concurrent的特點是大部分通過CAS來實現並發,而CopyOnWrite則是通過復制一份原數據來實現的,Blocking通過AQS實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM