1、config目錄下創建線程池對象
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ExecutorConfig {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Value("${task.pool.corePoolSize}")
private int corePoolSize;
@Value("${task.pool.maxPoolSize}")
private int maxPoolSize;
@Value("${task.pool.keepAliveSeconds}")
private int keepAliveSeconds;
@Value("${task.pool.queueCapacity}")
private int queueCapacity;
@Value("${task.pool.threadNamePrefix}")
private String threadNamePrefix;
@Bean
public Executor asyncReleaseServiceExecutor() {
logger.info("...ExecutorConfig...asyncServiceExecutor()...啟動[發布任務]線程池...");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setThreadNamePrefix(threadNamePrefix);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
2、配置文件中添加對應屬性
task.pool.corePoolSize=10 task.pool.maxPoolSize=20 task.pool.keepAliveSeconds=300 task.pool.queueCapacity=999 task.pool.threadNamePrefix=Grape-
3、啟動類上開啟異步任務
@EnableAsync
@SpringBootApplication
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication .class, args);
}
}
4、編寫異步執行類和方法
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONPath;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
@Component
public class AsyncSendSqlService {
private static final Logger logger = Logger.getLogger(AsyncSendSqlService.class);
@Async("asyncReleaseServiceExecutor")
Future<AsyncSendSqlResult> asyncSendSql(String url, String jsonRequestParams, CountDownLatch latch) {
String currentThreadName = Thread.currentThread().getName();
String httpURL = ServiceConstant.STR_HTTP + url + deliveryServerUrl;
logger.info("---線程: 【" + currentThreadName + "】 的發布地址: " + url + "---");
long start = System.currentTimeMillis();
String result = null;
try {
result = OkHttpUtil.postJson(httpURL, jsonRequestParams);
} catch (Exception e) {
logger.error("---線程: 【" + currentThreadName + "】---AsyncSendSqlService---asyncSendSql()---error:" + e);
}
logger.info("---線程: 【" + currentThreadName + "】 的返回結果 ::: " + result);
logger.info("---線程: 【" + currentThreadName + "】 的請求時間: " + (System.currentTimeMillis() - start) + " ms");
AsyncSendSqlResult asyncSendSqlResult = transformAsyncSendSqlResult(url, result);
latch.countDown();
ScriptValueSnapShotEntity scriptValueSnapShotEntity = saveSnapshoot(scriptProject.getId(), jsonRequestParams);
return new AsyncResult<>(asyncSendSqlResult);
}
}
5、調用多線程任務的方法
try {
//加閂
CountDownLatch latch = new CountDownLatch(projectUrlList.size());
for (String url : projectUrlList) {
Future<AsyncSendSqlResult> asyncSendSqlResultFuture = asyncSendSqlService.asyncSendSql(url, requestParamsJsonStr, latch);
asyncSendSqlResultList.add(asyncSendSqlResultFuture.get());
}
//等待N個線程執行完畢
latch.await();
} catch (ExecutionException e) {
logger.error("---發布多線程異常 ::: " + e);
e.getStackTrace();
} catch (InterruptedException e) {
logger.error("---發布多線程異常 ::: " + e);
e.getStackTrace();
}
注意事項:
1)異步調用的方法要單獨放置一個類中,否則@Async注解不生效
2)多個等待多個線程執行后一同處理結果,使用加門閂(CountDownLatch類)實現
3) @Async注解里面的參數和@Bean方法名稱保持一致
參考:
