自定义线程池的实现
使用了设计模式之 策略模式
- 阻塞队列BlockingQueue用于暂存来不及被线程执行的任务
- 也可以说是平衡生产者和消费者执行速度上的差异
- 里面的获取任务和放入任务用到了生产者消费者模式
- 线程池中对线程Thread进行了再次的封装,封装为了Worker
- 在调用任务的run方法时,线程会去执行该任务,执行完毕后还会到阻塞队列中获取新任务来执行
- 线程池中执行任务的主要方法为execute方法
- 执行时要判断正在执行的线程数是否大于了线程池容量
线程池组件 Thread pool(消费者) Blocking Queue put <- main(生产者)
任务队列里面也放不下 put 的时候会一直在wait处一直阻塞 线程池的两个任务一直在执行一个时间很长的任务 对主线程很不友好 我们可以设计一个拒绝策略
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import lombok.extern.slf4j.Slf4j; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * 思考 如果 任务队列里面也放不下了怎么办 * put 的时候会一直在wait处一直阻塞 线程池的两个任务一直在执行一个时间很长的任务 对主线程很不友好 * 我们可以设计一个拒绝策略 带超时时间的添加方法 */ @Slf4j public class MyThreadPool { // 线程池 享元模式 // 组件 Thread pool Blocking Queue put <- main public static void main(String[] args) { final ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, (queue, task) -> { //1.死等 queue.put(task); //2.超时 queue.offer //3.啥也不做 //4.抛出异常 throw new RuntimeException("任务执行失败",+task) //5.task.run() 让主线程自己去执行 }); for (int i = 0; i < 15; i++) { // i 是变化的不能直接被 里面的lambda 直接使用 int j = i; threadPool.excute(()->{ try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}",j); }); } } } @FunctionalInterface // 拒绝策略 interface RejectPolicy<T>{// BlockingQueue实现了泛型 我们这个接口也可以设计为泛型 今后可以接收更多的参数类型 // 而不一定是Runnable 以后也可能是Callable void reject(BlockingQueue<T> queue , T task); // 参数看 需要把哪些信息传递给哪个接口 } @Slf4j class ThreadPool{ class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run(){ //执行任务 // 1) task不为空 执行任务 // 2) task 执行完毕 再接着从任务队列取出任务并执行 while (task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){// take还是poll对应两种策略 //使用take 线程不会让线程移除 而是阻塞等待 // 使用poll 超时后 没有新任务后 移除线程 try { log.debug("正在执行{}",task); task.run(); }catch(Exception e) { e.printStackTrace(); }finally { task = null; } } synchronized (workers){ log.debug("worker被移除{}",this); workers.remove(this);//退出循环 移除Worker对象 } } } // 任务队列 private BlockingQueue<Runnable> taskQueue; // 线程集合 // 直接使用Thread 能够包含的信息有限 所以包装成一个worker类 private HashSet<Worker> workers = new HashSet<>(); //核心线程数 private int coreSize; //获取任务的超时时间 private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; // 执行任务 public void excute(Runnable task){ //注意 workers 没有线程安全的保障 且属于是共享资源 // 当任务数没有超过coreSize时 直接交给 任务对象 超过了加入任务队列暂存起来 synchronized (workers){ if(workers.size()<coreSize){ //线程数不够 Worker worker = new Worker(task); log.debug("新增 worker {},{}",worker,task); workers.add(worker); worker.start(); }else { // taskQueue.put(task); // 1.死等 2.带超时等待 3.放弃任务执行 4.抛出异常 5.让调用者自己执行任务 // 这里可能有多种策略 我们不应该写死在线程池的代码中 写好多else if // 设计模式 策略模式 决策下放 让调用者来决策 给出具体的实现 // 策略模式 把具体的操作抽象成一个接口 具体的实现由调用者传递进来 函数式接口 // 把队列满时的操作抽象成一个 接口的抽象方法 taskQueue.tryPut(rejectPolicy,task); } } } public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapcity); this.rejectPolicy = rejectPolicy; } } @Slf4j class BlockingQueue<T>{ //1.任务队列 private Deque<T> queue = new ArrayDeque<>(); // lock 使用这个更加灵活 private ReentrantLock lock = new ReentrantLock(); //3.消费者 和生产者的条件变量 private Condition fullWaitSet = lock.newCondition(); //生产者 private Condition emptyWaitSet = lock.newCondition(); //容量 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } //带超时时间 public T poll(long timeout, TimeUnit unit){ lock.lock(); try { long nanos = unit.toNanos(timeout); // 看看是否已经空了 空了要等待 while (queue.isEmpty()){ // 为什么要用while 不用if try { //没等够 下次还会等nanos //但是 这个函数返回值是剩余时间 if(nanos < 0) return null; nanos = emptyWaitSet.awaitNanos(nanos);//可能还是会存在一个虚假唤醒的问题?? }catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } //阻塞获取 public T take(){ lock.lock(); try { // 看看是否已经空了 空了要等待 while (queue.isEmpty()){ // 为什么要用while 不用if try { emptyWaitSet.await(); }catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } //阻塞添加 public void put(T element){ lock.lock(); try { while (queue.size()==capcity){ log.debug("等待加入任务队列{}...",element); fullWaitSet.await(); } log.debug("加入任务队列{}",element); queue.addLast(element); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } // 带超时时间的阻塞添加 public boolean offer(T element,long timeout,TimeUnit timeUnit){ lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size()==capcity){ try { log.debug("等待加入任务队列{}...",element); if(nanos <=0){ return false; } nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列{}",element); queue.addLast(element); emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } public int size(){ lock.lock(); try { return queue.size(); }finally { lock.unlock(); } } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if(queue.size()==capcity){ rejectPolicy.reject(this,task); }else { log.debug("加入任务队列{}",task); queue.addLast(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } }