ConcurrentHashMap總結


https://my.oschina.net/hosee/blog/675884

https://blog.csdn.net/justloveyou_/article/details/72783008

https://blog.csdn.net/jianghuxiaojin/article/details/52006118

 

並發編程實踐中,ConcurrentHashMap是一個經常被使用的數據結構,相比於Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在線程安全的基礎上提供了更好的寫並發能力,但同時降低了對讀一致性的要求(這點好像CAP理論啊 O(∩_∩)O)。ConcurrentHashMap的設計與實現非常精巧,大量的利用了volatile,final,CAS等lock-free技術來減少鎖競爭對於性能的影響,無論對於Java並發編程的學習還是Java內存模型的理解,ConcurrentHashMap的設計以及源碼都值得非常仔細的閱讀與揣摩。

這篇日志記錄了自己對ConcurrentHashMap的一些總結,由於JDK6,7,8中實現都不同,需要分開闡述在不同版本中的ConcurrentHashMap。

之前已經在ConcurrentHashMap原理分析中解釋了ConcurrentHashMap的原理,主要是從代碼的角度來闡述是源碼是如何寫的,本文仍然從源碼出發,挑選個人覺得重要的點(會用紅色標注)再次進行回顧,以及闡述ConcurrentHashMap的一些注意點。

1. JDK6與JDK7中的實現

1.1 設計思路

ConcurrentHashMap采用了分段鎖的設計,只有在同一個分段內才存在競態關系,不同的分段鎖之間沒有鎖競爭。相比於對整個Map加鎖的設計,分段鎖大大的提高了高並發環境下的處理能力。但同時,由於不是對整個Map加鎖,導致一些需要掃描整個Map的方法(如size(), containsValue())需要使用特殊的實現,另外一些方法(如clear())甚至放棄了對一致性的要求(ConcurrentHashMap是弱一致性的,具體請查看ConcurrentHashMap能完全替代HashTable嗎?)。

ConcurrentHashMap中的分段鎖稱為Segment,它即類似於HashMap(JDK7與JDK8中HashMap的實現)的結構,即內部擁有一個Entry數組,數組中的每個元素又是一個鏈表;同時又是一個ReentrantLock(Segment繼承了ReentrantLock)。ConcurrentHashMap中的HashEntry相對於HashMap中的Entry有一定的差異性:HashEntry中的value以及next都被volatile修飾,這樣在多線程讀寫過程中能夠保持它們的可見性,代碼如下:

static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next;

1.2 並發度(Concurrency Level)

並發度可以理解為程序運行時能夠同時更新ConccurentHashMap且不產生鎖競爭的最大線程數,實際上就是ConcurrentHashMap中的分段鎖個數,即Segment[]的數組長度。ConcurrentHashMap默認的並發度為16,但用戶也可以在構造函數中設置並發度。當用戶設置並發度時,ConcurrentHashMap會使用大於等於該值的最小2冪指數作為實際並發度(假如用戶設置並發度為17,實際並發度則為32)。運行時通過將key的高n位(n = 32 – segmentShift)和並發度減1(segmentMask)做位與運算定位到所在的Segment。segmentShift與segmentMask都是在構造過程中根據concurrency level被相應的計算出來。

如果並發度設置的過小,會帶來嚴重的鎖競爭問題;如果並發度設置的過大,原本位於同一個Segment內的訪問會擴散到不同的Segment中,CPU cache命中率會下降,從而引起程序性能下降。(文檔的說法是根據你並發的線程數量決定,太多會導性能降低)

1.3 創建分段鎖

和JDK6不同,JDK7中除了第一個Segment之外,剩余的Segments采用的是延遲初始化的機制:每次put之前都需要檢查key對應的Segment是否為null,如果是則調用ensureSegment()以確保對應的Segment被創建。

ensureSegment可能在並發環境下被調用,但與想象中不同,ensureSegment並未使用鎖來控制競爭,而是使用了Unsafe對象的getObjectVolatile()提供的原子讀語義結合CAS來確保Segment創建的原子性。代碼段如下:

if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } }

1.4 put/putIfAbsent/putAll

和JDK6一樣,ConcurrentHashMap的put方法被代理到了對應的Segment(定位Segment的原理之前已經描述過)中。與JDK6不同的是,JDK7版本的ConcurrentHashMap在獲得Segment鎖的過程中,做了一定的優化 - 在真正申請鎖之前,put方法會通過tryLock()方法嘗試獲得鎖,在嘗試獲得鎖的過程中會對對應hashcode的鏈表進行遍歷,如果遍歷完畢仍然找不到與key相同的HashEntry節點,則為后續的put操作提前創建一個HashEntry。當tryLock一定次數后仍無法獲得鎖,則通過lock申請鎖。

需要注意的是,由於在並發環境下,其他線程的put,rehash或者remove操作可能會導致鏈表頭結點的變化,因此在過程中需要進行檢查,如果頭結點發生變化則重新對表進行遍歷。而如果其他線程引起了鏈表中的某個節點被刪除,即使該變化因為是非原子寫操作(刪除節點后鏈接后續節點調用的是Unsafe.putOrderedObject(),該方法不提供原子寫語義)可能導致當前線程無法觀察到,但因為不影響遍歷的正確性所以忽略不計。

之所以在獲取鎖的過程中對整個鏈表進行遍歷,主要目的是希望遍歷的鏈表被CPU cache所緩存,為后續實際put過程中的鏈表遍歷操作提升性能。

在獲得鎖之后,Segment對鏈表進行遍歷,如果某個HashEntry節點具有相同的key,則更新該HashEntry的value值,否則新建一個HashEntry節點,將它設置為鏈表的新head節點並將原頭節點設為新head的下一個節點。新建過程中如果節點總數(含新建的HashEntry)超過threshold,則調用rehash()方法對Segment進行擴容,最后將新建HashEntry寫入到數組中。

put方法中,鏈接新節點的下一個節點(HashEntry.setNext())以及將鏈表寫入到數組中(setEntryAt())都是通過Unsafe的putOrderedObject()方法來實現,這里並未使用具有原子寫語義的putObjectVolatile()的原因是:JMM會保證獲得鎖到釋放鎖之間所有對象的狀態更新都會在鎖被釋放之后更新到主存,從而保證這些變更對其他線程是可見的。

1.5 rehash

相對於HashMap的resize,ConcurrentHashMap的rehash原理類似,但是Doug Lea為rehash做了一定的優化,避免讓所有的節點都進行復制操作:由於擴容是基於2的冪指來操作,假設擴容前某HashEntry對應到Segment中數組的index為i,數組的容量為capacity,那么擴容后該HashEntry對應到新數組中的index只可能為i或者i+capacity,因此大多數HashEntry節點在擴容前后index可以保持不變。基於此,rehash方法中會定位第一個后續所有節點在擴容后index都保持不變的節點,然后將這個節點之前的所有節點重排即可。這部分代碼如下:

 private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }

1.6 remove

和put類似,remove在真正獲得鎖之前,也會對鏈表進行遍歷以提高緩存命中率。

1.7 get與containsKey

get與containsKey兩個方法幾乎完全一致:他們都沒有使用鎖,而是通過Unsafe對象的getObjectVolatile()方法提供的原子讀語義,來獲得Segment以及對應的鏈表,然后對鏈表遍歷判斷是否存在key相同的節點以及獲得該節點的value。但由於遍歷過程中其他線程可能對鏈表結構做了調整,因此get和containsKey返回的可能是過時的數據,這一點是ConcurrentHashMap在弱一致性上的體現。如果要求強一致性,那么必須使用Collections.synchronizedMap()方法。

