自定義線程池--拒絕策略


自定義線程池的實現   

使用了設計模式之 策略模式 

  • 阻塞隊列BlockingQueue用於暫存來不及被線程執行的任務
    • 也可以說是平衡生產者和消費者執行速度上的差異
    • 里面的獲取任務和放入任務用到了生產者消費者模式
  • 線程池中對線程Thread進行了再次的封裝,封裝為了Worker
    • 在調用任務的run方法時,線程會去執行該任務,執行完畢后還會到阻塞隊列中獲取新任務來執行
  • 線程池中執行任務的主要方法為execute方法
    • 執行時要判斷正在執行的線程數是否大於了線程池容量
線程池組件  Thread pool(消費者)   Blocking Queue   put <- main(生產者)
任務隊列里面也放不下  put 的時候會一直在wait處一直阻塞  線程池的兩個任務一直在執行一個時間很長的任務  對主線程很不友好   我們可以設計一個拒絕策略

 

import com.sun.corba.se.spi.orbutil.threadpool.Work;
import lombok.extern.slf4j.Slf4j; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * 思考 如果 任務隊列里面也放不下了怎么辦 * put 的時候會一直在wait處一直阻塞 線程池的兩個任務一直在執行一個時間很長的任務 對主線程很不友好 * 我們可以設計一個拒絕策略 帶超時時間的添加方法 */ @Slf4j public class MyThreadPool { // 線程池 享元模式 // 組件 Thread pool Blocking Queue put <- main public static void main(String[] args) { final ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, (queue, task) -> { //1.死等  queue.put(task); //2.超時 queue.offer //3.啥也不做 //4.拋出異常 throw new RuntimeException("任務執行失敗",+task) //5.task.run() 讓主線程自己去執行  }); for (int i = 0; i < 15; i++) { // i 是變化的不能直接被 里面的lambda 直接使用 int j = i; threadPool.excute(()->{ try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}",j); }); } } } @FunctionalInterface // 拒絕策略 interface RejectPolicy<T>{// BlockingQueue實現了泛型 我們這個接口也可以設計為泛型 今后可以接收更多的參數類型 // 而不一定是Runnable 以后也可能是Callable void reject(BlockingQueue<T> queue , T task); // 參數看 需要把哪些信息傳遞給哪個接口 } @Slf4j class ThreadPool{ class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run(){ //執行任務 // 1) task不為空 執行任務 // 2) task 執行完畢 再接着從任務隊列取出任務並執行 while (task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){// take還是poll對應兩種策略 //使用take 線程不會讓線程移除 而是阻塞等待 // 使用poll 超時后 沒有新任務后 移除線程 try { log.debug("正在執行{}",task); task.run(); }catch(Exception e) { e.printStackTrace(); }finally { task = null; } } synchronized (workers){ log.debug("worker被移除{}",this); workers.remove(this);//退出循環 移除Worker對象  } } } // 任務隊列 private BlockingQueue<Runnable> taskQueue; // 線程集合 // 直接使用Thread 能夠包含的信息有限 所以包裝成一個worker類 private HashSet<Worker> workers = new HashSet<>(); //核心線程數 private int coreSize; //獲取任務的超時時間 private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; // 執行任務 public void excute(Runnable task){ //注意 workers 沒有線程安全的保障 且屬於是共享資源 // 當任務數沒有超過coreSize時 直接交給 任務對象 超過了加入任務隊列暫存起來 synchronized (workers){ if(workers.size()<coreSize){ //線程數不夠 Worker worker = new Worker(task); log.debug("新增 worker {},{}",worker,task); workers.add(worker); worker.start(); }else { // taskQueue.put(task); // 1.死等 2.帶超時等待 3.放棄任務執行 4.拋出異常 5.讓調用者自己執行任務 // 這里可能有多種策略 我們不應該寫死在線程池的代碼中 寫好多else if // 設計模式 策略模式 決策下放 讓調用者來決策 給出具體的實現 // 策略模式 把具體的操作抽象成一個接口 具體的實現由調用者傳遞進來 函數式接口 // 把隊列滿時的操作抽象成一個 接口的抽象方法  taskQueue.tryPut(rejectPolicy,task); } } } public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapcity); this.rejectPolicy = rejectPolicy; } } @Slf4j class BlockingQueue<T>{ //1.任務隊列 private Deque<T> queue = new ArrayDeque<>(); // lock 使用這個更加靈活 private ReentrantLock lock = new ReentrantLock(); //3.消費者 和生產者的條件變量 private Condition fullWaitSet = lock.newCondition(); //生產者 private Condition emptyWaitSet = lock.newCondition(); //容量 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } //帶超時時間 public T poll(long timeout, TimeUnit unit){ lock.lock(); try { long nanos = unit.toNanos(timeout); // 看看是否已經空了 空了要等待 while (queue.isEmpty()){ // 為什么要用while 不用if try { //沒等夠 下次還會等nanos //但是 這個函數返回值是剩余時間 if(nanos < 0) return null; nanos = emptyWaitSet.awaitNanos(nanos);//可能還是會存在一個虛假喚醒的問題??  }catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } //阻塞獲取 public T take(){ lock.lock(); try { // 看看是否已經空了 空了要等待 while (queue.isEmpty()){ // 為什么要用while 不用if try { emptyWaitSet.await(); }catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } //阻塞添加 public void put(T element){ lock.lock(); try { while (queue.size()==capcity){ log.debug("等待加入任務隊列{}...",element); fullWaitSet.await(); } log.debug("加入任務隊列{}",element); queue.addLast(element); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } // 帶超時時間的阻塞添加 public boolean offer(T element,long timeout,TimeUnit timeUnit){ lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size()==capcity){ try { log.debug("等待加入任務隊列{}...",element); if(nanos <=0){ return false; } nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任務隊列{}",element); queue.addLast(element); emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } public int size(){ lock.lock(); try { return queue.size(); }finally { lock.unlock(); } } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if(queue.size()==capcity){ rejectPolicy.reject(this,task); }else { log.debug("加入任務隊列{}",task); queue.addLast(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM