目標:假定我們要定義一個類似於HashMap的數組結構,該數據結構要確保即使在高並發多次初始化的背景下,具體存儲的數組的初始化仍然是正確的。我們對這個結構可以簡化一些,先考慮其元素的存儲
實現1:實現一個線程不安全的容器
class TableHolder { static final int DEFAULT_CAPACITY = 16; volatile Element[] table; volatile int sizeCtl; // 當前數組的大小, 默認為0 public void initializeTable() { int sc = sizeCtl; if (sc == 0) {
System.out.pritln("initializing the table"); sc = sizeCtl = DEFAULT_CAPACITY; table = new Element[sc]; } } } class Element { int key; int val; }
上述代碼塊為最為常見的初始化模式為什么說上述實現時不安全呢?請看
@Test public void how2InitializeATableSafely() throws InterruptedException { TableHolder th = new TableHolder(); List<Thread> threadList = new ArrayList<>(); for (int i = 0; i < 50; i++) { Thread t = new Thread(() -> { th.initializeTable(); System.out.println(th.sizeCtl); }); threadList.add(t); t.start(); } for (Thread thread : threadList) { thread.join(); } }
通過上述方法測試發現,初始化Element數組的關鍵區域被多次調用
實現2:實現一個重量級鎖的容器
為解決實現1出現的 table 數組被多次初始化的問題,作為直接的方式是在外層方法直接加鎖,如下:
class TableHolder { static final int DEFAULT_CAPACITY = 16; volatile Element[] table; volatile int sizeCtl; // 當前數組的大小, 默認為0 public synchronized void initializeTable() { int sc = sizeCtl; if (sc == 0) { sc = sizeCtl = DEFAULT_CAPACITY; table = new Element[sc]; } } }
此時再借助我們的 how2InitializeTableSafely 方法我們可以正確的初始化 table 數組了。但有個問題,采用 synchronized 加鎖的方法,性能較差。為此我們引入了第三種,采用CAS的方式進行table的初始化
實現3:實現一個相對高效的容器
相對於實現2中粗暴的直接對方法加鎖的方式,我們可以降低粒度,對控制數組大小的一個屬性 sizeCtl 進行 cas 操作,進而避免對熱區初始化代碼的多次進入
1 class TableHolder { 2 static final int DEFAULT_CAPACITY = 16; 3 volatile Element[] table; 4 volatile int sizeCtl; // 當前數組的大小, 默認為0 5 public void initializeTable() { 6 int sc; 7 8 while (table == null || table.length == 0) { 9 System.out.println("table is 0, go to initialize"); 10 if ((sc = sizeCtl) < 0) { 11 System.err.println("lost race, cause another thread is initializing the table"); 12 Thread.yield(); 13 } 14 else if (U.compareAndSwapInt(this, SIZE_CTL_OFFSET, sc, -1)) { 15 if (table == null || table.length == 0) { 16 int n = sc > 0 ? sc : DEFAULT_CAPACITY; 17 System.out.println("win race, initializing the table of size: " + n); 18 table = new Element[n]; 19 sc = n - (n >>> 2); 20 } 21 sizeCtl = sc; 22 break; 23 } 24 } 25 } 26 27 static Long SIZE_CTL_OFFSET; 28 static final Unsafe U; 29 static { 30 try { 31 U = getUnsafe(); 32 Class<?> k = UnsafeCASTest.TableHolder.class; 33 SIZE_CTL_OFFSET = U.objectFieldOffset(k.getDeclaredField("sizeCtl")); 34 } catch (NoSuchFieldException | IllegalAccessException e) { 35 throw new Error(e); 36 } 37 } 38 39 static Unsafe getUnsafe() throws NoSuchFieldException, IllegalAccessException { 40 Field f = Unsafe.class.getDeclaredField("theUnsafe"); 41 f.setAccessible(true); 42 return (Unsafe) f.get(null); 43 } 44 }
我們可以看到控制初始化代碼的關鍵在於第 14 行,多個線程會在這里競爭更新 sizeCtl 這個 volatile 變量。
如果可以原子化的更新 sizeCtl,則獲得了15-22 這一段熱區代碼的執行權,初始化 table 數組(初始化期間,sizeCtl 為 -1),退出循環。
沒有獲得熱區代碼執行權的線程會循環判斷 table 是否初始化,並爭取 sizeCtl 更新的所有權 (sizeCtl 為 volatile 變量,其他線程對該變量的更改是對當前執行線程可見的)。
如果某個線程可一個成功的更新 sizeCtl(獲得15-22執行權后),此時我們還要額外的判斷一下 table 是否已被初始化(見15),確保 table 不會被初始化多次
