java中並發包簡要分析01


參考《分布式java應用》一書,簡單過一遍並發包(java.util.concurrent)

 

 

ConcurrentHashMap

ConcurrentHashMap是線程安全的HashMap的實現。

1)添加

put(Object key , Object value)

ConcurrentHashMap並沒有采用synchronized進行控制,而是使用了ReentrantLock。

public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }

這里計算出key的hash值,根據hash值獲取對應的數組中的segment對象。接下來的工作都交由segment完成。

segment可以看成是HashMap的一個部分,(ConcurrentHashMap基於concurrencyLevel划分出了多個segment來對key-value進行存儲)每次操作都只對當前segment進行鎖定,從而避免每次put操作鎖住整個map。

V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

這個方法進來就上鎖(lock),並在finally中確保釋放鎖(unlock)。

添加key-value的過程中,先判斷當前存儲對象個數加1后是否大於threshold,如果大於則進行擴容(對象數組擴大兩倍,進行重新hash,轉移到新數組)。

如果不大於,則進行后續操作。通過對hash值和對象數組大小減1的值進行按位與操作(取余),得到當前key需要放入數組的位置,接着尋找對應位置上的hashEntry對象鏈表,並進行遍歷。

如果找到相同key值的Entry,則替換該Entry對象的value。

如果沒有找到就創建一個Entry對象,賦值給對應位置的數組對象,並構成鏈表。

注意:采用segment這種方式,在並發操作過程中,可以在很多程度上減少阻塞現象。

 

2)刪除

remove(Object key)

public V remove(Object key) {
	int hash = hash(key.hashCode());
        return segmentFor(hash).remove(key, hash, null);
    }

和put類似,刪除也要根據hash先獲得segment,然后在segment上執行remove操作。

V remove(Object key, int hash, Object value) {
            lock();
            try {
                int c = count - 1;
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue = null;
                if (e != null) {
                    V v = e.value;
                    if (value == null || value.equals(v)) {
                        oldValue = v;
                        // All entries following removed node can stay
                        // in list, but all preceding ones need to be
                        // cloned.
                        ++modCount;
                        HashEntry<K,V> newFirst = e.next;
                        for (HashEntry<K,V> p = first; p != e; p = p.next)
                            newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                          newFirst, p.value);
                        tab[index] = newFirst;
                        count = c; // write-volatile
                    }
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

segment的remove操作,首先加鎖,然后對hash值與數組大小減1的值按位與操作,得到數組對應位置上的HashEntry對象,接下來遍歷此鏈表,查找hash值相等並且key相等(equals)的對象。

如果沒有找到,返回null,釋放鎖。

如果找到了,則重新創建位於刪除元素之前的所有HashEntry,位於其后的不用處理。釋放鎖!

 

3)獲取

get(Object key)

直接看看segment中的get操作,如下:

 V get(Object key, int hash) {
            if (count != 0) { // read-volatile
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)
                            return v;
                        return readValueUnderLock(e); // recheck
                    }
                    e = e.next;
                }
            }
            return null;
        }

可以看出並沒有加鎖操作,只有v==null時,進入readValueUnderLock才有加鎖操作。

這里假設一種情況,例如兩條線程a、b,a執行get操作,b執行put操作。

當a執行到getFirst,與當前數組長度減1按位與操作后得到指定位置index,此時cpu將執行權交給b,b線程put一對key-value,導致擴容並重新hash排列,然后cpu又將執行權還給a,a然后根據之前的index去獲取HashEntry就會發生問題。

當然這種情況發生的概率很小。

 

4)遍歷

其實這個過程和讀取過程類似,讀取所有分段中的數據即可。

 

ConcurrentHashMap默認情況下采用將數據分為16個段進行存儲,並且每個段各自擁有自己的鎖,鎖僅用於put和remove等改變集合對象的操作,基於voliate及hashEntry鏈表的不變性實現讀取的不加鎖。

這些方式使得ConcurrentHashMap能夠保持極好的並發操作,尤其是對於讀遠比插入和刪除頻繁的map而言,而它采用的這些方法也可謂是對於java內存模型、並發機制深刻掌握的體現,是一個設計得非常不錯的支持高並發的集合對象。

——摘自《分布式java應用》

 

CopyOnWriteArrayList

CopyOnWriteArrayList是一個線程安全、並且在讀操作時無鎖的ArrayList。

1)添加

add(E e)

public boolean add(E e) {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
	    Object[] elements = getArray();
	    int len = elements.length;
	    Object[] newElements = Arrays.copyOf(elements, len + 1);//復制數組
	    newElements[len] = e;//添加到末尾
	    setArray(newElements);
	    return true;
	} finally {
	    lock.unlock();
	}
    }

這里同樣沒有使用synchronized關鍵字,而是使用ReentrantLock。

和ArrayList不同的是,這里每次都會創建一個新的object數組,大小比之前數組大1。將之前的數組復制到新數組,並將新加入的元素加到數組末尾。

 

2)刪除

remove(Object o)

public boolean remove(Object o) {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
	    Object[] elements = getArray();
	    int len = elements.length;
	    if (len != 0) {
		// Copy while searching for element to remove
		// This wins in the normal case of element being present
		int newlen = len - 1;
		Object[] newElements = new Object[newlen];//新建數組

		for (int i = 0; i < newlen; ++i) {
		    if (eq(o, elements[i])) {
			// found one;  copy remaining and exit
			for (int k = i + 1; k < len; ++k)
			    newElements[k-1] = elements[k];
			setArray(newElements);
			return true;
		    } else
			newElements[i] = elements[i];
		}

		// special handling for last cell
		if (eq(o, elements[newlen])) {
		    setArray(newElements);
		    return true;
		}
	    }
	    return false;
	} finally {
	    lock.unlock();
	}
    }

此方法為什么這么直接進行數組的復制呢?為何不適用system的arrayCopy來完成?

 

3)獲取

get(int index)

public E get(int index) {
        return (E)(getArray()[index]);
    }

這里有可能臟讀。但是銷量非常高。

//通過看集合包和並發包可以看出一些不同的編程思路。這里為什么就不事先做范圍的檢查?

 

從上可見,CopyOnWriteArrayList基於ReentrantLock保證了增加元素和刪除元素動作的互斥。在讀操作上沒有任何鎖,這樣就保證了讀的性能,帶來的副作用是有時候可能會讀取到臟數據。

 

CopyOnWriteArraySet

CopyOnWriteArraySet是基於CopyOnWriteArrayList的,可以知道set是不容許重復數據的,因此add操作和CopyOnWriteArrayList有所區別,他是調用CopyOnWriteArrayList的addIfAbsent方法。

 public boolean addIfAbsent(E e) {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
	    // Copy while checking if already present.
	    // This wins in the most common case where it is not present
	    Object[] elements = getArray();
	    int len = elements.length;
	    Object[] newElements = new Object[len + 1];
	    for (int i = 0; i < len; ++i) {
		if (eq(e, elements[i])) //如果存在,直接返回!
		    return false; // exit, throwing away copy
		else
		    newElements[i] = elements[i];
	    }
	    newElements[len] = e;
	    setArray(newElements);
	    return true;
	} finally {
	    lock.unlock();
	}
    }

由此可見,addIfAbsent需要每次都遍歷,在add方面,CopyOnWriteArraySet效率要比CopyOnWriteArrayList低一點。

 

ArrayBlockingQueue

ArrayBlockingQueue是一個基於數組、先進先出、線程安全的集合類,其特點是實現指定時間的阻塞讀寫,並且容量是可以限制的。

1)創建

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

初始化鎖和兩個鎖上的Condition,一個為notEmpty,一個為notFull。

 

2)添加

offer(E e , long timeout , TimeUtil unit)

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
	long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != items.length) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

這個方法將元素插入數組的末尾,如果數組滿,則進入等待,只到以下三種情況發生才繼續:

被喚醒、達到指定的時間、當前線程被中斷。

該方法首先將等待時間轉換成納秒。然后加鎖,如果數組未滿,則在末尾插入數據,如果數組已滿,則調用notFull.awaitNanos進行等待。如果被喚醒或超時,重新判斷是否滿。如果線程被interrupt,則直接拋出異常。

 

另外一個不帶時間的offer方法在數組滿的情況下不進去等待,而是直接返回false。

 public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

同時還可以選擇put方法,此方法在數組已滿的情況下會一直等待,知道數組不為空或線程被interrupt。

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }

 

3)獲取

poll(long timeout, TimeUnit unit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != 0) {
                    E x = extract();
                    return x;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }

            }
        } finally {
            lock.unlock();
        }
    }

poll獲取隊列中的第一個元素,如果隊列中沒有元素,則進入等待。

poll首先將制定timeout轉換成納秒,然后加鎖,如果數組個數不為0,則從當前對象數組中獲取最后一個元素,在獲取后將位置上的元素置為null。

如果數組中的元素個數為0,首先判斷timeout是否小於等於0,若小於0則直接返回null。若大於則進行等待,如果被喚醒或者超時,重新判斷數據元素個數是否大於0。

如果線程被interrupt,則直接拋出InterruptedException。

和offer一樣,不帶時間的poll方法在數組元素個數為0直接返回null,不進行等待。

take方法在數據為空的情況下會一直等待,只到數組不為空或者interrupt。

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM