Java中線程安全的集合淺析


1、JDK1.5之前

舊版本的集合主要有兩個Vector和Hashtable,在java.util包下。

這兩個類保證線程安全都是采用synchronized修飾方法的方式。在1.5之前,效率不高,現在已基本棄用。

1.1、Vector

1.2、Hashtable

1.3、Collections工具類

在JDK1.5之前,可以通過java.util.Collections工具類將非線程安全的集合通過public static <T> Collection<T> synchronizedCollection(Collection<T> c)方法轉化為線程安全的集合


2、JDK1.5之后

jdk1.5之后,在java.util.concurrent包下,新加入了幾個高效的線程安全集合類。

常用的有CopyOnWriteArrayList、CopyOnWriteArraySet

2.1、CopyOnWriteArrayList

線程安全的ArrayList,加強版的讀寫分離。

寫加鎖,讀無鎖,讀寫不阻塞,優於讀寫鎖。

線程安全實現方式:寫入時,先copy一個容器副本,再添加新元素,最后替換引用。是一種用空間換取線程安全的實現方式。

使用方式和一般的ArrayList一樣。

2.1.1、使用示例
//jdk1.5之前可以使用collections將線程不安全的集合轉為線程安全的
/*List list = new ArrayList();
List synchronizedList = Collections.synchronizedList(list);*/

//jdk1.5之后可以使用java.util.concurrent包下線程安全的集合
List list = new CopyOnWriteArrayList();

ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
    int t = i;
    threadPool.submit(new Runnable() {
        @Override
        public void run() {
            for (int j = 0; j < 10; j++) {
                list.add(Thread.currentThread().getName() +"添加:"+ t + "---" + j);
            }
        }
    });
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {

}
System.out.println(list.toString());
2.1.2、源碼解析
2.1.2.1、add()方法
public boolean add(E e) {
    //獲取鎖對象並加鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //得到元素數組
        Object[] elements = getArray();
        int len = elements.length;
        //復制副本
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        //添加新元素
        newElements[len] = e;
        //設置新的數組
        setArray(newElements);
        return true;
    } finally {
        //釋放鎖
        lock.unlock();
    }
}
2.1.2.2、set()方法
public E set(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        E oldValue = get(elements, index);
		//判斷新元素是否和就元素相等
        //不相等則替換,相等則不做操作
        if (oldValue != element) {
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len);
            newElements[index] = element;
            setArray(newElements);
        } else {
            // Not quite a no-op; ensures volatile write semantics
            setArray(elements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}
2.1.2.3、get()方法
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
    return (E) a[index];
}

/**
 * {@inheritDoc}
 *
 * @throws IndexOutOfBoundsException {@inheritDoc}
 */
public E get(int index) {
    return get(getArray(), index);
}
2.1.2.4、remove方法
public E remove(int index) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        E oldValue = get(elements, index);
        int numMoved = len - index - 1;
        if (numMoved == 0)
            setArray(Arrays.copyOf(elements, len - 1));
        else {
            Object[] newElements = new Object[len - 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index + 1, newElements, index,
                             numMoved);
            setArray(newElements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

2.2、CopyOnWriteArraySet

線程安全的set集合,底層是一個CopyOnWriteArrayList。

與CopyOnWriteArrayList的不同是添加元素的add()方法使用的CopyOnWriteArrayList的addIfAbsent()方法實現。會遍歷整個數組。如果元素已存在,則不添加,丟棄副本。

2.2.1、使用示例
CopyOnWriteArraySet set = new CopyOnWriteArraySet();
set.add("123");
set.add("456");
set.add("abc");
set.add("123");

set.forEach(o -> System.out.println(o));
2.2.2、源碼解析
2.2.2.1、實例化
public class CopyOnWriteArraySet<E> extends AbstractSet<E> implements java.io.Serializable {
    private static final long serialVersionUID = 5457747651344034263L;

    private final CopyOnWriteArrayList<E> al;

    /**
     * Creates an empty set.
     */
    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }
}
2.2.2.2、add()方法
public boolean add(E e) {
    return al.addIfAbsent(e);
}

==>CopyOnWriteArrayList
public boolean addIfAbsent(E e) {
    Object[] snapshot = getArray();
    //先判斷元素是否存在與舊數組中
    //存在則直接返回false,不做操作,反之則調用addIfAbsent(e, snapshot)方法
    return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
    addIfAbsent(e, snapshot);
}
private boolean addIfAbsent(E e, Object[] snapshot) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] current = getArray();
        int len = current.length;
        if (snapshot != current) {
            // Optimize for lost race to another addXXX operation
            int common = Math.min(snapshot.length, len);
            for (int i = 0; i < common; i++)
                if (current[i] != snapshot[i] && eq(e, current[i]))
                    return false;
            if (indexOf(e, current, common, len) >= 0)
                return false;
        }
        Object[] newElements = Arrays.copyOf(current, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}
2.2.2.3、remove()方法
public boolean remove(Object o) {
    return al.remove(o);
}

2.3、ConcurrentLinkedQueue

無界隊列,元素個數無上限,可以一直添加。

線程安全,高效可讀寫的隊列,高並發下性能最好。

沒有使用鎖的方式(無鎖),而是采用CAS算法(Compare And Switch ——比較交換算法)實現線程安全。

添加方法包括三個核心參數(V/E/N):V—需要更新的值,E—預期值(V的備份),N—更新值。更新前,先將V賦值給E,拿到N准備更新時,先比較此時的V和E是否相等,相等則更新,否則放棄更新。即:只有當V==E時,才會執行V=N,否則表示已經更新過,取消操作。

CAS是基於硬件的一種原子操作算法,效率極高。

2.3.1、使用示例
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
//        queue.add("1");//添加方法,底層使用offer()實現,會拋出異常
//        queue.add("a");
        queue.offer("2");//添加方法,不會拋出異常
        queue.offer(3);

        System.out.println("---" + queue.size());
        for (int i = 0; i <= queue.size(); i++) {
//            System.out.println(queue.remove());//獲取並移除第一個元素,與add()成對使用,會拋出異常
            System.out.println(queue.poll());//獲取並移除第一個元素,offer()成對使用,不會拋出異常
        }
        System.out.println("===" + queue.size());
2.3.2、源碼解析
2.3.2.1、實例化
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;
    
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
2.3.2.2、offer()方法
public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                //這里可以看到采用的是CAS算法
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
2.3.2.3、poll() 方法
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

2.4、BlockingQueue

Queue的子接口,阻塞隊列,元素有個數限制,怎加了兩個線程狀態為無限等待的方法:

  • put()—添加

  • take()—取出

生產者消費者模式的最好實現。

其重要的兩個實現類為:

  • ArrayBlockingQueue:數組實現,需要自定義元素上限。
  • LinkedBlockingQueue:鏈表實現,默認元素上限為Integer.MAX_VALUE
2.4.1、使用示例
2.4.1.1、ArrayBlockingQueue使用
ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
try {
    queue.put(1);
    queue.put(2);
    queue.put(3);
    queue.put(4);
    System.out.println("已添加4個元素");
    System.out.println("queue.size():" + queue.size());
    //由於隊列容量為4,所以以下操作會無限等待下去
    queue.put(5);
    System.out.println("已添加5個元素");
    System.out.println("queue.size():" + queue.size());
} catch (InterruptedException e) {
    e.printStackTrace();
}

ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
try {
    queue.put(1);
    queue.put(2);
    queue.put(3);
    queue.put(4);
    System.out.println("已添加4個元素");
    System.out.println("queue.size():" + queue.size());
    queue.take();//取出一個元素
    queue.put(5);
    System.out.println("已添加5個元素");
    System.out.println("queue.size():" + queue.size());
    System.out.println(queue.toString());
} catch (InterruptedException e) {
    e.printStackTrace();
}

2.4.1.2、LinkedBlockingQueue使用

與ArrayBlockingQueue類似

2.4.2、源碼解析
2.4.2.1、ArrayBlockingQueue實例化
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
2.4.2.2、LinkedBlockingQueue實例化
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
2.4.2.3、put()方法
===>ArrayBlockingQueue
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

===>LinkedBlockingQueue
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
2.4.2.4、take()方法
===>ArrayBlockingQueue
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

===>LinkedBlockingQueue
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

2.5、ConcurrentHashMap

jdk1.8之前:

采用分段鎖設計的線程安全Map集合,初始容量默認為16段(Segment)。

不是對整個map加鎖,而是對每個segment加鎖,以提高效率。最理想的狀態是16個對象分別存入16個segment,並發量為16。

jdk1.8開始:

采用CAS算法實現同步。

使用方式與HashMap無異。

2.5.1、使用示例
Map map = new ConcurrentHashMap();
for (int i = 0; i < 5; i++) {
    int t = i;
    new Thread(new Runnable() {
        @Override
        public void run() {
            map.put(t, new Random().nextInt(100));
        }
    }).start();
}

map.forEach((key, value) -> System.out.println(key + " ==> " + value));


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM