Java並發——線程間通信與同步技術


      傳統的線程間通信與同步技術為Object上的wait()、notify()、notifyAll()等方法,Java在顯示鎖上增加了Condition對象,該對象也可以實現線程間通信與同步。本文會介紹有界緩存的概念與實現,在一步步實現有界緩存的過程中引入線程間通信與同步技術的必要性。首先先介紹一個有界緩存的抽象基類,所有具體實現都將繼承自這個抽象基類:

public abstract class BaseBoundedBuffer<V> {
	private final V[] buf;
	private int tail;
	private int head;
	private int count;

	protected BaseBoundedBuffer(int capacity) {
		this.buf = (V[]) new Object[capacity];
	}

	protected synchronized final void doPut(V v) {
		buf[tail] = v;
		if (++tail == buf.length)
			tail = 0;
		++count;
	}

	protected synchronized final V doTake() {
		V v = buf[head];
		buf[head] = null;
		if (++head == buf.length)
			head = 0;
		--count;
		return v;
	}

	public synchronized final boolean isFull() {
		return count == buf.length;
	}

	public synchronized final boolean isEmpty() {
		return count == 0;
	}
}

      在向有界緩存中插入或者提取元素時有個問題,那就是如果緩存已滿還需要插入嗎?如果緩存為空,提取的元素又是什么?以下幾種具體實現將分別回答這個問題。

1、將異常傳遞給調用者

      最簡單的實現方式是:如果緩存已滿,向緩存中添加元素,我們就拋出異常:

public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
	public GrumpyBoundedBuffer() {
		this(100);
	}

	public GrumpyBoundedBuffer(int size) {
		super(size);
	}

	public synchronized void put(V v) throws BufferFullException {
		if (isFull())
			throw new BufferFullException();
		doPut(v);
	}

	public synchronized V take() throws BufferEmptyException {
		if (isEmpty())
			throw new BufferEmptyException();
		return doTake();
	}
}

  這種方法實現簡單,但是使用起來卻不簡單,因為每次put()與take()時都必須准備好捕捉異常,這或許滿足某些需求,但是有些人還是希望插入時檢測到已滿的話,可以阻塞在那里,等隊列不滿時插入對象。

2、通過輪詢與休眠實現簡單的阻塞

      當隊列已滿插入數據時,我們可以不拋出異常,而是讓線程休眠一段時間,然后重試,此時可能隊列已經不是已滿狀態:

public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
	int SLEEP_GRANULARITY = 60;

	public SleepyBoundedBuffer() {
		this(100);
	}

	public SleepyBoundedBuffer(int size) {
		super(size);
	}

	public void put(V v) throws InterruptedException {
		while (true) {
			synchronized (this) {
				if (!isFull()) {
					doPut(v);
					return;
				}
			}
			Thread.sleep(SLEEP_GRANULARITY);
		}
	}

	public V take() throws InterruptedException {
		while (true) {
			synchronized (this) {
				if (!isEmpty())
					return doTake();
			}
			Thread.sleep(SLEEP_GRANULARITY);
		}
	}
}

  這種實現方式最大的問題是,我們很難確定合適的休眠間隔,如果休眠間隔過長,那么程序的響應性會變差,如果休眠間隔過短,那么會浪費大量CPU時間。

3、使用條件隊列實現有界緩存

      使用休眠的方式會有響應性問題,因為我們無法保證當隊列為非滿狀態時線程就會立刻sleep結束並且檢測到,所以,我們希望能有另一種實現方式,當緩存非滿時,會主動喚醒線程,而不是需要線程去輪詢緩存狀態,Object對象上的wait()與notifyAll()能夠實現這個需求。當調用wait()方法時,線程會自動釋放鎖,並請求請求操作系統掛起當前線程;當其他線程檢測到條件滿足時,會調用notifyAll()方法喚醒掛起線程,實現線程間通信與同步:

public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
	public BoundedBuffer() {
		this(100);
	}

	public BoundedBuffer(int size) {
		super(size);
	}

	public synchronized void put(V v) throws InterruptedException {
		while (isFull())
			wait();
		doPut(v);
		notifyAll();
	}

	public synchronized V take() throws InterruptedException {
		while (isEmpty())
			wait();
		V v = doTake();
		notifyAll();
		return v;
	}

	public synchronized void alternatePut(V v) throws InterruptedException {
		while (isFull())
			wait();
		boolean wasEmpty = isEmpty();
		doPut(v);
		if (wasEmpty)
			notifyAll();
	}
}

  注意,上面的例子中我們使用了notifyAll()喚醒線程而不是notify()喚醒線程,如果我們改用notify()喚醒線程的話,將導致錯誤的,notify()會在等待隊列中隨機選擇一個線程喚醒,而notifyAll()會喚醒所有等待線程。對於上面的例子,如果現在是非滿狀態,我們使用notify()喚醒線程,由於只能喚醒一個線程,那么我們喚醒的可能是在等待非空狀態的線程,將導致信號丟失。只有同時滿足以下兩個條件時,才能用單一的notify而不是notifyAll:

  1. 所有等待線程的類型都相同。只有一個條件謂詞與條件隊列相關,並且每個線程在從wait返回后將執行相同的操作。
  2. 單進單出。在條件變量上的每次通知,最多只能喚醒一個線程來執行。

4、使用顯示的Condition實現有界緩存     

      內置條件隊列存在一些缺陷,每個內置鎖都只能有一個相關聯的條件隊列,因而像上個例子,多個線程都要在同一個條件隊列上等待不同的條件謂詞,如果想編寫一個帶有多個條件謂詞的並發對象,就可以使用顯示的鎖和Condition,與內置鎖不同的是,每個顯示鎖可以有任意數量的Condition對象。以下代碼給出了有界緩存的另一種實現,即使用兩個Condition,分別為notFull和notEmpty,用於表示"非滿"與"非空"兩個條件謂詞。

public class ConditionBoundedBuffer<T> {
	protected final Lock lock = new ReentrantLock();
	private final Condition notFull = lock.newCondition();
	private final Condition notEmpty = lock.newCondition();
	private static final int BUFFER_SIZE = 100;
	private final T[] items = (T[]) new Object[BUFFER_SIZE];
	private int tail, head, count;

	public void put(T x) throws InterruptedException {
		lock.lock();
		try {
			while (count == items.length)
				notFull.await();
			items[tail] = x;
			if (++tail == items.length)
				tail = 0;
			++count;
			notEmpty.signal();
		} finally {
			lock.unlock();
		}
	}

	public T take() throws InterruptedException {
		lock.lock();
		try {
			while (count == 0)
				notEmpty.await();
			T x = items[head];
			items[head] = null;
			if (++head == items.length)
				head = 0;
			--count;
			notFull.signal();
			return x;
		} finally {
			lock.unlock();
		}
	}
}

      注意,在上面的例子中,由於使用了兩個Condition對象,我們的喚醒方法調用的是signal()方法,而不是signalAll()方法。

      使用條件隊列時,需要特別注意鎖、條件謂詞和條件變量之間的三元關系:在條件謂詞中包含的變量必須由鎖保護,在檢查條件謂詞以及調用wait和notify(或者await和signal)時,必須持有鎖對象。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM