Spring通過任務執行器(TaskExecutor)來實現多線程和並發編程。使用ThreadPoolTaskExecutor可實現一個基於線程池的TaskExecutor。而實際開發中任務一般是非阻礙的,即異步的,所以我們要在配置類中通過@EnableAsync 開啟對異步任務的支持,並通過實際執行Bean的方法中使用@Async注解來聲明其是一個異步任務。
示例:
1.配置類。
package com.cnpiec.ireader.config;
import java.util.concurrent.Executor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@ComponentScan("com.cnpiec.ireader")
@EnableAsync
// 線程配置類
public class AsyncTaskConfig implements AsyncConfigurer {
// ThredPoolTaskExcutor的處理流程
// 當池子大小小於corePoolSize,就新建線程,並處理請求
// 當池子大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去workQueue中取任務並處理
// 當workQueue放不下任務時,就新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
// 當池子的線程數大於corePoolSize時,多余的線程會等待keepAliveTime長時間,如果無請求可處理就自行銷毀
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);// 最小線程數
taskExecutor.setMaxPoolSize(10);// 最大線程數
taskExecutor.setQueueCapacity(25);// 等待隊列
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
2.任務執行類
package com.cnpiec.ireader.service;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;
import com.alibaba.fastjson.JSONObject;
import com.cnpiec.ireader.dao.GetBookDataDao;
import com.cnpiec.ireader.model.Book;
import com.cnpiec.ireader.model.Chapter;
import com.cnpiec.ireader.utils.FileNameUtils;
@Service
// 線程執行任務類
public class AsyncDownloadBookTaskService {
private Logger logger = LoggerFactory.getLogger(AsyncDownloadBookTaskService.class);
@Autowired
private GetBookDataDao getBookDataDao;
@Async
// 表明是異步方法
// 無返回值
public void executeAsyncTask(List<Book> list, String clientId, String clientSecret) {
System.out.println(Thread.currentThread().getName() + "開啟新線程執行");
for (Book book : list) {
String name = book.getName();
File file = new File("iReaderResource/" + name);
if (file.mkdirs()) {
logger.info("文件夾創建成功!創建后的文件目錄為:" + file.getPath());
}
try {
getChapterList(book, clientId, clientSecret);
} catch (Exception e) {
logger.error("多線程下載書籍失敗:" + book.getBookId(), e);
e.printStackTrace();
}
}
}
/**
* 異常調用返回Future
*
* @param i
* @return
* @throws InterruptedException
*/
@Async
public Future<String> asyncInvokeReturnFuture(List<Book> list, String clientId, String clientSecret)
throws InterruptedException {
//業務代碼
Future<String> future = new AsyncResult<String>("success");// Future接收返回值,這里是String類型,可以指明其他類型
return future;
}
/**
* 根據書籍ID獲取章節創建文件並寫入章節信息
*
* @param book
* @param clientId
* @param clientSecret
* @throws Exception
*/
private void getChapterList(Book book, String clientId, String clientSecret) throws Exception {
String stempSign = clientId + clientSecret + book.getBookId();
String sign = DigestUtils.md5DigestAsHex(stempSign.getBytes());
CloseableHttpClient httpclient = HttpClients.createDefault();
StringBuffer sb = new StringBuffer();
sb.append("http://api.res.ireader.com/api/v2/book/chapterList?").append("bookId=").append(book.getBookId())
.append("&clientId=").append(clientId).append("&sign=").append(sign).append("&resType=json");
HttpGet httpget = new HttpGet(sb.toString());
CloseableHttpResponse response = httpclient.execute(httpget);
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() == 200) {
HttpEntity entity = response.getEntity();
String result = EntityUtils.toString(entity, "utf-8");
List<Chapter> chapterList = JSONObject.parseArray(result, Chapter.class);
for (Chapter chapter : chapterList) {
File file = new File("iReaderResource/" + book.getName() + "/"
+ FileNameUtils.replaceSpecialCharacters(chapter.getTitle()) + ".txt");
if (file.createNewFile()) {
logger.info("創建章節文件成功:" + file.getPath());
}
String filePath = file.getPath();
getChapterInfo(chapter, book, clientId, clientSecret, filePath);
}
getBookDataDao.updateBookStatus(book.getBookId());
}
}
/**
* 獲取章節信息寫入文本文件
*
* @param chapter
* @param book
* @param clientId
* @param clientSecret
* @param filePath
* @throws Exception
*/
private void getChapterInfo(Chapter chapter, Book book, String clientId, String clientSecret, String filePath)
throws Exception {
String stempSign = clientId + clientSecret + book.getBookId() + chapter.getChapterId();
String sign = DigestUtils.md5DigestAsHex(stempSign.getBytes());
CloseableHttpClient httpclient = HttpClients.createDefault();
StringBuffer sb = new StringBuffer();
sb.append("http://api.res.ireader.com/api/v2/book/chapterInfo?").append("bookId=").append(book.getBookId())
.append("&chapterId=").append(chapter.getChapterId()).append("&clientId=").append(clientId)
.append("&sign=").append(sign).append("&resType=json");
HttpGet httpget = new HttpGet(sb.toString());
CloseableHttpResponse response = httpclient.execute(httpget);
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() == 200) {
HttpEntity entity = response.getEntity();
String result = EntityUtils.toString(entity, "utf-8");
Chapter chapter2 = JSONObject.parseObject(result, Chapter.class);
String content = chapter2.getContent();
// 寫文件內容
BufferedWriter writer = new BufferedWriter(new FileWriter(new File(filePath), true));
writer.write(content);
writer.close();
}
}
}
3.運行
for (int i = 0; i < list2.size(); i++) {
asyncTaskService.executeAsyncTask(list2.get(i),clientId,clientSecret);
}