1.8 size、containsValue

這些方法都是基於整個ConcurrentHashMap來進行操作的,他們的原理也基本類似:首先不加鎖循環執行以下操作:循環所有的Segment(通過Unsafe的getObjectVolatile()以保證原子讀語義),獲得對應的值以及所有Segment的modcount之和。如果連續兩次所有Segment的modcount和相等,則過程中沒有發生其他線程修改ConcurrentHashMap的情況,返回獲得的值。

當循環次數超過預定義的值時,這時需要對所有的Segment依次進行加鎖,獲取返回值后再依次解鎖。值得注意的是,加鎖過程中要強制創建所有的Segment,否則容易出現其他線程創建Segment並進行put,remove等操作。代碼如下:

for(int j =0; j < segments.length; ++j) ensureSegment(j).lock();// force creation

一般來說,應該避免在多線程環境下使用size和containsValue方法。

注1:modcount在put, replace, remove以及clear等方法中都會被修改。

注2:對於containsValue方法來說,如果在循環過程中發現匹配value的HashEntry,則直接返回true。

最后,與HashMap不同的是,ConcurrentHashMap並不允許key或者value為null,按照Doug Lea的說法,這么設計的原因是在ConcurrentHashMap中,一旦value出現null,則代表HashEntry的key/value沒有映射完成就被其他線程所見,需要特殊處理。在JDK6中,get方法的實現中就有一段對HashEntry.value == null的防御性判斷。但Doug Lea也承認實際運行過程中,這種情況似乎不可能發生(參考:http://cs.oswego.edu/pipermail/concurrency-interest/2011-March/007799.html)。

2. JDK8中的實現

ConcurrentHashMap在JDK8中進行了巨大改動,很需要通過源碼來再次學習下Doug Lea的實現方法。

它摒棄了Segment(鎖段)的概念,而是啟用了一種全新的方式實現,利用CAS算法。它沿用了與它同時期的HashMap版本的思想,底層依然由“數組”+鏈表+紅黑樹的方式思想(JDK7與JDK8中HashMap的實現),但是為了做到並發,又增加了很多輔助的類,例如TreeBin,Traverser等對象內部類。

2.1 重要的屬性

首先來看幾個重要的屬性,與HashMap相同的就不再介紹了,這里重點解釋一下sizeCtl這個屬性。可以說它是ConcurrentHashMap中出鏡率很高的一個屬性,因為它是一個控制標識符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含義。

  • 負數代表正在進行初始化或擴容操作
  • -1代表正在初始化
  • -N 表示有N-1個線程正在進行擴容操作
  • 正數或0代表hash表還沒有被初始化,這個數值表示初始化或下一次進行擴容的大小,這一點類似於擴容閾值的概念。還后面可以看到,它的值始終是當前ConcurrentHashMap容量的0.75倍,這與loadfactor是對應的。
/** * 盛裝Node元素的數組 它的大小是2的整數次冪 * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node<K,V>[] table; /** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. hash表初始化或擴容時的一個控制位標識量。 負數代表正在進行初始化或擴容操作 -1代表正在初始化 -N 表示有N-1個線程正在進行擴容操作 正數或0代表hash表還沒有被初始化,這個數值表示初始化或下一次進行擴容的大小 */ private transient volatile int sizeCtl; // 以下兩個是用來控制擴容的時候 單線程進入的變量 /** * The number of bits used for generation stamp in sizeCtl. * Must be at least 6 for 32bit arrays. */ private static int RESIZE_STAMP_BITS = 16; /** * The bit shift for recording size stamp in sizeCtl. */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /* * Encodings for Node hash fields. See above for explanation. */ static final int MOVED = -1; // hash值是-1,表示這是一個forwardNode節點 static final int TREEBIN = -2; // hash值是-2 表示這時一個TreeBin節點

2.2 重要的類

2.2.1 Node

Node是最核心的內部類,它包裝了key-value鍵值對,所有插入ConcurrentHashMap的數據都包裝在這里面。它與HashMap中的定義很相似,但是但是有一些差別它對value和next屬性設置了volatile同步鎖(與JDK7的Segment相同),它不允許調用setValue方法直接改變Node的value域,它增加了find方法輔助map.get()方法。

2.2.2 TreeNode

樹節點類,另外一個核心的數據結構。當鏈表長度過長的時候,會轉換為TreeNode。但是與HashMap不相同的是,它並不是直接轉換為紅黑樹,而是把這些結點包裝成TreeNode放在TreeBin對象中,由TreeBin完成對紅黑樹的包裝。而且TreeNode在ConcurrentHashMap集成自Node類,而並非HashMap中的集成自LinkedHashMap.Entry<K,V>類,也就是說TreeNode帶有next指針,這樣做的目的是方便基於TreeBin的訪問。

2.2.3 TreeBin

這個類並不負責包裝用戶的key、value信息,而是包裝的很多TreeNode節點。它代替了TreeNode的根節點,也就是說在實際的ConcurrentHashMap“數組”中,存放的是TreeBin對象,而不是TreeNode對象,這是與HashMap的區別。另外這個類還帶有了讀寫鎖。

這里僅貼出它的構造方法。可以看到在構造TreeBin節點時,僅僅指定了它的hash值為TREEBIN常量,這也就是個標識為。同時也看到我們熟悉的紅黑樹構造方法

2.2.4 ForwardingNode

一個用於連接兩個table的節點類。它包含一個nextTable指針,用於指向下一張表。而且這個節點的key value next指針全部為null,它的hash值為-1. 這里面定義的find的方法是從nextTable里進行查詢節點,而不是以自身為頭節點進行查找。

/** * A node inserted at head of bins during transfer operations. */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }

2.3 Unsafe與CAS

在ConcurrentHashMap中,隨處可以看到U, 大量使用了U.compareAndSwapXXX的方法,這個方法是利用一個CAS算法實現無鎖化的修改值的操作,他可以大大降低鎖代理的性能消耗。這個算法的基本思想就是不斷地去比較當前內存中的變量值與你指定的一個變量值是否相等,如果相等,則接受你指定的修改的值,否則拒絕你的操作。因為當前線程中的值已經不是最新的值,你的修改很可能會覆蓋掉其他線程修改的結果。這一點與樂觀鎖,SVN的思想是比較類似的。

2.3.1 unsafe靜態塊

unsafe代碼塊控制了一些屬性的修改工作,比如最常用的SIZECTL 。在這一版本的concurrentHashMap中,大量應用來的CAS方法進行變量、屬性的修改工作。利用CAS進行無鎖操作,可以大大提高性能。

 private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } 

2.3.2 三個核心方法

ConcurrentHashMap定義了三個原子操作,用於對指定位置的節點進行操作。正是這些原子操作保證了ConcurrentHashMap的線程安全。

//獲得在i位置上的Node節點 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //利用CAS算法設置i位置上的Node節點。之所以能實現並發是因為他指定了原來這個節點的值是多少 //在CAS算法中,會比較內存中的值與你指定的這個值是否相等,如果相等才接受你的修改,否則拒絕你的修改 //因此當前線程中的值並不是最新的值,這種修改可能會覆蓋掉其他線程的修改結果 有點類似於SVN static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //利用volatile方法設置節點位置的值 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }

2.4 初始化方法initTable

對於ConcurrentHashMap來說,調用它的構造方法僅僅是設置了一些參數而已。而整個table的初始化是在向ConcurrentHashMap中插入元素的時候發生的。如調用put、computeIfAbsent、compute、merge等方法的時候,調用時機是檢查table==null。

初始化方法主要應用了關鍵屬性sizeCtl 如果這個值〈0,表示其他線程正在進行初始化,就放棄這個操作。在這也可以看出ConcurrentHashMap的初始化只能由一個線程完成。如果獲得了初始化權限,就用CAS方法將sizeCtl置為-1,防止其他線程進入。初始化數組后,將sizeCtl的值改為0.75*n。

/** * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //sizeCtl表示有其他線程正在進行初始化操作,把線程掛起。對於table的初始化工作,只能有一個線程在進行。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置為-1 表示本線程正在進行初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2);//相當於0.75*n 設置一個擴容的閾值 } } finally { sizeCtl = sc; } break; } } return tab; }

2.5 擴容方法 transfer

當ConcurrentHashMap容量不足的時候,需要對table進行擴容。這個方法的基本思想跟HashMap是很像的,但是由於它是支持並發擴容的,所以要復雜的多。原因是它支持多線程進行擴容操作,而並沒有加鎖。我想這樣做的目的不僅僅是為了滿足concurrent的要求,而是希望利用並發處理去減少擴容帶來的時間影響。因為在擴容的時候,總是會涉及到從一個“數組”到另一個“數組”拷貝的操作,如果這個操作能夠並發進行,那真真是極好的了。

整個擴容操作分為兩個部分

  •  第一部分是構建一個nextTable,它的容量是原來的兩倍,這個操作是單線程完成的。這個單線程的保證是通過RESIZE_STAMP_SHIFT這個常量經過一次運算來保證的,這個地方在后面會有提到;

  • 第二個部分就是將原來table中的元素復制到nextTable中,這里允許多線程進行操作。

先來看一下單線程是如何完成的:

它的大體思想就是遍歷、復制的過程。首先根據運算得到需要遍歷的次數i,然后利用tabAt方法獲得i位置的元素:

  • 如果這個位置為空,就在原table中的i位置放入forwardNode節點,這個也是觸發並發擴容的關鍵點;

  • 如果這個位置是Node節點(fh>=0),如果它是一個鏈表的頭節點,就構造一個反序鏈表,把他們分別放在nextTable的i和i+n的位置上

  • 如果這個位置是TreeBin節點(fh<0),也做一個反序處理,並且判斷是否需要untreefi,把處理的結果分別放在nextTable的i和i+n的位置上

  • 遍歷過所有的節點以后就完成了復制工作,這時讓nextTable作為新的table,並且更新sizeCtl為新容量的0.75倍 ,完成擴容。

再看一下多線程是如何完成的:

在代碼的69行有一個判斷,如果遍歷到的節點是forward節點,就向后繼續遍歷,再加上給節點上鎖的機制,就完成了多線程的控制。多線程遍歷節點,處理了一個節點,就把對應點的值set為forward,另一個線程看到forward,就向后遍歷。這樣交叉就完成了復制工作。而且還很好的解決了線程安全的問題。 這個方法的設計實在是讓我膜拜。

 

  /** * 一個過渡的table表 只有在擴容的時候才會使用 */ private transient volatile Node<K,V>[] nextTable; /** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//構造一個nextTable對象 它的容量是原來的兩倍 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//構造一個連節點指針 用於標志位 boolean advance = true;//並發擴容的關鍵屬性 如果等於true 說明這個節點已經處理過 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //這個while循環體的作用就是在控制i-- 通過i--可以依次遍歷原hash表中的節點 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //如果所有的節點都已經完成復制工作 就把nextTable賦值給table 清空臨時對象nextTable nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1);//擴容閾值設置為原來容量的1.5倍 依然相當於現在容量的0.75倍 return; } //利用CAS方法更新這個擴容閾值,在這里面sizectl值減一,說明新加入一個線程參與到擴容操作 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //如果遍歷到的節點為空 則放入ForwardingNode指針 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //如果遍歷到ForwardingNode節點 說明這個點已經被處理過了 直接跳過 這里是控制並發擴容的核心 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //節點上鎖 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //如果fh>=0 證明這是一個Node節點 if (fh >= 0) { int runBit = fh & n; //以下的部分在完成的工作是構造兩個鏈表 一個是原鏈表 另一個是原鏈表的反序排列 Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //在nextTable的i位置上插入一個鏈表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另一個鏈表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode節點 表示已經處理過該節點 setTabAt(tab, i, fwd); //設置advance為true 返回到上面的while循環中 就可以執行i--操作 advance = true; } //對TreeBin對象進行處理 與上面的過程類似 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; //構造正序和反序兩個鏈表 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果擴容后已經不再需要tree的結構 反向轉換為鏈表結構 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; //在nextTable的i位置上插入一個鏈表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另一個鏈表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode節點 表示已經處理過該節點 setTabAt(tab, i, fwd); //設置advance為true 返回到上面的while循環中 就可以執行i--操作 advance = true; } } } } } }

2.6 Put方法

前面的所有的介紹其實都為這個方法做鋪墊。ConcurrentHashMap最常用的就是put和get兩個方法。現在來介紹put方法,這個put方法依然沿用HashMap的put方法的思想,根據hash值計算這個新插入的點在table中的位置i,如果i位置是空的,直接放進去,否則進行判斷,如果i位置是樹節點,按照樹的方式插入新的節點,否則把i插入到鏈表的末尾。ConcurrentHashMap中依然沿用這個思想,有一個最重要的不同點就是ConcurrentHashMap不允許key或value為null值。另外由於涉及到多線程,put方法就要復雜一點。在多線程中可能有以下兩個情況

  1. 如果一個或多個線程正在對ConcurrentHashMap進行擴容操作,當前線程也要進入擴容的操作中。這個擴容的操作之所以能被檢測到,是因為transfer方法中在空結點上插入forward節點,如果檢測到需要插入的位置被forward節點占有,就幫助進行擴容;

  2. 如果檢測到要插入的節點是非空且不是forward節點,就對這個節點加鎖,這樣就保證了線程安全。盡管這個有一些影響效率,但是還是會比hashTable的synchronized要好得多。

整體流程就是首先定義不允許key或value為null的情況放入  對於每一個放入的值,首先利用spread方法對key的hashcode進行一次hash計算,由此來確定這個值在table中的位置。

如果這個位置是空的,那么直接放入,而且不需要加鎖操作。

 如果這個位置存在結點,說明發生了hash碰撞,首先判斷這個節點的類型。如果是鏈表節點(fh>0),則得到的結點就是hash值相同的節點組成的鏈表的頭節點。需要依次向后遍歷確定這個新加入的值所在位置。如果遇到hash值與key值都與新加入節點是一致的情況,則只需要更新value值即可。否則依次向后遍歷,直到鏈表尾插入這個結點。如果加入這個節點以后鏈表長度大於8,就把這個鏈表轉換成紅黑樹。如果這個節點的類型已經是樹節點的話,直接調用樹節點的插入方法進行插入新的值。

public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { //不允許 key或value為null if (key == null || value == null) throw new NullPointerException(); //計算hash值 int hash = spread(key.hashCode()); int binCount = 0; //死循環 何時插入成功 何時跳出 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //如果table為空的話,初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); //根據hash值計算出在table里面的位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果這個位置沒有值 ,直接放進去,不需要加鎖 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //當遇到表連接點時,需要進行整合表的操作 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //結點上鎖 這里的結點可以理解為hash值相同組成的鏈表的頭結點 synchronized (f) { if (tabAt(tab, i) == f) { //fh〉0 說明這個節點是一個鏈表的節點 不是樹的節點 if (fh >= 0) { binCount = 1; //在這里遍歷鏈表所有的結點 for (Node<K,V> e = f;; ++binCount) { K ek; //如果hash值和key值相同 則修改對應結點的value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //如果遍歷到了最后一個結點,那么就證明新的節點需要插入 就把它插入在鏈表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //如果這個節點是樹節點,就按照樹的方式插入值 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果鏈表長度已經達到臨界值8 就需要把鏈表轉換為樹結構 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //將當前ConcurrentHashMap的元素數量+1 addCount(1L, binCount); return null; } 

我們可以發現JDK8中的實現也是鎖分離的思想,只是鎖住的是一個Node,而不是JDK7中的Segment,而鎖住Node之前的操作是無鎖的並且也是線程安全的,建立在之前提到的3個原子操作上。

2.6.1 helpTransfer方法

這是一個協助擴容的方法。這個方法被調用的時候,當前ConcurrentHashMap一定已經有了nextTable對象,首先拿到這個nextTable對象,調用transfer方法。回看上面的transfer方法可以看到,當本線程進入擴容方法的時候會直接進入復制階段。

2.6.2 treeifyBin方法

這個方法用於將過長的鏈表轉換為TreeBin對象。但是他並不是直接轉換,而是進行一次容量判斷,如果容量沒有達到轉換的要求,直接進行擴容操作並返回;如果滿足條件才鏈表的結構抓換為TreeBin ,這與HashMap不同的是,它並沒有把TreeNode直接放入紅黑樹,而是利用了TreeBin這個小容器來封裝所有的TreeNode.

2.7 get方法

get方法比較簡單,給定一個key來確定value的時候,必須滿足兩個條件  key相同  hash值相同,對於節點可能在鏈表或樹上的情況,需要分別去查找。

public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //計算hash值 int h = spread(key.hashCode()); //根據hash值確定節點位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //如果搜索到的節點key與傳入的key相同且不為null,直接返回這個節點 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果eh<0 說明這個節點在樹上 直接尋找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //否則遍歷鏈表 找到對應的值並返回 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

2.8 Size相關的方法

對於ConcurrentHashMap來說,這個table里到底裝了多少東西其實是個不確定的數量,因為不可能在調用size()方法的時候像GC的“stop the world”一樣讓其他線程都停下來讓你去統計,因此只能說這個數量是個估計值。對於這個估計值,ConcurrentHashMap也是大費周章才計算出來的。

2.8.1 輔助定義

為了統計元素個數,ConcurrentHashMap定義了一些變量和一個內部類

/** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. */ @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } /******************************************/ /** * 實際上保存的是hashmap中的元素個數 利用CAS鎖進行更新 但它並不用返回當前hashmap的元素個數 */ private transient volatile long baseCount; /** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. */ private transient volatile int cellsBusy; /** * Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells;

2.8.2 mappingCount與Size方法

mappingCount與size方法的類似  從Java工程師給出的注釋來看,應該使用mappingCount代替size方法 兩個方法都沒有直接返回basecount 而是統計一次這個值,而這個值其實也是一個大概的數值,因此可能在統計的時候有其他線程正在執行插入或刪除操作。

public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } /** * Returns the number of mappings. This method should be used * instead of {@link #size} because a ConcurrentHashMap may * contain more mappings than can be represented as an int. The * value returned is an estimate; the actual count may differ if * there are concurrent insertions or removals. * * @return the number of mappings * @since 1.8 */ public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value;//所有counter的值求和 } } return sum; }

2.8.3 addCount方法

在put方法結尾處調用了addCount方法,把當前ConcurrentHashMap的元素個數+1這個方法一共做了兩件事,更新baseCount的值,檢測是否進行擴容。

private final void addCount(long x, int check) { CounterCell[] as; long b, s; //利用CAS方法更新baseCount的值 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //如果check值大於等於0 則需要檢驗是否需要進行擴容操作 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //如果已經有其他線程在執行擴容操作 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //當前線程是唯一的或是第一個發起擴容的線程 此時nextTable=null else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }

總結

JDK6,7中的ConcurrentHashmap主要使用Segment來實現減小鎖粒度,把HashMap分割成若干個Segment,在put的時候需要鎖住Segment,get時候不加鎖,使用volatile來保證可見性,當要統計全局時(比如size),首先會嘗試多次計算modcount來確定,這幾次嘗試中,是否有其他線程進行了修改操作,如果沒有,則直接返回size。如果有,則需要依次鎖住所有的Segment來計算。

jdk7中ConcurrentHashmap中,當長度過長碰撞會很頻繁,鏈表的增改刪查操作都會消耗很長的時間,影響性能,所以jdk8 中完全重寫了concurrentHashmap,代碼量從原來的1000多行變成了 6000多 行,實現上也和原來的分段式存儲有很大的區別。

主要設計上的變化有以下幾點: 

  1. 不采用segment而采用node,鎖住node來實現減小鎖粒度。
  2. 設計了MOVED狀態 當resize的中過程中 線程2還在put數據,線程2會幫助resize。
  3. 使用3個CAS操作來確保node的一些操作的原子性,這種方式代替了鎖。
  4. sizeCtl的不同值來代表不同含義,起到了控制的作用。

至於為什么JDK8中使用synchronized而不是ReentrantLock,我猜是因為JDK8中對synchronized有了足夠的優化吧。

Reference:

1. http://www.jianshu.com/p/4806633fcc55

2. https://www.zhihu.com/question/22438589

3. http://blog.csdn.net/u010723709/article/details/48007881

 

 

CAS

CAS:Compare and Swap, 翻譯成比較並交換。 

java.util.concurrent包中借助CAS實現了區別於synchronouse同步鎖的一種樂觀鎖。

 

本文先從CAS的應用說起,再深入原理解析。

 

CAS應用

CAS有3個操作數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則什么都不做。

 

非阻塞算法 (nonblocking algorithms)

一個線程的失敗或者掛起不應該影響其他線程的失敗或掛起的算法。

現代的CPU提供了特殊的指令,可以自動更新共享數據,而且能夠檢測到其他線程的干擾,而 compareAndSet() 就用這些代替了鎖定。

拿出AtomicInteger來研究在沒有鎖的情況下是如何做到數據正確性的。

private volatile int value;

首先毫無以為,在沒有鎖的機制下可能需要借助volatile原語,保證線程間的數據是可見的(共享的)。

這樣才獲取變量的值的時候才能直接讀取。

public final int get() {
        return value;
    }

然后來看看++i是怎么做到的。

public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

在這里采用了CAS操作,每次從內存中讀取數據然后將此數據和+1后的結果進行CAS操作,如果成功就返回結果,否則重試直到成功為止。

而compareAndSet利用JNI來完成CPU指令的操作。

public final boolean compareAndSet(int expect, int update) {   
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

整體的過程就是這樣子的,利用CPU的CAS指令,同時借助JNI來完成Java的非阻塞算法。其它原子操作都是利用類似的特性完成的。

 

其中

unsafe.compareAndSwapInt(this, valueOffset, expect, update);

類似:

if (this == expect) {

  this = update

 return true;

} else {

return false;

}

 

那么問題就來了,成功過程中需要2個步驟:比較this == expect,替換this = update,compareAndSwapInt如何這兩個步驟的原子性呢? 參考CAS的原理。

 

CAS原理

 CAS通過調用JNI的代碼實現的。JNI:Java Native Interface為JAVA本地調用,允許java調用其他語言。

而compareAndSwapInt就是借助C來調用CPU底層指令實現的。

下面從分析比較常用的CPU(intel x86)來解釋CAS的實現原理。

 下面是sun.misc.Unsafe類的compareAndSwapInt()方法的源代碼:

public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

 

可以看到這是個本地方法調用。這個本地方法在openjdk中依次調用的c++代碼為:unsafe.cpp,atomic.cpp和atomicwindowsx86.inline.hpp。這個本地方法的最終實現在openjdk的如下位置:openjdk-7-fcs-src-b147-27jun2011\openjdk\hotspot\src\oscpu\windowsx86\vm\ atomicwindowsx86.inline.hpp(對應於windows操作系統,X86處理器)。下面是對應於intel x86處理器的源代碼的片段:

 

// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0  \
                       __asm je L0      \
                       __asm _emit 0xF0 \
                       __asm L0:

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

如上面源代碼所示,程序會根據當前處理器的類型來決定是否為cmpxchg指令添加lock前綴。如果程序是在多處理器上運行,就為cmpxchg指令加上lock前綴(lock cmpxchg)。反之,如果程序是在單處理器上運行,就省略lock前綴(單處理器自身會維護單處理器內的順序一致性,不需要lock前綴提供的內存屏障效果)。

 

 intel的手冊對lock前綴的說明如下:

  1. 確保對內存的讀-改-寫操作原子執行。在Pentium及Pentium之前的處理器中,帶有lock前綴的指令在執行期間會鎖住總線,使得其他處理器暫時無法通過總線訪問內存。很顯然,這會帶來昂貴的開銷。從Pentium 4,Intel Xeon及P6處理器開始,intel在原有總線鎖的基礎上做了一個很有意義的優化:如果要訪問的內存區域(area of memory)在lock前綴指令執行期間已經在處理器內部的緩存中被鎖定(即包含該內存區域的緩存行當前處於獨占或以修改狀態),並且該內存區域被完全包含在單個緩存行(cache line)中,那么處理器將直接執行該指令。由於在指令執行期間該緩存行會一直被鎖定,其它處理器無法讀/寫該指令要訪問的內存區域,因此能保證指令執行的原子性。這個操作過程叫做緩存鎖定(cache locking),緩存鎖定將大大降低lock前綴指令的執行開銷,但是當多處理器之間的競爭程度很高或者指令訪問的內存地址未對齊時,仍然會鎖住總線。
  2. 禁止該指令與之前和之后的讀和寫指令重排序。
  3. 把寫緩沖區中的所有數據刷新到內存中。

備注知識:

關於CPU的鎖有如下3種:

  3.1 處理器自動保證基本內存操作的原子性

  首先處理器會自動保證基本的內存操作的原子性。處理器保證從系統內存當中讀取或者寫入一個字節是原子的,意思是當一個處理器讀取一個字節時,其他處理器不能訪問這個字節的內存地址。奔騰6和最新的處理器能自動保證單處理器對同一個緩存行里進行16/32/64位的操作是原子的,但是復雜的內存操作處理器不能自動保證其原子性,比如跨總線寬度,跨多個緩存行,跨頁表的訪問。但是處理器提供總線鎖定和緩存鎖定兩個機制來保證復雜內存操作的原子性。 

  3.2 使用總線鎖保證原子性

  第一個機制是通過總線鎖保證原子性。如果多個處理器同時對共享變量進行讀改寫(i++就是經典的讀改寫操作)操作,那么共享變量就會被多個處理器同時進行操作,這樣讀改寫操作就不是原子的,操作完之后共享變量的值會和期望的不一致,舉個例子:如果i=1,我們進行兩次i++操作,我們期望的結果是3,但是有可能結果是2。如下圖

 

 

  原因是有可能多個處理器同時從各自的緩存中讀取變量i,分別進行加一操作,然后分別寫入系統內存當中。那么想要保證讀改寫共享變量的操作是原子的,就必須保證CPU1讀改寫共享變量的時候,CPU2不能操作緩存了該共享變量內存地址的緩存。

  處理器使用總線鎖就是來解決這個問題的。所謂總線鎖就是使用處理器提供的一個LOCK#信號,當一個處理器在總線上輸出此信號時,其他處理器的請求將被阻塞住,那么該處理器可以獨占使用共享內存。

  3.3 使用緩存鎖保證原子性

  第二個機制是通過緩存鎖定保證原子性。在同一時刻我們只需保證對某個內存地址的操作是原子性即可,但總線鎖定把CPU和內存之間通信鎖住了,這使得鎖定期間,其他處理器不能操作其他內存地址的數據,所以總線鎖定的開銷比較大,最近的處理器在某些場合下使用緩存鎖定代替總線鎖定來進行優化。

  頻繁使用的內存會緩存在處理器的L1,L2和L3高速緩存里,那么原子操作就可以直接在處理器內部緩存中進行,並不需要聲明總線鎖,在奔騰6和最近的處理器中可以使用“緩存鎖定”的方式來實現復雜的原子性。所謂“緩存鎖定”就是如果緩存在處理器緩存行中內存區域在LOCK操作期間被鎖定,當它執行鎖操作回寫內存時,處理器不在總線上聲言LOCK#信號,而是修改內部的內存地址,並允許它的緩存一致性機制來保證操作的原子性,因為緩存一致性機制會阻止同時修改被兩個以上處理器緩存的內存區域數據,當其他處理器回寫已被鎖定的緩存行的數據時會起緩存行無效,在例1中,當CPU1修改緩存行中的i時使用緩存鎖定,那么CPU2就不能同時緩存了i的緩存行。

  但是有兩種情況下處理器不會使用緩存鎖定。第一種情況是:當操作的數據不能被緩存在處理器內部,或操作的數據跨多個緩存行(cache line),則處理器會調用總線鎖定。第二種情況是:有些處理器不支持緩存鎖定。對於Inter486和奔騰處理器,就算鎖定的內存區域在處理器的緩存行中也會調用總線鎖定。

  以上兩個機制我們可以通過Inter處理器提供了很多LOCK前綴的指令來實現。比如位測試和修改指令BTS,BTR,BTC,交換指令XADD,CMPXCHG和其他一些操作數和邏輯指令,比如ADD(加),OR(或)等,被這些指令操作的內存區域就會加鎖,導致其他處理器不能同時訪問它。

 

CAS缺點

 CAS雖然很高效的解決原子操作,但是CAS仍然存在三大問題。ABA問題,循環時間長開銷大和只能保證一個共享變量的原子操作

1.  ABA問題。因為CAS需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那么使用CAS進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那么A-B-A 就會變成1A-2B-3A。

從Java1.5開始JDK的atomic包里提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法作用是首先檢查當前引用是否等於預期引用,並且當前標志是否等於預期標志,如果全部相等,則以原子方式將該引用和該標志的值設置為給定的更新值。

關於ABA問題參考文檔: http://blog.hesey.net/2011/09/resolve-aba-by-atomicstampedreference.html

2. 循環時間長開銷大。自旋CAS如果長時間不成功,會給CPU帶來非常大的執行開銷。如果JVM能支持處理器提供的pause指令那么效率會有一定的提升,pause指令有兩個作用,第一它可以延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它可以避免在退出循環的時候因內存順序沖突(memory order violation)而引起CPU流水線被清空(CPU pipeline flush),從而提高CPU的執行效率。

 

3. 只能保證一個共享變量的原子操作。當對一個共享變量執行操作時,我們可以使用循環CAS的方式來保證原子操作,但是對多個共享變量操作時,循環CAS就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變量合並成一個共享變量來操作。比如有兩個共享變量i=2,j=a,合並一下ij=2a,然后用CAS來操作ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,你可以把多個變量放在一個對象里來進行CAS操作。

 

 

concurrent包的實現

由於java的CAS同時具有 volatile 讀和volatile寫的內存語義,因此Java線程之間的通信現在有了下面四種方式:

  1. A線程寫volatile變量,隨后B線程讀這個volatile變量。
  2. A線程寫volatile變量,隨后B線程用CAS更新這個volatile變量。
  3. A線程用CAS更新一個volatile變量,隨后B線程用CAS更新這個volatile變量。
  4. A線程用CAS更新一個volatile變量,隨后B線程讀這個volatile變量。

Java的CAS會使用現代處理器上提供的高效機器級別原子指令,這些原子指令以原子方式對內存執行讀-改-寫操作,這是在多處理器中實現同步的關鍵(從本質上來說,能夠支持原子性讀-改-寫指令的計算機器,是順序計算圖靈機的異步等價機器,因此任何現代的多處理器都會去支持某種能對內存執行原子性讀-改-寫操作的原子指令)。同時,volatile變量的讀/寫和CAS可以實現線程之間的通信。把這些特性整合在一起,就形成了整個concurrent包得以實現的基石。如果我們仔細分析concurrent包的源代碼實現,會發現一個通用化的實現模式:

  1. 首先,聲明共享變量為volatile;
  2. 然后,使用CAS的原子條件更新來實現線程之間的同步;
  3. 同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的內存語義來實現線程之間的通信。

AQS,非阻塞數據結構和原子變量類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實現的,而concurrent包中的高層類又是依賴於這些基礎類來實現的。從整體來看,concurrent包的實現示意圖如下:

 

 

非阻塞同步算法與CAS(Compare and Swap)無鎖算法

鎖(lock)的代價

鎖是用來做並發最簡單的方式,當然其代價也是最高的。內核態的鎖的時候需要操作系統進行一次上下文切換,加鎖、釋放鎖會導致比較多的上下文切換和調度延時,等待鎖的線程會被掛起直至鎖釋放。在上下文切換的時候,cpu之前緩存的指令和數據都將失效,對性能有很大的損失。操作系統對多線程的鎖進行判斷就像兩姐妹在為一個玩具在爭吵,然后操作系統就是能決定他們誰能拿到玩具的父母,這是很慢的。用戶態的鎖雖然避免了這些問題,但是其實它們只是在沒有真實的競爭時才有效。

Java在JDK1.5之前都是靠synchronized關鍵字保證同步的,這種通過使用一致的鎖定協議來協調對共享狀態的訪問,可以確保無論哪個線程持有守護變量的鎖,都采用獨占的方式來訪問這些變量,如果出現多個線程同時訪問鎖,那第一些線線程將被掛起,當線程恢復執行時,必須等待其它線程執行完他們的時間片以后才能被調度執行,在掛起和恢復執行過程中存在着很大的開銷。鎖還存在着其它一些缺點,當一個線程正在等待鎖時,它不能做任何事。如果一個線程在持有鎖的情況下被延遲執行,那么所有需要這個鎖的線程都無法執行下去。如果被阻塞的線程優先級高,而持有鎖的線程優先級低,將會導致優先級反轉(Priority Inversion)。

樂觀鎖與悲觀鎖

獨占鎖是一種悲觀鎖,synchronized就是一種獨占鎖,它假設最壞的情況,並且只有在確保其它線程不會造成干擾的情況下執行,會導致其它所有需要鎖的線程掛起,等待持有鎖的線程釋放鎖。而另一個更加有效的鎖就是樂觀鎖。所謂樂觀鎖就是,每次不加鎖而是假設沒有沖突而去完成某項操作,如果因為沖突失敗就重試,直到成功為止。

volatile的問題

與鎖相比,volatile變量是一和更輕量級的同步機制,因為在使用這些變量時不會發生上下文切換和線程調度等操作,但是volatile變量也存在一些局限:不能用於構建原子的復合操作,因此當一個變量依賴舊值時就不能使用volatile變量。(參考:談談volatiile

volatile只能保證變量對各個線程的可見性,但不能保證原子性。為什么?見我的另外一篇文章:《為什么volatile不能保證原子性而Atomic可以?

Java中的原子操作( atomic operations)

原子操作指的是在一步之內就完成而且不能被中斷。原子操作在多線程環境中是線程安全的,無需考慮同步的問題。在java中,下列操作是原子操作:

  • all assignments of primitive types except for long and double
  • all assignments of references
  • all operations of java.concurrent.Atomic* classes
  • all assignments to volatile longs and doubles

問題來了,為什么long型賦值不是原子操作呢?例如:

1
long  foo = 65465498L;

實時上java會分兩步寫入這個long變量,先寫32位,再寫后32位。這樣就線程不安全了。如果改成下面的就線程安全了:

1
private  volatile  long  foo;

因為volatile內部已經做了synchronized.

CAS無鎖算法

要實現無鎖(lock-free)的非阻塞算法有多種實現方法,其中CAS(比較與交換,Compare and swap)是一種有名的無鎖算法。CAS, CPU指令,在大多數處理器架構,包括IA32、Space中采用的都是CAS指令,CAS的語義是“我認為V的值應該為A,如果是,那么將V的值更新為B,否則不修改並告訴V的值實際為多少”,CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。CAS有3個操作數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則什么都不做。CAS無鎖算法的C實現如下:

1
2
3
4
5
6
7
8
9
int  compare_and_swap ( int * reg, int  oldval, int  newval)
{
   ATOMIC();
   int  old_reg_val = *reg;
   if  (old_reg_val == oldval)
      *reg = newval;
   END_ATOMIC();
   return  old_reg_val;
}

CAS(樂觀鎖算法)的基本假設前提

CAS比較與交換的偽代碼可以表示為:

do{   
       備份舊數據;  
       基於舊數據構造新數據;  
}while(!CAS( 內存地址,備份的舊數據,新數據 ))  

ConcurrencyCAS 

(上圖的解釋:CPU去更新一個值,但如果想改的值不再是原來的值,操作就失敗,因為很明顯,有其它操作先改變了這個值。)

就是指當兩者進行比較時,如果相等,則證明共享數據沒有被修改,替換成新值,然后繼續往下運行;如果不相等,說明共享數據已經被修改,放棄已經所做的操作,然后重新執行剛才的操作。容易看出 CAS 操作是基於共享數據不會被修改的假設,采用了類似於數據庫的 commit-retry 的模式。當同步沖突出現的機會很少時,這種假設能帶來較大的性能提升。

CAS的開銷(CPU Cache Miss problem)

前面說過了,CAS(比較並交換)是CPU指令級的操作,只有一步原子操作,所以非常快。而且CAS避免了請求操作系統來裁定鎖的問題,不用麻煩操作系統,直接在CPU內部就搞定了。但CAS就沒有開銷了嗎?不!有cache miss的情況。這個問題比較復雜,首先需要了解CPU的硬件體系結構:

2014-02-19_11h35_45

上圖可以看到一個8核CPU計算機系統,每個CPU有cache(CPU內部的高速緩存,寄存器),管芯內還帶有一個互聯模塊,使管芯內的兩個核可以互相通信。在圖中央的系統互聯模塊可以讓四個管芯相互通信,並且將管芯與主存連接起來。數據以“緩存線”為單位在系統中傳輸,“緩存線”對應於內存中一個 2 的冪大小的字節塊,大小通常為 32 到 256 字節之間。當 CPU 從內存中讀取一個變量到它的寄存器中時,必須首先將包含了該變量的緩存線讀取到 CPU 高速緩存。同樣地,CPU 將寄存器中的一個值存儲到內存時,不僅必須將包含了該值的緩存線讀到 CPU 高速緩存,還必須確保沒有其他 CPU 擁有該緩存線的拷貝。

比如,如果 CPU0 在對一個變量執行“比較並交換”(CAS)操作,而該變量所在的緩存線在 CPU7 的高速緩存中,就會發生以下經過簡化的事件序列:

  • CPU0 檢查本地高速緩存,沒有找到緩存線。
  • 請求被轉發到 CPU0 和 CPU1 的互聯模塊,檢查 CPU1 的本地高速緩存,沒有找到緩存線。
  • 請求被轉發到系統互聯模塊,檢查其他三個管芯,得知緩存線被 CPU6和 CPU7 所在的管芯持有。
  • 請求被轉發到 CPU6 和 CPU7 的互聯模塊,檢查這兩個 CPU 的高速緩存,在 CPU7 的高速緩存中找到緩存線。
  • CPU7 將緩存線發送給所屬的互聯模塊,並且刷新自己高速緩存中的緩存線。
  • CPU6 和 CPU7 的互聯模塊將緩存線發送給系統互聯模塊。
  • 系統互聯模塊將緩存線發送給 CPU0 和 CPU1 的互聯模塊。
  • CPU0 和 CPU1 的互聯模塊將緩存線發送給 CPU0 的高速緩存。
  • CPU0 現在可以對高速緩存中的變量執行 CAS 操作了

以上是刷新不同CPU緩存的開銷。最好情況下的 CAS 操作消耗大概 40 納秒,超過 60 個時鍾周期。這里的“最好情況”是指對某一個變量執行 CAS 操作的 CPU 正好是最后一個操作該變量的CPU,所以對應的緩存線已經在 CPU 的高速緩存中了,類似地,最好情況下的鎖操作(一個“round trip 對”包括獲取鎖和隨后的釋放鎖)消耗超過 60 納秒,超過 100 個時鍾周期。這里的“最好情況”意味着用於表示鎖的數據結構已經在獲取和釋放鎖的 CPU 所屬的高速緩存中了。鎖操作比 CAS 操作更加耗時,是因深入理解並行編程 
為鎖操作的數據結構中需要兩個原子操作。緩存未命中消耗大概 140 納秒,超過 200 個時鍾周期。需要在存儲新值時查詢變量的舊值的 CAS 操作,消耗大概 300 納秒,超過 500 個時鍾周期。想想這個,在執行一次 CAS 操作的時間里,CPU 可以執行 500 條普通指令。這表明了細粒度鎖的局限性。

以下是cache miss cas 和lock的性能對比:

2014-02-19_11h43_23

JVM對CAS的支持:AtomicInt, AtomicLong.incrementAndGet()

在JDK1.5之前,如果不編寫明確的代碼就無法執行CAS操作,在JDK1.5中引入了底層的支持,在int、long和對象的引用等類型上都公開了CAS的操作,並且JVM把它們編譯為底層硬件提供的最有效的方法,在運行CAS的平台上,運行時把它們編譯為相應的機器指令,如果處理器/CPU不支持CAS指令,那么JVM將使用自旋鎖。因此,值得注意的是,CAS解決方案與平台/編譯器緊密相關(比如x86架構下其對應的匯編指令是lock cmpxchg,如果想要64Bit的交換,則應使用lock cmpxchg8b。在.NET中我們可以使用Interlocked.CompareExchange函數)。

在原子類變量中,如java.util.concurrent.atomic中的AtomicXXX,都使用了這些底層的JVM支持為數字類型的引用類型提供一種高效的CAS操作,而在java.util.concurrent中的大多數類在實現時都直接或間接的使用了這些原子變量類。

Java 1.6中AtomicLong.incrementAndGet()的實現源碼為:

由此可見,AtomicLong.incrementAndGet的實現用了樂觀鎖技術,調用了sun.misc.Unsafe類庫里面的 CAS算法,用CPU指令來實現無鎖自增。所以,AtomicLong.incrementAndGet的自增比用synchronized的鎖效率倍增。

1
2
3
4
5
6
7
8
9
10
11
12
public  final  int  getAndIncrement() { 
         for  (;;) { 
             int  current = get(); 
             int  next = current + 1
             if  (compareAndSet(current, next)) 
                 return  current; 
        
   
public  final  boolean  compareAndSet( int  expect, int  update) { 
     return  unsafe.compareAndSwapInt( this , valueOffset, expect, update); 
}

下面是測試代碼:可以看到用AtomicLong.incrementAndGet的性能比用synchronized高出幾倍。

2014-02-12_14h56_39

CAS的例子:非阻塞堆棧

下面是比非阻塞自增稍微復雜一點的CAS的例子:非阻塞堆棧/ConcurrentStack 。ConcurrentStack 中的 push() 和pop() 操作在結構上與NonblockingCounter 上相似,只是做的工作有些冒險,希望在 “提交” 工作的時候,底層假設沒有失效。push() 方法觀察當前最頂的節點,構建一個新節點放在堆棧上,然后,如果最頂端的節點在初始觀察之后沒有變化,那么就安裝新節點。如果 CAS 失敗,意味着另一個線程已經修改了堆棧,那么過程就會重新開始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public  class  ConcurrentStack<E> {
     AtomicReference<Node<E>> head = new  AtomicReference<Node<E>>();
     public  void  push(E item) {
         Node<E> newHead = new  Node<E>(item);
         Node<E> oldHead;
         do  {
             oldHead = head.get();
             newHead.next = oldHead;
         } while  (!head.compareAndSet(oldHead, newHead));
     }
     public  E pop() {
         Node<E> oldHead;
         Node<E> newHead;
         do  {
             oldHead = head.get();
             if  (oldHead == null )
                 return  null ;
             newHead = oldHead.next;
         } while  (!head.compareAndSet(oldHead,newHead));
         return  oldHead.item;
     }
     static  class  Node<E> {
         final  E item;
         Node<E> next;
         public  Node(E item) { this .item = item; }
     }
}

在輕度到中度的爭用情況下,非阻塞算法的性能會超越阻塞算法,因為 CAS 的多數時間都在第一次嘗試時就成功,而發生爭用時的開銷也不涉及線程掛起和上下文切換,只多了幾個循環迭代。沒有爭用的 CAS 要比沒有爭用的鎖便宜得多(這句話肯定是真的,因為沒有爭用的鎖涉及 CAS 加上額外的處理),而爭用的 CAS 比爭用的鎖獲取涉及更短的延遲。

在高度爭用的情況下(即有多個線程不斷爭用一個內存位置的時候),基於鎖的算法開始提供比非阻塞算法更好的吞吐率,因為當線程阻塞時,它就會停止爭用,耐心地等候輪到自己,從而避免了進一步爭用。但是,這么高的爭用程度並不常見,因為多數時候,線程會把線程本地的計算與爭用共享數據的操作分開,從而給其他線程使用共享數據的機會。

CAS的例子3:非阻塞鏈表

以上的示例(自增計數器和堆棧)都是非常簡單的非阻塞算法,一旦掌握了在循環中使用 CAS,就可以容易地模仿它們。對於更復雜的數據結構,非阻塞算法要比這些簡單示例復雜得多,因為修改鏈表、樹或哈希表可能涉及對多個指針的更新。CAS 支持對單一指針的原子性條件更新,但是不支持兩個以上的指針。所以,要構建一個非阻塞的鏈表、樹或哈希表,需要找到一種方式,可以用 CAS 更新多個指針,同時不會讓數據結構處於不一致的狀態。

在鏈表的尾部插入元素,通常涉及對兩個指針的更新:“尾” 指針總是指向列表中的最后一個元素,“下一個” 指針從過去的最后一個元素指向新插入的元素。因為需要更新兩個指針,所以需要兩個 CAS。在獨立的 CAS 中更新兩個指針帶來了兩個需要考慮的潛在問題:如果第一個 CAS 成功,而第二個 CAS 失敗,會發生什么?如果其他線程在第一個和第二個 CAS 之間企圖訪問鏈表,會發生什么?

對於非復雜數據結構,構建非阻塞算法的 “技巧” 是確保數據結構總處於一致的狀態(甚至包括在線程開始修改數據結構和它完成修改之間),還要確保其他線程不僅能夠判斷出第一個線程已經完成了更新還是處在更新的中途,還能夠判斷出如果第一個線程走向 AWOL,完成更新還需要什么操作。如果線程發現了處在更新中途的數據結構,它就可以 “幫助” 正在執行更新的線程完成更新,然后再進行自己的操作。當第一個線程回來試圖完成自己的更新時,會發現不再需要了,返回即可,因為 CAS 會檢測到幫助線程的干預(在這種情況下,是建設性的干預)。

這種 “幫助鄰居” 的要求,對於讓數據結構免受單個線程失敗的影響,是必需的。如果線程發現數據結構正處在被其他線程更新的中途,然后就等候其他線程完成更新,那么如果其他線程在操作中途失敗,這個線程就可能永遠等候下去。即使不出現故障,這種方式也會提供糟糕的性能,因為新到達的線程必須放棄處理器,導致上下文切換,或者等到自己的時間片過期(而這更糟)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public  class  LinkedQueue <E> {
     private  static  class  Node <E> {
         final  E item;
         final  AtomicReference<Node<E>> next;
         Node(E item, Node<E> next) {
             this .item = item;
             this .next = new  AtomicReference<Node<E>>(next);
         }
     }
     private  AtomicReference<Node<E>> head
         = new  AtomicReference<Node<E>>( new  Node<E>( null , null ));
     private  AtomicReference<Node<E>> tail = head;
     public  boolean  put(E item) {
         Node<E> newNode = new  Node<E>(item, null );
         while  ( true ) {
             Node<E> curTail = tail.get();
             Node<E> residue = curTail.next.get();
             if  (curTail == tail.get()) {
                 if  (residue == null ) /* A */  {
                     if  (curTail.next.compareAndSet( null , newNode)) /* C */  {
                         tail.compareAndSet(curTail, newNode) /* D */  ;
                         return  true ;
                     }
                 } else  {
                     tail.compareAndSet(curTail, residue) /* B */ ;
                 }
             }
         }
     }
}

具體算法相見IBM Developerworks

Java的ConcurrentHashMap的實現原理

Java5中的ConcurrentHashMap,線程安全,設計巧妙,用桶粒度的鎖,避免了put和get中對整個map的鎖定,尤其在get中,只對一個HashEntry做鎖定操作,性能提升是顯而易見的。

8aea11a8-4184-3f1f-aba7-169aa5e0797a

具體實現中使用了鎖分離機制,在這個帖子中有非常詳細的討論。這里有關於Java內存模型結合ConcurrentHashMap的分析。以下是JDK6的ConcurrentHashMap的源碼:

Java的ConcurrentLinkedQueue實現方法

ConcurrentLinkedQueue也是同樣使用了CAS指令,但其性能並不高因為太多CAS操作。其源碼如下:

高並發環境下優化鎖或無鎖(lock-free)的設計思路

服務端編程的3大性能殺手:1、大量線程導致的線程切換開銷。2、鎖。3、非必要的內存拷貝。在高並發下,對於純內存操作來說,單線程是要比多線程快的, 可以比較一下多線程程序在壓力測試下cpu的sy和ni百分比。高並發環境下要實現高吞吐量和線程安全,兩個思路:一個是用優化的鎖實現,一個是lock-free的無鎖結構。但非阻塞算法要比基於鎖的算法復雜得多。開發非阻塞算法是相當專業的訓練,而且要證明算法的正確也極為困難,不僅和具體的目標機器平台和編譯器相關,而且需要復雜的技巧和嚴格的測試。雖然Lock-Free編程非常困難,但是它通常可以帶來比基於鎖編程更高的吞吐量。所以Lock-Free編程是大有前途的技術。它在線程中止、優先級倒置以及信號安全等方面都有着良好的表現。

  • 優化鎖實現的例子:Java中的ConcurrentHashMap,設計巧妙,用桶粒度的鎖和鎖分離機制,避免了put和get中對整個map的鎖定,尤其在get中,只對一個HashEntry做鎖定操作,性能提升是顯而易見的(詳細分析見《探索 ConcurrentHashMap 高並發性的實現機制》)。
  • Lock-free無鎖的例子:CAS(CPU的Compare-And-Swap指令)的利用和LMAX的disruptor無鎖消息隊列數據結構等。有興趣了解LMAX的disruptor無鎖消息隊列數據結構的可以移步slideshare

disruptor無鎖消息隊列數據結構的類圖和技術文檔下載

2014-02-12_16h55_36

另外,在設計思路上除了盡量減少資源爭用以外,還可以借鑒nginx/node.js等單線程大循環的機制,用單線程或CPU數相同的線程開辟大的隊列,並發的時候任務壓入隊列,線程輪詢然后一個個順序執行。由於每個都采用異步I/O,沒有阻塞線程。這個大隊列可以使用RabbitMQueue,或是JDK的同步隊列(性能稍差),或是使用Disruptor無鎖隊列(Java)。任務處理可以全部放在內存(多級緩存、讀寫分離、ConcurrentHashMap、甚至分布式緩存Redis)中進行增刪改查。最后用Quarz維護定時把緩存數據同步到DB中。當然,這只是中小型系統的思路,如果是大型分布式系統會非常復雜,需要分而治理,用SOA的思路,參考這篇文章的圖。(注:Redis是單線程的純內存數據庫,單線程無需鎖,而Memcache是多線程的帶CAS算法,兩者都使用epoll,no-blocking io)

png;base643f17317a5d7e7fe9

深入JVM的OS的無鎖非阻塞算法

如果深入 JVM 和操作系統,會發現非阻塞算法無處不在。垃圾收集器使用非阻塞算法加快並發和平行的垃圾搜集;調度器使用非阻塞算法有效地調度線程和進程,實現內在鎖。在 Mustang(Java 6.0)中,基於鎖的SynchronousQueue 算法被新的非阻塞版本代替。很少有開發人員會直接使用 SynchronousQueue,但是通過Executors.newCachedThreadPool() 工廠構建的線程池用它作為工作隊列。比較緩存線程池性能的對比測試顯示,新的非阻塞同步隊列實現提供了幾乎是當前實現 3 倍的速度。在 Mustang 的后續版本(代碼名稱為 Dolphin)中,已經規划了進一步的改進。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM