-
BlockingQueue
-
先進先出的數據結構。
-
寫:如果隊列滿了,就必須阻塞等待消費。
-
取:如果隊列是空的,就必須阻塞等待生產。
-
一般在多線程並發處理和線程池中使用。
-
有這些實現類,重要的是紅色中的
-
繼承結構其實和List類似
主要API
阻塞隊列有4組API,其實就是添加、移除和查看隊首元素的4組方法。
這4組方法面對無法執行的時候會有着不同的行為,可以根據業務需求去使用。
比如隊列滿了,無法添加元素的時候,add方法會拋出異常,offer則會返回false....
方式 | 拋出異常 | 有返回值 | 阻塞等待 | 超時等待 |
---|---|---|---|---|
添加 | add | offer | put | offer(,int,TimeUnit) |
移除 | remove | poll | take | poll(int,TimeUnit) |
判斷隊列首 | element | peek |
-
拋出異常
//1.拋出異常 public static void test1(){ //實例化一個容納3個元素的隊列 ArrayBlockingQueue<Object> blockQueue = new ArrayBlockingQueue<>(3); //添加元素會返回true false System.out.println(blockQueue.add("A")); System.out.println(blockQueue.add("B")); System.out.println(blockQueue.add("C")); //添加第4個元素會拋出異常 :java.lang.IllegalStateException: Queue full // System.out.println(blockQueue.add("D")); //彈出元素會返回元素值 System.out.println(blockQueue.remove()); System.out.println(blockQueue.remove()); System.out.println(blockQueue.remove()); //空的集合再彈出元素會拋出異常: java.util.NoSuchElementException System.out.println(blockQueue.remove()); }
-
返回值
//2.不拋出異常給返回值 public static void test2(){ ArrayBlockingQueue<Object> blockQueue = new ArrayBlockingQueue<>(3); System.out.println(blockQueue.offer("A")); System.out.println(blockQueue.offer("B")); System.out.println(blockQueue.offer("C")); System.out.println(blockQueue.offer("D")); //不拋出異常,返回false System.out.println(blockQueue.element());//查看隊首元素值,拋異常 System.out.println(blockQueue.poll()); System.out.println(blockQueue.element()); System.out.println(blockQueue.poll()); System.out.println(blockQueue.poll()); System.out.println(blockQueue.poll());//不拋出異常,返回null System.out.println(blockQueue.peek());//查看隊首元素值,返回null System.out.println(blockQueue.element()); }
-
阻塞等待
//3.阻塞,一直等着直到隊列中有空的位置 public static void test3() throws InterruptedException { ArrayBlockingQueue<String> blockQueue = new ArrayBlockingQueue<>(3); blockQueue.put("A"); blockQueue.put("B"); blockQueue.put("C"); // blockQueue.put("D");//會一直阻塞等待 System.out.println(blockQueue.take()); System.out.println(blockQueue.take()); System.out.println(blockQueue.take()); System.out.println(blockQueue.take());//會一直阻塞等待 }
-
超時等待
//超時退出 public static void test4() throws InterruptedException { ArrayBlockingQueue<String> blockQueue = new ArrayBlockingQueue<>(3); blockQueue.offer("A"); blockQueue.offer("B"); blockQueue.offer("C"); blockQueue.offer("D",2, TimeUnit.SECONDS); //等待超過兩秒退出 System.out.println(blockQueue.poll()); System.out.println(blockQueue.poll()); System.out.println(blockQueue.poll()); System.out.println(blockQueue.poll(2,TimeUnit.SECONDS)); }
同步隊列
-
同步隊列:SynchronousQueue
-
實際上是AbstractQueue(非阻塞隊列)的子類,但是又繼承了BlockingQueue(阻塞隊列)
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable
-
這個隊列沒有容量。
-
1個線程要往隊列里插入內容,必須是有另一個往隊列里刪除內容的線程存在,兩個線程共存才能插入、刪除成功。否則無法插入,也沒有元素可以刪除( poll() 會返回 null)。
-
這個隊列也不能迭代,因為沒有容量。
package com.rzp.rw; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; //同步隊列 public class SynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue synchronousQueue = new SynchronousQueue(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"put 1"); synchronousQueue.put("1"); System.out.println(Thread.currentThread().getName()+"put 2"); synchronousQueue.put("2"); System.out.println(Thread.currentThread().getName()+"put 3"); synchronousQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"T1").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"="+synchronousQueue.take()); TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"="+synchronousQueue.take()); TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"="+synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"T2").start(); } }
線程池
池化技術
-
程序的運行,本質就是占用系統的資源,為了優化資源的使用,就出現了池化技術。
-
線程池、連接池、內存池、對象池。。。
-
池化技術:因為開啟和關閉特別消耗資源,池化技術的核心就是事先准備好一定的資源,需要用的時候在這里取,用完以后再還回去。
好處
-
因為減少了創建和關閉資源的行為,因此:
-
降低資源的消耗
-
提高響應的速度
-
方便管理
-
-
線程復用,可以控制最大並發數,管理線程。
阿里巴巴
-
摘自阿里巴巴開發手冊
【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣
的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明:Executors 返回的線程池對象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool :
允許的請求隊列長度為 Integer.MAX_VALUE(約為21億),可能會堆積大量的請求,從而導致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool :
允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
三大方法
package com.rzp.pool; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Demo1 { public static void main(String[] args) { ExecutorsTest(); } public static void ExecutorsTest() { //這個就是使用Executor創建線程池的3大方法。 // ExecutorService threadPool = Executors.newSingleThreadExecutor();//單個線程 // ExecutorService threadPool = Executors.newFixedThreadPool(5); //創建一個固定大小的線程池 ExecutorService threadPool = Executors.newCachedThreadPool(); //創建可伸縮線程池 try { for (int i = 0; i < 10; i++) { //從線程池中獲取線程 threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //關閉線程池 threadPool.shutdown(); } } }

七大參數
三大方法的源碼:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService //new ThreadPoolExecutor (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newFixedThreadPool(int nThreads) { //new ThreadPoolExecutor return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { //new ThreadPoolExecutor return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
7大參數
-
可以看到,3個方法其實都是調用new ThreadPoolExecutor
// ThreadPoolExecutor構造方法的7個參數,就是所謂的7大參數 public ThreadPoolExecutor(int corePoolSize, //核心線程池大小 int maximumPoolSize, //最大線程池大小 long keepAliveTime, //超時等待時間 TimeUnit unit, //超時單位 BlockingQueue<Runnable> workQueue //阻塞隊列 ThreadFactory threadFactory, //線程工廠 RejectedExecutionHandler handler //拒絕策略 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;
作用
-
7大參數是如何起作用的:
-
線程工廠:創建線程的工廠,這個不需要改變。
-
核心線程池大小:開啟線程池后,就會啟用的線程數量,設為2。
-
假如核心線程池都被占用,也就是2條都在使用了,這時候如果有第3個程序要求調用,第3個程序就會進入阻塞隊列之中等待:
-
-
阻塞隊列:用於存放等待獲取線程的容器。
-
假如核心線程池中有線程釋放了,3就會獲得線程。
-
假如核心線程一直在執行,這時又來了4、5...,這些都會進入阻塞隊列,一旦阻塞隊列也滿了,這時候就會開啟新的線程:
-
-
最大線程池大小:最多允許開始的線程數量,阻塞隊列滿員后逐個開放使用,直到達到最大值。
-
一旦達到最大值了、並且阻塞隊列也滿了,就會啟用拒絕策略:
-
-
拒絕策略:最大線程池、阻塞隊列都滿員的情況下,如果還有新的程序要求調用線程,就會按拒絕策略對該程序反饋。
-
超時等待時間、超時單位:非核心線程池中的線程,超過等待時間都沒有被調用,就會被關閉。
-
也就是說,上面的例子中,如果最后阻塞隊列空了,所有線程也都釋放了,那么除了核心線程中的2條線程會持續開啟以外,其他線程超過等待時間后就會關閉掉。
-
創建線程池
public static void ThreadPoolTest() { ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); try { for (int i = 0; i < 15; i++) { int num = i; //從線程池中獲取線程 threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "=="+num+"ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //關閉線程池 threadPool.shutdown(); } }
拒絕策略
-
就是這4個類
/** 拋出RejectedExecutionException異常 */ public static class AbortPolicy implements RejectedExecutionHandler //測試結果:拋出異常 /** 在原線程中直接調用run方法,讓原線程執行 * 在下面例子中,把for循環數量調高,可以看到線程名稱是main * 如果再來就會阻塞等待 */ public static class CallerRunsPolicy implements RejectedExecutionHandler //測試結果:表面看沒有異常,而且所有任務都會執行完 /** 把隊首的任務丟棄,然后把新任務放在隊尾 */ public static class DiscardOldestPolicy implements RejectedExecutionHandler //測試結果,會有任務丟失 /**隊列滿了,不會拋出異常,但是任務會被拋棄 */ public static class DiscardPolicy implements RejectedExecutionHandler //測試結果,會有任務丟失
使用策略
-
如何定義線程池的最大數量?一般有兩種策略
-
Cpu密集型 :按處理器數量定義,Cpu利用率最高。
//獲取處理器數量 System.out.println(Runtime.getRuntime().availableProcessors()); //用處理器數量定義 ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, Runtime.getRuntime().availableProcessors(), 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
-
IO密集型 :判斷程序中消耗IO的線程,定義為IO線程的兩倍
學習ForkJoin前需要對1.8的函數式接口學習
四大函數式接口
函數式接口
-
只有一個抽象方法的接口。
四大函數式接口
-
是指
java.util.function
這個包下的這四個接口:
Consumer
Function
Predicate
Supplier
-
這四個是function這個包下主要的四個接口,其他都是這四個接口的輔助包,可以從一些名字看出來
Function
-
Function這個接口要調用apply()方法,輸入和輸入都可以是object(任意類型)
public static void main(String[] args) { //Function函數型接口原始寫法 Function function = new Function<String,String>() { @Override public String apply(String o) { return o; } }; System.out.println(function.apply("asd0"));; //Function lamda表達式簡寫 ,寫一個工具方法就很簡單了 Function<String,String> functiong = (str)->{return str;}; }
對比scala語言,scala寫一個函數,雖然沒scala簡單,但是確實比沒函數式接口之前簡單很多了:
val f2 = (n1:Int,n2:Int) => n1+n2
Predicate
-
調用test方法,輸入是object,返回是Boolean,,用於判斷
//predicate的特點是輸入是object,返回的只能是Boolean,調用的方法名是test,用於判斷 public static void predicate(){ Predicate<String> emptyIf = str -> { return str.isEmpty();}; System.out.println(emptyIf.test(" "));; }
Consumer
-
調用accept()方法,輸入是object,沒有返回值
public static void ComsumeTest(){ //Consumer accept(T t); 只有輸入,沒有返回值 Consumer<String> consumer = str ->{System.out.println(str); }; consumer.accept("123"); }
Supplier
-
調用get(T t); 沒有輸入,只有返回值
public static void SupplierTest(){ //Consumer get(T t); 沒有輸入,只有返回值 Supplier supplier = () ->{return 1024; }; System.out.println(supplier.get()); }
Stream流式計算
-
和scala那套那不多,寫起來還復雜點
package com.rzp.streamCount; import com.sun.xml.internal.ws.api.model.wsdl.WSDLOutput; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Stream; public class Demo1 { public static void main(String[] args) { User user1 = new User(1, "a", 21); User user2 = new User(2, "b", 22); User user3 = new User(3, "c", 23); User user4 = new User(4, "d", 24); User user5 = new User(5, "e", 25); //存儲交給集合 List<User> users = Arrays.asList(user1, user2, user3, user4, user5); //計算交給流 //轉換成流 Stream<User> stream = users.stream(); //使用流的filter函數式接口,filter只能輸入Predicate函數 //和scala非常類似,就是把容器中的每個元素放在函數里,把判定為true的放在一個新的數組里 Stream<User> filterStream = stream.filter(u -> { return u.getId() % 2 == 0; }); //map,輸入Function函數 Stream<String> userStream = filterStream.map(u -> { return u.getName().toUpperCase(); }); //排序 Stream<String> sorted = userStream.sorted((u1, u2) -> { return u1.compareTo(u2); }); //只要第一個 Stream<String> limit = sorted.limit(1); //foreach,只能輸入Consumer函數遍歷每個元素 limit.forEach(System.out::println); //和scala一樣,寫成一行代碼 users.stream().filter(u->{return u.getId() % 2 == 0;}) .map(u -> {return u.getName().toUpperCase();}) .sorted((u1, u2) -> {return u1.compareTo(u2);}).limit(1).forEach(System.out::println); } }