1.創建多線程
public class MultiThread { public static void main(String[] args) { // 通過繼承Thread類 Thread thread = new Thread(){ @Override public void run() { while(true){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1:" + Thread.currentThread().getName()); System.out.println("2:" + this.getName()); } } }; thread.start(); // 通過實現Runnable接口 Thread thread2 = new Thread(new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1:" + Thread.currentThread().getName()); } } }); thread2.start(); // 如果既繼承runnable接口又實現了Thread類, 會執行哪個? new Thread( new Runnable(){ public void run() { while(true){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("runnable :" + Thread.currentThread().getName()); } } } ){ public void run() { while(true){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread :" + Thread.currentThread().getName()); } } }.start(); } }
2.定時器Timer
定時任務就是靠多線程實現的
public class TimerTest { private static int count = 0;
public static void main(String[] args) { class MyTimerTask extends TimerTask{ @Override public void run() { count = (count+1)%2; System.out.println("bombing!"); new Timer().schedule(new MyTimerTask(),2000+2000*count); } } new Timer().schedule(new MyTimerTask(), 2000); while(true){ System.out.println(new Date().getSeconds()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
3.互斥 synchronized
保證線程安全(數據完整性)
public class MultiThreadMutex { public static void main(String[] args) { new MultiThreadMutex().init(); } private void init(){ final Outputer outputer = new Outputer(); new Thread(new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } outputer.output("javaIsAPurelyObjectOrientedProgrammingLanguage"); } } }).start(); new Thread(new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } outputer.output3("c++IsAMulti-paradigmSystems-levelProgrammingLanguage"); } } }).start(); } static class Outputer{ public void output(String name){ int len = name.length(); synchronized (Outputer.class) { for(int i=0;i<len;i++){ System.out.print(name.charAt(i)); } System.out.println(); } } public synchronized void output2(String name){ int len = name.length(); for(int i=0;i<len;i++){ System.out.print(name.charAt(i)); } System.out.println(); } public static synchronized void output3(String name){ int len = name.length(); for(int i=0;i<len;i++){ System.out.print(name.charAt(i)); } System.out.println(); } } }
4.同步 wait/notify
保證線程間執行次序
// 1. wait notify成對出現, 並且處於互斥鎖的范圍內 // 2. 要用while(condition)圍住mutex.wait(), 因為存在虛假喚醒 public class MultiThreadSynchronization { public static void main(String[] args) { final Business business = new Business(); new Thread( new Runnable() { @Override public void run() { for(int i=1;i<=50;i++){ business.sub(i); } } } ).start(); for(int i=1;i<=50;i++){ business.main(i); } } } class Business { private boolean bShouldSub = true; public synchronized void sub(int i) { while (!bShouldSub) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } for (int j = 1; j <= 10; j++) { System.out.println("sub thread sequence of " + j + ",loop of " + i); } bShouldSub = false; this.notify(); } public synchronized void main(int i) { while (bShouldSub) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } for (int j = 1; j <= 100; j++) { System.out.println("main thread sequence of " + j + ",loop of " + i); } bShouldSub = true; this.notify(); } }
5.線程間傳遞參數
共享變量
/ 多個線程共享變量 // 以類中變量為中介; 以傳入的共同參數為中介; 匿名內部類以主線程main中變量為中介; public class MultiThreadShareData { public static void main(String[] args) { // 傳入共享參數 每個線程執行相同的代碼 ShareData1 data1 = new ShareData1(); new Thread(data1).start(); new Thread(data1).start(); // 傳入共享參數 ShareData2 data2 = new ShareData2(); new Thread(new MyRunnable1(data2)).start(); new Thread(new MyRunnable2(data2)).start(); // 匿名內部類實現變量的寫法更簡潔, 不需要傳參 final ShareData2 data3 = new ShareData2(); new Thread(new Runnable(){ @Override public void run() { data3.decrement(); } }).start(); new Thread(new Runnable(){ @Override public void run() { data3.increment(); } }).start(); } } // 方式1. 如果每個線程執行相同的代碼 -> 多個Thread共享同一個runnable中的對象 少有可能 class ShareData1 implements Runnable { private int count = 100; @Override public void run() { while (true) { synchronized(this) { count--; } } } } // 方式2. class ShareData2 { private int j = 0; public synchronized void increment() { j++; } public synchronized void decrement() { j--; } } class MyRunnable1 implements Runnable { private ShareData2 data1; public MyRunnable1(ShareData2 data1) { this.data1 = data1; } public void run() { data1.decrement(); } } class MyRunnable2 implements Runnable { private ShareData2 data1; public MyRunnable2(ShareData2 data1) { this.data1 = data1; } public void run() { data1.increment(); } }
管道
public class MultiThreadPipe { public static void main(String[] args) { PipedOutputStream pos = new PipedOutputStream(); PipedInputStream pis = new PipedInputStream(); try { pos.connect(pis); } catch (IOException e) { e.printStackTrace(); } new Consumer(pis).start(); new Producer(pos).start(); } } class Producer extends Thread { private PipedOutputStream pos; public Producer(PipedOutputStream pos) { this.pos = pos; } public void run() { int i = 8; try { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } pos.write(i); } catch (IOException e) { e.printStackTrace(); } } } class Consumer extends Thread { private PipedInputStream pis; public Consumer(PipedInputStream pis) { this.pis = pis; } public void run() { try { System.out.println(pis.read()); } catch (IOException e) { e.printStackTrace(); } } }
6.ThreadLocal
該變量形式上共享, 但卻是by線程獨立
public class ThreadLocalExample { private static ThreadLocal<Integer> x = new ThreadLocal<Integer>(); public static void main(String[] args) { for (int i = 0; i < 2; i++) { new Thread(new Runnable() { @Override public void run() { int data = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " has put data :" + data); x.set(data); Person.getInstance().setName("name" + data); Person.getInstance().setAge(data); new A().print(); new B().print(); } }).start(); } } static class A{ public void print(){ int data = x.get(); System.out.println("A from " + Thread.currentThread().getName() + " get data :" + data); Person myData = Person.getInstance(); System.out.println("A from " + Thread.currentThread().getName() + " getMyData: " + myData.getName() + "," + myData.getAge()); } } static class B{ public void print(){ int data = x.get(); System.out.println("B from " + Thread.currentThread().getName() + " get data :" + data); Person myData = Person.getInstance(); System.out.println("B from " + Thread.currentThread().getName() + " getMyData: " + myData.getName() + "," + myData.getAge()); } } } // javaBean的by線程的單例 class Person { private static ThreadLocal<Person> personThreadLocal = new ThreadLocal<Person>(); private Person(){} public static /*無需synchronized*/ Person getInstance(){ Person instance = personThreadLocal.get(); if(instance == null){ instance = new Person(); personThreadLocal.set(instance); } return instance; } private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
ThreadLocal實現原理
public class ThreadLocalSimulation { private static Map<Thread, Integer> threadData = new HashMap<Thread, Integer>(); //核心 public static void main(String[] args) { for (int i = 0; i < 2; i++) { new Thread(new Runnable() { @Override public void run() { int data = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " has put data :" + data); threadData.put(Thread.currentThread(), data); new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ int data = threadData.get(Thread.currentThread()); System.out.println("A from " + Thread.currentThread().getName() + " get data :" + data); } } static class B{ public void get(){ int data = threadData.get(Thread.currentThread()); System.out.println("B from " + Thread.currentThread().getName() + " get data :" + data); } } }
7. 線程池
池化技術都是防止頻繁開關來提高系統性能, 代價是必須損耗一定空間來保存池
// 池化技術之線程池 public class ThreadPoolTest { public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(3); // 限制線程數量 //ExecutorService threadPool = Executors.newCachedThreadPool(); // 動態控制線程數量 //ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 跟一個線程類似, 但可以保證線程掛了有新線程接替 for(int i=1; i<=10; i++){ final int task = i; threadPool.execute(new Runnable(){ @Override public void run() { for(int j = 1; j <= 10; j++){ try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " task:" + task + " loop:" + j); } } }); } System.out.println("all of 10 tasks have committed!"); threadPool.shutdown(); // 如果是shutdownNow方法會停止正在執行的任務 // 帶定時器的線程池 schedule方法:xx時間以后執行; scheduleAtFiexedRate方法:xx時間后每隔yy時間執行 Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("bombing!"); } }, 6, 2, TimeUnit.SECONDS); } }
8. Callable接口與Future
能實現返回線程執行結果 的效果
// 返回結果的任務 public class CallableAndFuture { public static void main(String[] args) { // 其一 ExecutorService threadPool = Executors.newSingleThreadExecutor(); Future<String> future = threadPool.submit( // submit Callable<resultType>而非execute Runnable new Callable<String>() { public String call() throws Exception { // 模擬handling Thread.sleep(2000); return "hello"; }; }); System.out.println("等待結果"); try { System.out.println("拿到結果:" + future.get()); //阻塞等待結果, 還有個get方法的重載版本,帶超時參數, 超時拋異常. future/get的特點在於, 我們可以把任務合理分解, 在需要任務結果時調用get } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } threadPool.shutdown(); //不用該函數主線程是不會退出的 // 其二 // ExecutorCompletionService包裝線程池, take方法返回最先完成的Future任務 ExecutorService threadPool2 = Executors.newFixedThreadPool(10); CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool2); for (int i = 1; i <= 10; i++) { final int seq = i; completionService.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { // 模擬handling Thread.sleep(new Random().nextInt(5000)); return seq; } }); } for (int i = 0; i < 10; i++) { try { System.out.println(completionService.take().get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } threadPool2.shutdown(); } }
9.Lock
ReentrantLock是具有synchronized功能的類
ReentrantReadWriteLock 粒度更細, 讀與讀不互斥, 寫與寫互斥, 讀與寫互斥
// 使用Lock改寫synchronized例子 public class LockTest { public static void main(String[] args) { new LockTest().init(); } private void init(){ final Outputer outputer = new Outputer(); new Thread(new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } outputer.output("javaIsAPurelyObjectOrientedProgrammingLanguage"); } } }).start(); new Thread(new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } outputer.output("c++IsAMulti-paradigmSystems-levelProgrammingLanguage"); } } }).start(); } static class Outputer { Lock lock = new ReentrantLock(); public void output(String name) { int len = name.length(); lock.lock(); try { for (int i = 0; i < len; i++) { System.out.print(name.charAt(i)); } System.out.println(); } finally { lock.unlock(); } } } }
使用讀寫鎖模擬緩存
// 模擬緩存 // 加鎖解鎖要一致: 解沒加過的鎖會拋出異常; 加鎖不解會造成死鎖 public class CacheSimulation { public static void main(String[] args) { for (int i = 0; i < 10; ++i) { new Thread(new Runnable() { @Override public void run() { String i = (String) getData("key"); // out.println()參數為常量無並發問題; 為表達式時存在並發問題 System.out.println(i); } }).start(); } } private static Map<String, Object> cache = new HashMap<String, Object>(); //保存緩存 private static ReadWriteLock rwl = new ReentrantReadWriteLock(); public static Object getData(String key) { rwl.readLock().lock(); Object value = cache.get(key); if (value == null) { rwl.readLock().unlock(); rwl.writeLock().lock(); if (cache.get(key) == null) { // 防止幾個線程都阻塞在writeLock.lock() value = "abcde"; // 模擬獲取數據 System.out.println("get"); cache.put(key, value); } rwl.writeLock().unlock(); } return value; } }
10.Condition
Condition具有wait/notify功能的類, 同樣要配合Lock使用. 但與synchronized的waitnotify不同, 這里同一個Lock下可以創建多個Condition對象, 來實現粒度更細的控制
一個condition
// 使用Condition改寫線程同步示例, Condition由Lock.newCondition()而來 // Condition.await/signal 對應 Mutex.wait/notify public class ConditionTest { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 50; i++) { business.sub(i); } } }).start(); for (int i = 1; i <= 30; i++) { business.main(i); } } static class Business { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); private boolean bShouldSub = true; public void sub(int i) { lock.lock(); try { while (!bShouldSub) { try { condition.await(); } catch (Exception e) { e.printStackTrace(); } } for (int j = 1; j <= 10; j++) { System.out.println("sub thread sequence of " + j + ",loop of " + i); } bShouldSub = false; condition.signal(); } finally { lock.unlock(); } } public void main(int i) { lock.lock(); try { while (bShouldSub) { try { condition.await(); } catch (Exception e) { e.printStackTrace(); } } for (int j = 1; j <= 20; j++) { System.out.println("main thread sequence of " + j + ",loop of " + i); } bShouldSub = true; condition.signal(); } finally { lock.unlock(); } } } }
兩個condition, 下面模擬了數組阻塞隊列
// 有界緩沖區/數組阻塞隊列 的模擬 class ArrayBlockingQueueSimulation { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; // 長度 int putptr, takeptr, count; // 初始為0 public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) { notFull.await(); } items[putptr] = x; if (++putptr == items.length) { putptr = 0; } ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) { takeptr = 0; } --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
三個condition, 如下實現了三個線程輪流執行
public class ThreeThreadsSynchronization { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 50; i++) { business.sub2(i); } } }).start(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 50; i++) { business.sub3(i); } } }).start(); for (int i = 1; i <= 50; i++) { business.main(i); } } static class Business { Lock lock = new ReentrantLock(); Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); private int shouldSub = 1; public void sub2(int i) { lock.lock(); try { while (shouldSub != 2) { try { condition2.await(); } catch (Exception e) { e.printStackTrace(); } } for (int j = 1; j <= 20; j++) { System.out.println("sub2 thread sequence of " + j + ",loop of " + i); } shouldSub = 3; condition3.signal(); } finally { lock.unlock(); } } public void sub3(int i) { lock.lock(); try { while (shouldSub != 3) { try { condition3.await(); } catch (Exception e) { e.printStackTrace(); } } for (int j = 1; j <= 10; j++) { System.out.println("sub3 thread sequence of " + j + ",loop of " + i); } shouldSub = 1; condition1.signal(); } finally { lock.unlock(); } } public void main(int i) { lock.lock(); try { while (shouldSub != 1) { try { condition1.await(); } catch (Exception e) { e.printStackTrace(); } } for (int j = 1; j <= 30; j++) { System.out.println("main thread sequence of " + j + ",loop of " + i); } shouldSub = 2; condition2.signal(); } finally { lock.unlock(); } } } }
11. Semaphore
Semaphore信號量, 互斥鎖保證多個線程同時訪問同一個資源時的線程安全性, 信號量讓線程動態匹配現有資源數, 來保證同時訪問多個資源時的線程安全性, 並發更高.
Lock是哪個線程拿哪個線程負責釋放; 信號量可以是一個線程獲取, 另一個線程釋放, 這個特性能用於死鎖恢復.
public class SemaphoreTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { Runnable runnable = new Runnable() { public void run() { try { semaphore.acquire(); //線程進入時獲取信號量 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程" + Thread.currentThread().getName() + "進入,當前已有" + (3 - semaphore.availablePermits()) + "個並發"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程" + Thread.currentThread().getName() + "即將離開"); semaphore.release(); //線程結束時釋放信號量 // 下面代碼有時候執行不准確,因為其沒有和上面的代碼合成原子單元 System.out.println("線程" + Thread.currentThread().getName() + "已離開,當前已有" + (3 - semaphore.availablePermits()) + "個並發"); } }; service.execute(runnable); }
service.shutdown(); } }
12. CyclicBarrier
多個線程階段點同步
public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { public void run() { try { // 模擬handling Thread.sleep((long) (Math.random() * 1000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點1,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走" : "正在等候")); cb.await(); //第一個同步點 Thread.sleep((long) (Math.random() * 1000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點2,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走" : "正在等候")); cb.await(); //第二個同步點 Thread.sleep((long) (Math.random() * 1000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走" : "正在等候")); cb.await(); //第三個同步點 } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
13.CountDownLatch
線程通過等待計數器歸零來實現同步 實現一個人/多個人等待一個人/多個人的完成
public class CountdownLatchTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); //初始計數器的數為1 final CountDownLatch cdAnswer = new CountDownLatch(3); for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { public void run() { try { System.out.println("線程" + Thread.currentThread().getName() + "准備接受執行命令"); cdOrder.await(); System.out.println("線程" + Thread.currentThread().getName() + "已接到命令, 開始執行"); // 模擬handling Thread.sleep((long) (Math.random() * 5000)); System.out.println("線程" + Thread.currentThread().getName() + "的分任務完成"); cdAnswer.countDown(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long) (Math.random() * 5000)); System.out.println("線程" + Thread.currentThread().getName() + "即將發送執行命令"); cdOrder.countDown(); System.out.println("線程" + Thread.currentThread().getName() + "已發送命令, 任務正在處理"); cdAnswer.await(); System.out.println("線程" + Thread.currentThread().getName() + "主管的所有任務完成"); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }
14. Exchanger
兩個線程間互相交換數據
public class ExchangerTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger<String> exchanger = new Exchanger<>(); service.execute(new Runnable(){ public void run() { try { String data1 = "王老吉"; System.out.println("線程" + Thread.currentThread().getName() + "正在把數據 " + data1 +" 換出去"); Thread.sleep((long)(Math.random()*2000)); String data2 = (String)exchanger.exchange(data1); System.out.println("線程" + Thread.currentThread().getName() + "換回的數據為 " + data2); }catch(Exception e){ e.printStackTrace(); } } }); service.execute(new Runnable(){ public void run() { try { String data1 = "加多寶"; System.out.println("線程" + Thread.currentThread().getName() + "正在把數據 " + data1 +" 換出去"); Thread.sleep((long)(Math.random()*2000)); String data2 = (String)exchanger.exchange(data1); System.out.println("線程" + Thread.currentThread().getName() + "換回的數據為 " + data2); }catch(Exception e){ e.printStackTrace(); } } }); service.shutdown(); } }
15. 阻塞隊列
阻塞隊列實現了BlockingQueue接口, 是生產者消費者模型的典范, 通過鎖實現
put和take方法才具有阻塞功能
阻塞隊列與線程同步 : 兩個大小為1的空/滿阻塞隊列可以實現condition或wait/notify的效果
阻塞隊列與Semaphore : 阻塞隊列是一個線程存入數據, 一個線程取出數據; Semaphore一般用作同一線程獲取和釋放
public class BlockingQueueTest { public static void main(String[] args) { final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3); for (int i = 0; i < 2; i++) { new Thread() { public void run() { while (true) { try { Thread.sleep((long) (Math.random() * 1000)); System.out.println(Thread.currentThread().getName() + "准備放數據!"); queue.put(1); System.out.println(Thread.currentThread().getName() + "已經放入數據, " + "隊列目前有" + queue.size() + "個數據"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } new Thread() { public void run() { while (true) { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "准備取數據!"); queue.take(); System.out.println(Thread.currentThread().getName() + "已經取走數據, " + "隊列目前有" + queue.size() + "個數據"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
兩個長度為1的空/滿隊列實現condition的效果
public class BlockingQueueImplSynchronization { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 20; i++) { business.sub1(i); } } }).start(); for (int i = 1; i <= 20; i++) { business.sub2(i); } } static class Business { BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1); BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1); { try { System.out.println("init"); queue2.put(1); //queue1為空 queue為滿 } catch (InterruptedException e) { e.printStackTrace(); } } public void sub1(int i) { try { queue1.put(1); } catch (InterruptedException e) { e.printStackTrace(); } for (int j = 1; j <= 10; j++) { System.out.println("sub thread sequece of " + j + ", loop of " + i); } try { queue2.take(); } catch (InterruptedException e) { e.printStackTrace(); } } public void sub2(int i) { try { queue2.put(1); } catch (InterruptedException e1) { e1.printStackTrace(); } for (int j = 1; j <= 20; j++) { System.out.println("main thread sequece of " + j + ", loop of " + i); } try { queue1.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
16. 線程安全的非阻塞容器
並發集合
在JDK5之前, 多線程中對容器的操作部分需要手動加synchronized塊保證線程安全
稍微輕便點的方式是使用Collections.synchronizedXXX()生成集合. 實現原理:通過裝飾器模式在同名方法前添加synchronized(this), 來達到實現線程安全
但這是不完整的解決方案, 因為
1)裝飾類的迭代器相關的代碼沒有加synchronized. 涉及到迭代還依然需要手動加synchronized塊
2)迭代器遍歷過程中除該迭代器外不能用其他方式增刪元素(單線程在自身循環內, 多線程在不同線程執行不同部分), 否則拋出並發修改異常
3)最重要的, 並發低
// 在集合的迭代器迭代過程中, 除了迭代器外不能對集合進行修改, 否則會拋出ConcurrentModificationException // ConcurrentModificationException的實現: 樂觀鎖, 記錄一個版本號, 版本號不對拋異常 public class ConcurrentModificationExceptionExample { public static void main(String[] args) { // Collection users = new CopyOnWriteArrayList(); //若使用同步集合, 非迭代器修改就正常 Collection<User> users = new ArrayList<>(); users.add(new User("張三", 28)); users.add(new User("李四", 25)); users.add(new User("王五", 31)); Iterator itrUsers = users.iterator(); while (itrUsers.hasNext()) { System.out.println("mark"); User user = (User) itrUsers.next(); if ("張三".equals(user.getName())) { users.remove(user); // 非迭代器修改拋出異常 //itrUsers.remove(); // 若使用迭代器修改, 則正常 } else { System.out.println(user); } } } }
JDK5之后提出了很多線程安全的容器, 與前輩synchronized方式比起來, 它們的亮點並不是保證了線程安全, 而是它們在保證線程安全的同時盡量避免並發瓶頸
基本上限制條件多的容器都能實現Concurrent版本, 保持一定的讀寫並發; 像ArrayList LinkedList很難避開並發瓶頸, 退而求其次ArrayList實現了CopyOn保證了讀並發;
LinkedList只能是通過Collections.synchronizedList()的synchronized方式(讀|讀都有鎖), 盡量用其他集合替代.
ps:Collections.synchronizedList()或Vector的區別: 1.擴容量不同Vector 100%, SynchronizedList 50%. 2.Vector已對迭代器加鎖, SynchronizedList需要手動加鎖
原有集合 | 並發集合 | 原理 |
HashMap | ConcurrentHashMap | 鎖分段技術 |
HashSet | Collections.newSetFromMap(new ConcurrentHashMap()) | 用map版本實現 |
TreeMap | ConcurrentSkipListMap | 用SkipList替代紅黑樹, CAS |
TreeSet | ConcurrentSkipListSet | 用map版本實現 |
Queue接口 | ConcurrentLinkedQueue 非阻塞 | CAS |
ArrayList | CopyOnWriteArrayList | 寫時復制, 提高了讀並發度; 以空間換取了部分寫並發, 這點好壞需測試 |
Set接口 | CopyOnWriteArraySet | 用ArrayList版本實現, 用addIfAbsent()方法實現元素去重,寫時還要復制, 因此寫效率不佳 |
CAS原理類似樂觀鎖, 處理器保證底層實現, 理念:多次嘗試肯定有一個能成(版本匹配則操作成功->該操作即具有原子性), 但會做很多無用功; 相比加鎖能提高並發
17. 原子類
AtomicInteger等原子類用的是CAS, 並發比加鎖高
實現多線程下安全性的要素: 原子性(不能被其他影響某變量的程序段打斷) + 內存可見性 (一個線程修改, 另一個線程馬上能看到)
synchronized: 實現了原子性和可見性
volatile: 實現內存可見性 CAS: 實現原子性