文章目錄
一、使用同一個共享變量控制
二、PipedInputStream、PipedOutputStream
三、利用BlockingQueue
四、利用LockSupport
一、使用同一個共享變量控制
Synchronized、wait、notify
public class Demo1 { private final List<Integer> list =new ArrayList<>(); public static void main(String[] args) { Demo1 demo =new Demo1(); new Thread(()->{ for (int i=0;i<10;i++){ synchronized (demo.list){ if(demo.list.size()%2==1){ try { demo.list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); demo.list.notify(); } } }).start(); new Thread(()->{ for (int i=0;i<10;i++){ synchronized (demo.list){ if(demo.list.size()%2==0){ try { demo.list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); demo.list.notify(); } } }).start(); } }
Lock、Condition
public class Task { private final Lock lock = new ReentrantLock(); private final Condition addConditon = lock.newCondition(); private final Condition subConditon = lock.newCondition(); private volatile int num = 0; private List<String> list = new ArrayList<>(); public void add() { for (int i = 0; i < 10; i++) { lock.lock(); try { if (list.size() == 10) { addConditon.await(); } num++; Thread.sleep(100); list.add("add " + num); System.out.println("The list size is " + list.size()); System.out.println("The add thread is " + Thread.currentThread().getName()); System.out.println("-------------"); subConditon.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public void sub() { for (int i = 0; i < 10; i++) { lock.lock(); try { if (list.size() == 0) { subConditon.await(); } num--; Thread.sleep(100); list.remove(0); System.out.println("The list size is " + list.size()); System.out.println("The sub thread is " + Thread.currentThread().getName()); System.out.println("-------------"); addConditon.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public static void main(String[] args) { Task task = new Task(); new Thread(task::add).start(); new Thread(task::sub).start(); } }
利用volatile
volatile修飾的變量值直接存在主內存里面,子線程對該變量的讀寫直接寫住內存,而不是像其它變量一樣在local thread里面產生一份copy。volatile能保證所修飾的變量對於多個線程可見性,即只要被修改,其它線程讀到的一定是最新的值。
public class Demo2 { private volatile List<Integer> list =new ArrayList<>(); public static void main(String[] args) { Demo2 demo =new Demo2(); new Thread(()->{ for (int i=0;i<10;i++){ demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); } }).start(); new Thread(()->{ for (int i=0;i<10;i++){ demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); } }).start(); } }
利用AtomicInteger
和volatile類似, 只是原子操作達到預估值非A即B
二、PipedInputStream、PipedOutputStream
這里用流在兩個線程間通信,但是Java中的Stream是單向的,所以在兩個線程中分別建了一個input和output
public class PipedDemo { private final PipedInputStream inputStream1; private final PipedOutputStream outputStream1; private final PipedInputStream inputStream2; private final PipedOutputStream outputStream2; public PipedDemo(){ inputStream1 = new PipedInputStream(); outputStream1 = new PipedOutputStream(); inputStream2 = new PipedInputStream(); outputStream2 = new PipedOutputStream(); try { inputStream1.connect(outputStream2); inputStream2.connect(outputStream1); } catch (IOException e) { e.printStackTrace(); } } /**程序退出時,需要關閉stream*/ public void shutdown() throws IOException { inputStream1.close(); inputStream2.close(); outputStream1.close(); outputStream2.close(); } public static void main(String[] args) throws IOException { PipedDemo demo =new PipedDemo(); new Thread(()->{ PipedInputStream in = demo.inputStream2; PipedOutputStream out = demo.outputStream2; for (int i = 0; i < 10; i++) { try { byte[] inArr = new byte[2]; in.read(inArr); System.out.print(Thread.currentThread().getName()+": "+i+" "); System.out.println(new String(inArr)); while(true){ if("go".equals(new String(inArr))) break; } out.write("ok".getBytes()); } catch (IOException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ PipedInputStream in = demo.inputStream1; PipedOutputStream out = demo.outputStream1; for (int i = 0; i < 10; i++) { try { out.write("go".getBytes()); byte[] inArr = new byte[2]; in.read(inArr); System.out.print(Thread.currentThread().getName()+": "+i+" "); System.out.println(new String(inArr)); while(true){ if("ok".equals(new String(inArr))) break; } } catch (IOException e) { e.printStackTrace(); } } }).start(); // demo.shutdown(); } }
輸出:
Thread-0: 0 go Thread-1: 0 ok Thread-0: 1 go Thread-1: 1 ok Thread-0: 2 go Thread-1: 2 ok Thread-0: 3 go Thread-1: 3 ok Thread-0: 4 go Thread-1: 4 ok Thread-0: 5 go Thread-1: 5 ok Thread-0: 6 go Thread-1: 6 ok Thread-0: 7 go Thread-1: 7 ok Thread-0: 8 go Thread-1: 8 ok Thread-0: 9 go Thread-1: 9 ok
三、利用BlockingQueue
BlockingQueue定義的常用方法如下:
- add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常。
- offer(Object):表示如果可能的話,將Object加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false。
- put(Object):把Object加到BlockingQueue里,如果BlockingQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里有空間再繼續。
- poll(time):獲取並刪除BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null。當不傳入time值時,立刻返回。
- peek():立刻獲取BlockingQueue里排在首位的對象,但不從隊列里刪除,如果隊列為空,則返回null。
- take():獲取並刪除BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的對象被加入為止。
BlockingQueue有四個具體的實現類:
- ArrayBlockingQueue:數組阻塞隊列,規定大小,其構造函數必須帶一個int參數來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。
- LinkedBlockingQueue:鏈阻塞隊列,大小不定,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO順序排序的。
- PriorityBlockingQueue:類似於LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序。
- SynchronousQueue:特殊的BlockingQueue,它的內部同時只能夠容納單個元素,對其的操作必須是放和取交替完成的。
- DelayQueue:延遲隊列,注入其中的元素必須實現 java.util.concurrent.Delayed 接口
所有BlockingQueue的使用方式類似,以下例子一個線程寫入,一個線程讀取,操作的是同一個Queue:
public class BlockingQueueDemo { public static void main(String[] args) { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); //讀線程 new Thread(() -> { int i =0; while (true) { try { String item = queue.take(); System.out.print(Thread.currentThread().getName() + ": " + i + " "); System.out.println(item); i++; } catch (Exception e) { e.printStackTrace(); } } }).start(); //寫線程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { String item = "go"+i; System.out.print(Thread.currentThread().getName() + ": " + i + " "); System.out.println(item); queue.put(item); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
四、利用LockSupport
用LockSupport的unpark()和park()方法,實現線程間通信。
五、利用ThreadLocal
ThreadLocal,即線程變量,是一個以 ThreadLocal 對象為鍵、任意對象為值的存儲結構。這個結構被依附在線程上,也就是說一個線程可以根據一個 ThreadLocal 對象查詢到綁定在這個線程上的一個值。
可以通過 set(T) 方法來設置一個值,在當前線程下再通過 get() 方法獲取到原先設置的值。
public class ThreadLocalDemo { private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<>() { @Override protected Long initialValue() { return System.currentTimeMillis(); } }; public static final void begin() { TIME_THREADLOCAL.set(System.currentTimeMillis()); } public static final long end() { return System.currentTimeMillis() - TIME_THREADLOCAL.get(); } public static void main(String[] args) throws InterruptedException { ThreadLocalDemo.begin(); Thread.sleep(2000); System.out.println(ThreadLocalDemo.end()); } } //輸出 2003