【spring boot】在spring boot下使用多線程


 

使用場景:

方法處理到某一步,需要將信息交給另一個線程去處理!!

===================================================================================

第一種:最簡單的Runnable

    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);
        Runnable runnable = dealMsg(msg);
    //將返回的runnable對象傳入,並start()啟動線程
     new Thread(runnable).start(); }
復制代碼
//創建一個Runnable,重寫run方法
public Runnable dealMsg(String msg){ Runnable runnable = new Runnable() { @Override public void run() { System.out.println("新開線程中處理:"+msg); } }; return runnable; }
復制代碼

 

====================================================================================================

第二種:自己創建JDK線程池,交給spring管理,然后將任務交給線程池即可

1.創建線程池,交給spring管理

package com.sxd.util;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class ThreadConfig {

    /**
     *newFixedThreadPool
     創建一個指定工作線程數量的線程池。每當提交一個任務就創建一個工作線程,如果工作線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。

     newCachedThreadPool
     創建一個可緩存的線程池。這種類型的線程池特點是: 
     1).工作線程的創建數量幾乎沒有限制(其實也有限制的,數目為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。 
     2).如果長時間沒有往線程池中提交任務,即如果工作線程空閑了指定的時間(默認為1分鍾),則該工作線程將自動終止。終止后,如果你又提交了新的任務,則線程池重新創建一個工作線程。

     newSingleThreadExecutor
     創建一個單線程化的Executor,即只創建唯一的工作者線程來執行任務,如果這個線程異常結束,會有另一個取代它,保證順序執行(我覺得這點是它的特色)。單工作線程最大的特點是可保證順序地執行各個任務,並且在任意給定的時間不會有多個線程是活動的 。

     newScheduleThreadPool
     創建一個定長的線程池,而且支持定時的以及周期性的任務執行,類似於Timer。
     * @return
     */
    @Bean
    public ExecutorService getExecutorTools(){
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        return  executorService;
    }

}
復制代碼
package com.sxd.util;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class ThreadConfig {

    /**
     *newFixedThreadPool
     創建一個指定工作線程數量的線程池。每當提交一個任務就創建一個工作線程,如果工作線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。

     newCachedThreadPool
     創建一個可緩存的線程池。這種類型的線程池特點是: 
     1).工作線程的創建數量幾乎沒有限制(其實也有限制的,數目為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。 
     2).如果長時間沒有往線程池中提交任務,即如果工作線程空閑了指定的時間(默認為1分鍾),則該工作線程將自動終止。終止后,如果你又提交了新的任務,則線程池重新創建一個工作線程。

     newSingleThreadExecutor
     創建一個單線程化的Executor,即只創建唯一的工作者線程來執行任務,如果這個線程異常結束,會有另一個取代它,保證順序執行(我覺得這點是它的特色)。單工作線程最大的特點是可保證順序地執行各個任務,並且在任意給定的時間不會有多個線程是活動的 。

     newScheduleThreadPool
     創建一個定長的線程池,而且支持定時的以及周期性的任務執行,類似於Timer。
     * @return
     */
    @Bean
    public ExecutorService getExecutorTools(){
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        return  executorService;
    }

}
復制代碼

2.使用它

import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


@Component
public class Consumer1 {


    @Resource
    private ExecutorService executorService;

    
    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);


        /**
         * 分類1:可以返回值的 Callable
         */
        Future fal  = executorService.submit(new Callable<String>() {
            @Override
            public String call() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
                return "處理成功!";
            }
        });

        try {
            System.out.println(fal.get());
        }catch (Exception e){
            System.out.println(e);
        }

        /**
         * 分類2:不會返回值的 Runnable
         */
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

        /**
         * 分類3:也可以這樣
         */
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

    }




}

 

復制代碼
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


@Component
public class Consumer1 {


    @Resource
    private ExecutorService executorService;

    
    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);


        /**
         * 分類1:可以返回值的 Callable
         */
        Future fal  = executorService.submit(new Callable<String>() {
            @Override
            public String call() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
                return "處理成功!";
            }
        });

        try {
            System.out.println(fal.get());
        }catch (Exception e){
            System.out.println(e);
        }

        /**
         * 分類2:不會返回值的 Runnable
         */
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

        /**
         * 分類3:也可以這樣
         */
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

    }




}
復制代碼

 

====================================================================================================

第三種:使用spring封裝的線程池

1.創建線程配置類【

@ComponentScan("com.sxd") 標明會在哪個包下使用多線程  

package com.sxd.util;

import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@ComponentScan("com.sxd")
@EnableAsync
// 線程配置類
public class AsyncTaskConfig implements AsyncConfigurer {

    // ThredPoolTaskExcutor的處理流程
    // 當池子大小小於corePoolSize,就新建線程,並處理請求
    // 當池子大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去workQueue中取任務並處理
    // 當workQueue放不下任務時,就新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
    // 當池子的線程數大於corePoolSize時,多余的線程會等待keepAliveTime長時間,如果無請求可處理就自行銷毀

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);// 最小線程數
        taskExecutor.setMaxPoolSize(10);// 最大線程數
        taskExecutor.setQueueCapacity(25);// 等待隊列

        taskExecutor.initialize();

        return taskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}
復制代碼
package com.sxd.util;

import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@ComponentScan("com.sxd")
@EnableAsync
// 線程配置類
public class AsyncTaskConfig implements AsyncConfigurer {

    // ThredPoolTaskExcutor的處理流程
    // 當池子大小小於corePoolSize,就新建線程,並處理請求
    // 當池子大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去workQueue中取任務並處理
    // 當workQueue放不下任務時,就新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
    // 當池子的線程數大於corePoolSize時,多余的線程會等待keepAliveTime長時間,如果無請求可處理就自行銷毀

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);// 最小線程數
        taskExecutor.setMaxPoolSize(10);// 最大線程數
        taskExecutor.setQueueCapacity(25);// 等待隊列

        taskExecutor.initialize();

        return taskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}
復制代碼

2.創建線程任務執行類

package com.sxd.util;

import java.util.Random;
import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
// 線程執行任務類
public class AsyncTaskService {

    Random random = new Random();// 默認構造方法

    @Async
    // 表明是異步方法
    // 無返回值
    public void executeAsyncTask(String msg) {
        System.out.println(Thread.currentThread().getName()+"開啟新線程執行" + msg);
    }

    /**
     * 異常調用返回Future
     *
     * @param i
     * @return
     * @throws InterruptedException
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) throws InterruptedException {
        System.out.println("input is " + i);
        Thread.sleep(1000 * random.nextInt(i));

        Future<String> future = new AsyncResult<String>("success:" + i);// Future接收返回值,這里是String類型,可以指明其他類型

        return future;
    }
}
復制代碼
package com.sxd.util;

import java.util.Random;
import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
// 線程執行任務類
public class AsyncTaskService {

    Random random = new Random();// 默認構造方法

    @Async
    // 表明是異步方法
    // 無返回值
    public void executeAsyncTask(String msg) {
        System.out.println(Thread.currentThread().getName()+"開啟新線程執行" + msg);
    }

    /**
     * 異常調用返回Future
     *
     * @param i
     * @return
     * @throws InterruptedException
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) throws InterruptedException {
        System.out.println("input is " + i);
        Thread.sleep(1000 * random.nextInt(i));

        Future<String> future = new AsyncResult<String>("success:" + i);// Future接收返回值,這里是String類型,可以指明其他類型

        return future;
    }
}
復制代碼

3.使用它

@Component
public class Consumer1 {


    @Resource
    private AsyncTaskService asyncTaskService;


    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);

        asyncTaskService.executeAsyncTask(msg);

    }
    
}
復制代碼
@Component
public class Consumer1 {


    @Resource
    private AsyncTaskService asyncTaskService;


    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);

        asyncTaskService.executeAsyncTask(msg);

    }
    
}
復制代碼

 

====================================================================================================

第四種:在代碼中啟動異步處理最簡單的代碼

復制代碼
public void test(){
    new Thread(()->doReplace(replaceLog)).start();         
}

public void doReplace(String replaceLog){
                  
            //異步處理的業務
}
復制代碼

 

======================================

就這么多,再補充噻!!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM