java多線程系列(三)---等待通知機制


等待通知機制

前言:本系列將從零開始講解java多線程相關的技術,內容參考於《java多線程核心技術》與《java並發編程實戰》等相關資料,希望站在巨人的肩膀上,再通過我的理解能讓知識更加簡單易懂。

目錄

非等待通知

public void run() {
		try {
			for (int i = 0; i < 10; i++) {
				list.add();
				System.out.println("添加了" + (i + 1) + "個元素");
				Thread.sleep(1000);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
public void run() {
		try {
			while (true) {
				if (list.size() == 5) {
					System.out.println("==5了,線程b要退出了!");
					throw new InterruptedException();
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
  • 兩個線程實現了通信,但list大小為5的時候,線程B退出了,但是線程B不停地輪詢是否為5,這個時候是很占資源的
  • 如果輪詢的時間間隔小,這個時候更加浪費資源
  • 如果輪詢的時間間隔大,那么還可能錯過了想要的數據,比如可能錯過了5
  • 這里共享了list,所以實現了通信,但是因為不知道什么時候通信,所以不停地輪詢,這種通信有缺點,一是浪費cpu資源,二是可能讀取到錯誤的數據

什么是等待通知機制

  • 線程A要等待線程B發出通知才執行,這個時候線程A可以執行wait方法,等待線程B執行notify方法喚醒線程A

等待通知機制實現

public void run() {
		try {
			synchronized (lock) {
				if (MyList.size() != 5) {
					System.out.println("wait begin "
							+ System.currentTimeMillis());
					lock.wait();
					System.out.println("wait end  "
							+ System.currentTimeMillis());
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	
public void run() {
		try {
			synchronized (lock) {
				for (int i = 0; i < 10; i++) {
					MyList.add();
					if (MyList.size() == 5) {
						lock.notify();
						System.out.println("已發出通知!");
					}
					System.out.println("添加了" + (i + 1) + "個元素!");
					Thread.sleep(1000);
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
  • 將上面的代碼進行更改,當大小不等於5的時候,線程A處於wait狀態,直到線程B發出通知,喚醒線程A,通過等待通知機制,避免了線程A不停輪詢造成的資源浪費

消息通知機制注意點

  • wait和notify必須是在同步方法和同步代碼塊里面調用,要不然會拋出異常
  • notify方法是繼承自Object類,可以喚醒在此對象監視器等待的線程,也就是說喚醒的是同一個鎖的線程
  • notify方法調用之后,不會馬上釋放鎖,而是運行完該同步方法或者是運行完該同步代碼塊的代碼
  • 調用notify后隨機喚醒的是一個線程
  • 調用wait方法后會將鎖釋放
  • wait狀態下中斷線程會拋出異常
  • wait(long),超過設置的時間后會自動喚醒,還沒超過該時間也可以通過其他線程喚醒
  • notifyAll可以喚醒同一鎖的所有線程
  • 如果線程還沒有處於等待狀態,其他線程進行喚醒,那么不會起作用,此時會打亂程序的正常邏輯

案例:生產者消費者模式

一個生產者,一個消費者

public void setValue() {
		try {
			synchronized (lock) {
				if (!ValueObject.value.equals("")) {
					lock.wait();
				}
				String value = System.currentTimeMillis() + "_"
						+ System.nanoTime();
				System.out.println("set"+ value);
				ValueObject.value = value;
				lock.notify();
			}

		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
public void getValue() {
		try {
			synchronized (lock) {
				if (ValueObject.value.equals("")) {
					lock.wait();
				}
				System.out.println("get"+ ValueObject.value);
				ValueObject.value = "";
				lock.notify();
			}

		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
public void run() {
		while (true) {
			r.getValue();
		}
	}
public void run() {
		while (true) {
			p.setValue();
		}
	}
  • 如果我們創建一個生產線程,一個消費線程,那么這個時候會交替運行

多個生產者,多個消費者

public void getValue() {
		try {
			synchronized (lock) {
				while (ValueObject.value.equals("")) {
					System.out.println("消費者 "
							+ Thread.currentThread().getName() + " WAITING了☆");
					lock.wait();
				}
				System.out.println("消費者 " + Thread.currentThread().getName()
						+ " RUNNABLE了");
				ValueObject.value = "";
				lock.notify();
			}

		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
public void run() {
		while (true) {
			r.getValue();
		}
	}
public void setValue() {
		try {
			synchronized (lock) {
				while (!ValueObject.value.equals("")) {
					System.out.println("生產者 "
							+ Thread.currentThread().getName() + " WAITING了★");
					lock.wait();
				}
				System.out.println("生產者 " + Thread.currentThread().getName()
						+ " RUNNABLE了");
				String value = System.currentTimeMillis() + "_"
						+ System.nanoTime();
				ValueObject.value = value;
				lock.notify();
			}

		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
public void run() {
		while (true) {
			p.setValue();
		}
	}
  • 如果這個時候創建多個生產者,多個消費者,如果連續喚醒的是同類線程,那么會出現假死狀態,就是線程都處於waiting狀態,因為notify隨機喚醒一個線程,如果喚醒的同類的,那么就浪費了一次喚醒,如果這個時候無法再喚醒異類線程,那么就會假死。這種情況把notify改成notifyAll()就行了。

消息通知機制需要注意的地方

  • 是否線程喚醒的是同類線程會造成影響
  • 生產者消費模式,判斷條件if和while應該使用哪一個

通過管道進行線程間通信

public class ThreadWrite extends Thread {

	private WriteData write;
	private PipedOutputStream out;

	public ThreadWrite(WriteData write, PipedOutputStream out) {
		super();
		this.write = write;
		this.out = out;
	}

	@Override
	public void run() {
		write.writeMethod(out);
	}

}
public class ThreadRead extends Thread {

	private ReadData read;
	private PipedInputStream input;

	public ThreadRead(ReadData read, PipedInputStream input) {
		super();
		this.read = read;
		this.input = input;
	}

	@Override
	public void run() {
		read.readMethod(input);
	}
}
public class Run {

	public static void main(String[] args) {

		try {
			WriteData writeData = new WriteData();
			ReadData readData = new ReadData();

			PipedInputStream inputStream = new PipedInputStream();
			PipedOutputStream outputStream = new PipedOutputStream();

			// inputStream.connect(outputStream);
			outputStream.connect(inputStream);//關鍵

			ThreadRead threadRead = new ThreadRead(readData, inputStream);
			threadRead.start();

			Thread.sleep(2000);

			ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
			threadWrite.start();

		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

}
  • PipedInputStream和PiepedOutputStream(對應字符流PipedReader和PipedOutputWriter)這幾個類可以實現線程間流的通信,將管道輸出流和輸出流連接,實現一個線程往管道發送數據,一個線程從管道讀取數據

join方法

public static void main(String[] args) {
		try {
			MyThread threadTest = new MyThread();
			threadTest.start();
			threadTest.join();

			System.out.println("threadTest對象執行完,我再執行");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
  • 當前線程阻塞(main線程),調用線程(threadTest)正常執行,執行完后當前線程(main)繼續執行
public class ThreadB extends Thread {

	@Override
	public void run() {
		try {
			ThreadA a = new ThreadA();
			a.start();
			a.join();

			System.out.println("線程B在run end處打印了");
		} catch (InterruptedException e) {
			System.out.println("線程B在catch處打印了");
			e.printStackTrace();
		}
	}

}

  • 如果線程B執行完了join方法,此時線程B被中斷,那么這個時候拋出異常,但是線程A正常運行

join(long)和sleep(long)的區別

public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
  • 從join方法的源代碼可以發現,他的核心方法是wait,在前面已經提到wait方法會釋放鎖,說明join方法也會釋放鎖,但是sleep是不會釋放鎖的。
  • join方法是非靜態的,而sleep是靜態的

ThreadLocal

  • 解決變量在各個線程的隔離性,每個線程綁定自己的值
public void run() {
		try {
			for (int i = 0; i < 100; i++) {
				if (Tools.tl.get() == null) {
					Tools.tl.set("ThreadA" + (i + 1));
				} else {
					System.out.println("ThreadA get Value=" + Tools.tl.get());
				}
				Thread.sleep(200);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
public void run() {
		try {
			for (int i = 0; i < 100; i++) {
				if (Tools.tl.get() == null) {
					Tools.tl.set("ThreadB" + (i + 1));
				} else {
					System.out.println("ThreadB get Value=" + Tools.tl.get());
				}
				Thread.sleep(200);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
public class Tools {

	public static ThreadLocal tl = new ThreadLocal();

}
  • 每個線程都設置了值,但是得到的值卻是自己的,互相隔離
  • 如果不開始不設置值,那么得到的值都是null,可以通過繼承ThreadLocal,重載initalValue方法,設置初始值
public class ThreadLocalExt extends ThreadLocal {
	@Override
	protected Object initialValue() {
		return new Date().getTime();
	}
}
  • InheritableThreadLocal,子線程可以繼承父線程的值
public class InheritableThreadLocalExt extends InheritableThreadLocal {
	@Override
	protected Object initialValue() {
		return new Date().getTime();
	}
}
public static void main(String[] args) {
		try {
			for (int i = 0; i < 10; i++) {
				System.out.println("       在Main線程中取值=" + Tools.tl.get());
				Thread.sleep(100);
			}
			Thread.sleep(5000);
			ThreadA a = new ThreadA();
			a.start();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	//main線程和A線程輸出的一樣
  • 在上面代碼的基礎上,重寫childValue方法可以設置子線程的值

我覺得分享是一種精神,分享是我的樂趣所在,不是說我覺得我講得一定是對的,我講得可能很多是不對的,但是我希望我講的東西是我人生的體驗和思考,是給很多人反思,也許給你一秒鍾、半秒鍾,哪怕說一句話有點道理,引發自己內心的感觸,這就是我最大的價值。(這是我喜歡的一句話,也是我寫博客的初衷)

作者:jiajun 出處: http://www.cnblogs.com/-new/
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。如果覺得還有幫助的話,可以點一下右下角的【推薦】,希望能夠持續的為大家帶來好的技術文章!想跟我一起進步么?那就【關注】我吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM