java的线程是通过java.lang.Thread类来实现的。
在Java当中,线程通常都有五种状态,创建、就绪、运行、阻塞和死亡。
第一是创建状态。在生成线程对象,并没有调用该对象的start方法,这是线程处于创建状态。
第二是就绪状态。当调用了线程对象的start方法之后,该线程就进入了就绪状态,但是此时线程调度程序还没有把该线程设置为当前线程,此时处于就绪状态。在线程运行之后,从等待或者睡眠中回来之后,也会处于就绪状态。
第三是运行状态。线程调度程序将处于就绪状态的线程设置为当前线程,此时线程就进入了运行状态,开始运行run函数当中的代码。
第四是阻塞状态。线程正在运行的时候,被暂停,通常是为了等待某个时间的发生(比如说某项资源就绪)之后再继续运行。sleep,suspend,wait等方法都可以导致线程阻塞。
第五是死亡状态。如果一个线程的run方法执行结束或者调用stop方法后,该线程就会死亡。对于已经死亡的线程,无法再使用start方法令其进入就绪。
线程的初始化
1)继承Thread
run()与start()方法的区别
a.start()方法来启动线程,真正实现了多线程运行。这时无需等待run方法体代码执行完毕,可以直接继续执行下面的代码;通过调用Thread类的start()方法来启动一个线程, 这时此线程是处于就绪状态, 并没有运行。 然后通过此Thread类调用方法run()来完成其运行操作的, 这里方法run()称为线程 体,它包含了要执行的这个线程的内容, Run方法运行结束, 此线程终止。然后CPU再调度其它线程。
b.run()方法当作普通方法的方式调用。程序还是要顺序执行,要等待run方法体执行完毕后,才可继续执行下面的代码; 程序中只有主线程——这一个线程, 其程序执行路径还是只有一条, 这样就没有达到写线程的目的。
2)实现Runnable接口
3)实现Callable接口 + FutureTask
可以拿到返回结果,可以处理异常
多线程运行FutureTask,只会运行一次,其他线程会引用第一次计算的结果
4)线程池
方式一:启动线程池(Executors可根据相关需求创建线程方法)
//执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程
ExecutorService service = Executors.newCachedThreadPool(10);
//一个任务一个任务的执行,一池一线程
ExecutorService service = Executors.newSingleThreadExecutor()
//执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强
ExecutorService service = Executors.newCachedThreadPool()
方式二:启动线程池
int corePoolSize = 10;//核心线程数;线程池启动后默认创建的线程数量【除非设置了allowCoreThreadTimeOut(回收核心线程)】,等待调用去执行异步任务 int maximumPoolSize = 200;//最大线程数量;控制资源 long keepAliveTime = 10;//存活时间;在当前的线程数 > 核心线程数时,释放空闲线程的时间 TimeUnit unit = TimeUnit.SECONDS;//时间单位 BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();//阻塞队列,如果任务有很多就会放在这里,有空闲线程就会取出该队列里的任务(队列方式有多种,看相关接口介绍,设置队列的数量根据系统测试后能承受的最大数量,否则会导致内存不够) ThreadFactory threadFactory = Executors.defaultThreadFactory();//线程创建工厂(可以根据自己相关需求,重写这个创建工厂) RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();//如果队列满了,按照指定的拒绝策略执行任务(根据相关需求,制定一些相关拒绝策略) ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
工作流程:
a.线程池创建,启动好核心的线程数量,准备接受任务
b.新的任务来了,用先用核心空闲线程去执行,如果核心线程都在运行状态 ,就将任务放置到阻塞队列中,核心线程空闲后去阻塞队列获取任务执行
c.阻塞线程满了,就创建新的线程执行(创建新的线程最大只能开到maximumPoolSize指定的数量)
d.如最大线程都满了,且 就执行handler(拒绝策略)
e.线程把所有任务都执行完,有空闲线程,就会在指定存活的时间,释放空闲线程
submit()与execute()区别
submit()是可以取得返回值
以上四种方法的区别
a.Thread与Runnable不能得到返回值,使用Callable+FutrueTask可以取得
b.Thread、Runnable、Callable+FutrueTask不能控制资源,线程池可以
wait与sleep的区别
功能都是当前线程暂停
wait会放开手去睡,放开手里的锁
sleep握紧手去睡,醒了手里还有锁
CountDownLatchDemo(减少计数)
CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
public class CountDownLatchDemo{ public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6); //6个上自习的同学,各自离开教室的时间不一致 for (int i = 1; i <=6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t 号同学离开教室"); countDownLatch.countDown(); }, String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t****** 班长关门走人,main线程是班长"); } } `
CyclicBarrier(循环栅栏)
* 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
* 直到最后一个线程到达屏障时,屏障才会开门,所有
* 被屏障拦截的线程才会继续干活。
* 线程进入屏障通过CyclicBarrier的await()方法。
public class CyclicBarrierDemo{ private static final int NUMBER = 7; public static void main(String[] args) { //CyclicBarrier(int parties, Runnable barrierAction) CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, ()->{ System.out.println("*****集齐7颗龙珠就可以召唤神龙"); }) ; for (int i = 1; i <= 7; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName()+"\t 星龙珠被收集 "); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }, String.valueOf(i)).start(); } } }
Semaphore(信号灯)
* acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
* 要么一直等下去,直到有线程释放信号量,或超时。
* release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
* 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
public class SemaphoreDemo{ public static void main(String[] args) { Semaphore semaphore = new Semaphore(3);//模拟3个停车位 for (int i = 1; i <=6; i++) //模拟6部汽车 { new Thread(() -> { try{ semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"\t 抢到了车位"); TimeUnit.SECONDS.sleep(new Random().nextInt(5)); System.out.println(Thread.currentThread().getName()+"\t------- 离开"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } }, String.valueOf(i)).start(); } } }
CompletableFuture
1.创建异步对象
CompletableFuture提供了四个静态方法来创建一个异步操作
public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
a.以run开头的方法都是没有返回结果的,supply都是可以获取到返回结果的
b.有Executor参数的方法是可以传入自定义的线程池,没有的就使用默认的线程池
1) runAsync方法
2)supplyAsync 方法
2.计算完成时回调方法
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
1)whenComplete方法

返回异常处理结果
使用handle方法执行完成后的处理
2)whenCompleteAsync
whenComplete 可以处理正常和异常的计算结果,exceptionally处理异常情况
whenComplete 和 whenCompleteAsync 的区别
whenComplete :是执行当前任务的线程继续执行whenComplete的任务
whenCompleteAsync :是执行的当前任务继续提交给线程池来执行(可能是同一个线程(是相同的线程池)执行,也可能是其他线程执行)
3.线程串行化方法(上个任务执行完,才能处理下一步任务的)
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);public CompletableFuture<Void> thenAccept(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);public CompletableFuture<Void> thenRun(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);
1).加了async的方法,是执行完上一个任务,会再开另一个线程来处理下一个任务,默认是异步执行的,否则是跟上一个任务用同一个线程
2).thenAccept方法,消费处理结果,接收上一个任务的结果进行处理
3).thenRun方法,无法获取上一步的执行结果
4).thenApply方法,当一个线程依赖另一个线程时,获取上一个任务的结果,并返回当前任务的返回值
4.两任务组合(都需求完成后)
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
1)thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
2)thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值
3)runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务
5.两个任务组合有一个完成
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
1)applyToEither:两个任务有一个执行完成,获取它的返回值 ,并处理任务并有新的返回值
2)acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值
3)runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,没有返回值
6.多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs); public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
1)allOf:等待所有任务完成
2)anyOf:只要有一个任务完成