线程、线程池、CompletableFuture异步编排


java的线程是通过java.lang.Thread类来实现的。

在Java当中,线程通常都有五种状态,创建、就绪、运行、阻塞和死亡。
  第一是创建状态。在生成线程对象,并没有调用该对象的start方法,这是线程处于创建状态。
  第二是就绪状态。当调用了线程对象的start方法之后,该线程就进入了就绪状态,但是此时线程调度程序还没有把该线程设置为当前线程,此时处于就绪状态。在线程运行之后,从等待或者睡眠中回来之后,也会处于就绪状态。
  第三是运行状态。线程调度程序将处于就绪状态的线程设置为当前线程,此时线程就进入了运行状态,开始运行run函数当中的代码。
  第四是阻塞状态。线程正在运行的时候,被暂停,通常是为了等待某个时间的发生(比如说某项资源就绪)之后再继续运行。sleep,suspend,wait等方法都可以导致线程阻塞。
  第五是死亡状态。如果一个线程的run方法执行结束或者调用stop方法后,该线程就会死亡。对于已经死亡的线程,无法再使用start方法令其进入就绪。

线程的初始化

1)继承Thread

       

  run()与start()方法的区别

    a.start()方法来启动线程,真正实现了多线程运行。这时无需等待run方法体代码执行完毕,可以直接继续执行下面的代码;通过调用Thread类的start()方法来启动一个线程, 这时此线程是处于就绪状态, 并没有运行。 然后通过此Thread类调用方法run()来完成其运行操作的, 这里方法run()称为线程                 体,它包含了要执行的这个线程的内容, Run方法运行结束, 此线程终止。然后CPU再调度其它线程。

            b.run()方法当作普通方法的方式调用。程序还是要顺序执行,要等待run方法体执行完毕后,才可继续执行下面的代码; 程序中只有主线程——这一个线程, 其程序执行路径还是只有一条, 这样就没有达到写线程的目的。

2)实现Runnable接口

  

3)实现Callable接口 + FutureTask

  

  可以拿到返回结果,可以处理异常

  多线程运行FutureTask,只会运行一次,其他线程会引用第一次计算的结果

4)线程池

  方式一:启动线程池(Executors可根据相关需求创建线程方法)

//执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程
ExecutorService service = Executors.newCachedThreadPool(10);

//一个任务一个任务的执行,一池一线程
ExecutorService service = Executors.newSingleThreadExecutor()

//执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强
ExecutorService service = Executors.newCachedThreadPool()

  方式二:启动线程池

int corePoolSize = 10;//核心线程数;线程池启动后默认创建的线程数量【除非设置了allowCoreThreadTimeOut(回收核心线程)】,等待调用去执行异步任务
int maximumPoolSize = 200;//最大线程数量;控制资源
long keepAliveTime = 10;//存活时间;在当前的线程数 > 核心线程数时,释放空闲线程的时间
TimeUnit unit = TimeUnit.SECONDS;//时间单位
BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();//阻塞队列,如果任务有很多就会放在这里,有空闲线程就会取出该队列里的任务(队列方式有多种,看相关接口介绍,设置队列的数量根据系统测试后能承受的最大数量,否则会导致内存不够)
ThreadFactory threadFactory = Executors.defaultThreadFactory();//线程创建工厂(可以根据自己相关需求,重写这个创建工厂)
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();//如果队列满了,按照指定的拒绝策略执行任务(根据相关需求,制定一些相关拒绝策略)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);

       工作流程:

             a.线程池创建,启动好核心的线程数量,准备接受任务

             b.新的任务来了,用先用核心空闲线程去执行,如果核心线程都在运行状态 ,就将任务放置到阻塞队列中,核心线程空闲后去阻塞队列获取任务执行

             c.阻塞线程满了,就创建新的线程执行(创建新的线程最大只能开到maximumPoolSize指定的数量)

     d.如最大线程都满了,且 就执行handler(拒绝策略)

      e.线程把所有任务都执行完,有空闲线程,就会在指定存活的时间,释放空闲线程

       submit()与execute()区别

  submit()是可以取得返回值 

以上四种方法的区别 

        a.Thread与Runnable不能得到返回值,使用Callable+FutrueTask可以取得

        b.Thread、Runnable、Callable+FutrueTask不能控制资源,线程池可以

wait与sleep的区别

  功能都是当前线程暂停

  wait会放开手去睡,放开手里的锁

  sleep握紧手去睡,醒了手里还有锁

CountDownLatchDemo(减少计数)

  CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。

  其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)

  当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。

public class CountDownLatchDemo{   
    public static void main(String[] args) throws InterruptedException   {                         
            CountDownLatch countDownLatch = new CountDownLatch(6); 
             //6个上自习的同学,各自离开教室的时间不一致
             for (int i = 1; i <=6; i++)  {
                  new Thread(() -> {              
                       System.out.println(Thread.currentThread().getName()+"\t 号同学离开教室");              
                       countDownLatch.countDown();          
                  }, String.valueOf(i)).start();       
              }       
              countDownLatch.await();       
              System.out.println(Thread.currentThread().getName()+"\t****** 班长关门走人,main线程是班长");             
     }  
}     ` 

CyclicBarrier(循环栅栏)

* 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,

 * 直到最后一个线程到达屏障时,屏障才会开门,所有 

* 被屏障拦截的线程才会继续干活。 

* 线程进入屏障通过CyclicBarrier的await()方法。

public class CyclicBarrierDemo{  
     private static final int NUMBER = 7;    
     public static void main(String[] args)  {     
          //CyclicBarrier(int parties, Runnable barrierAction)           
          CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, ()->{
               System.out.println("*****集齐7颗龙珠就可以召唤神龙");
          }) ;    
      
          for (int i = 1; i <= 7; i++) {       
               new Thread(() -> {          
                  try { 
                      System.out.println(Thread.currentThread().getName()+"\t 星龙珠被收集 ");            
                      cyclicBarrier.await();          
                 } catch (InterruptedException | BrokenBarrierException e) { 
                      // TODO Auto-generated catch block            
                     e.printStackTrace();          
                 }              
             }, String.valueOf(i)).start();     
         }        
     }
}    

Semaphore(信号灯)

 * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1), 

 * 要么一直等下去,直到有线程释放信号量,或超时。

 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。 

 * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

public class SemaphoreDemo{  
    public static void main(String[] args)  {     
        Semaphore semaphore = new Semaphore(3);//模拟3个停车位          
        for (int i = 1; i <=6; i++) //模拟6部汽车     {       
             new Thread(() -> {          
                 try{            
                     semaphore.acquire();            
                     System.out.println(Thread.currentThread().getName()+"\t 抢到了车位");            
                     TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                     System.out.println(Thread.currentThread().getName()+"\t------- 离开");          
                } catch (InterruptedException e) { 
                     e.printStackTrace();          
                }finally {            
                     semaphore.release();          
                }       
             }, String.valueOf(i)).start();     
        }       
    }
} 

  

CompletableFuture

1.创建异步对象

CompletableFuture提供了四个静态方法来创建一个异步操作

public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
}

a.以run开头的方法都是没有返回结果的,supply都是可以获取到返回结果的

b.有Executor参数的方法是可以传入自定义的线程池,没有的就使用默认的线程池

1) runAsync方法

 

 

 2)supplyAsync 方法

 

 

 2.计算完成时回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
1)whenComplete方法

    返回异常处理结果 

    

 

 

    使用handle方法执行完成后的处理

   

   2)whenCompleteAsync 

whenComplete 可以处理正常和异常的计算结果,exceptionally处理异常情况
whenComplete 和 whenCompleteAsync 的区别
whenComplete :是执行当前任务的线程继续执行whenComplete的任务
whenCompleteAsync :是执行的当前任务继续提交给线程池来执行(可能是同一个线程(是相同的线程池)执行,也可能是其他线程执行)

 

 

 

3.线程串行化方法(上个任务执行完,才能处理下一步任务的)

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);public CompletableFuture<Void> thenAccept(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);public CompletableFuture<Void> thenRun(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

  1).加了async的方法,是执行完上一个任务,会再开另一个线程来处理下一个任务,默认是异步执行的,否则是跟上一个任务用同一个线程

  2).thenAccept方法,消费处理结果,接收上一个任务的结果进行处理

  3).thenRun方法,无法获取上一步的执行结果

  4).thenApply方法,当一个线程依赖另一个线程时,获取上一个任务的结果,并返回当前任务的返回值 

4.两任务组合(都需求完成后)

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

  1)thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值 

  2)thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值

  3)runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务

5.两个任务组合有一个完成

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

  1)applyToEither:两个任务有一个执行完成,获取它的返回值 ,并处理任务并有新的返回值 

  2)acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值

  3)runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,没有返回值

6.多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

  1)allOf:等待所有任务完成 

  2)anyOf:只要有一个任务完成 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM