前言:
前幾天有運營部門人員反應派發紅包很慢,經常出現504或者無響應,於是由我這邊進行一個優化后,發放速度由原來的超時或者1分鍾變為幾秒。
發放流程:
活動后台導入一個xls表格,大概2W左右條,經過后台的篩選處理等邏輯后會循環調用插入數據庫的代碼。
優化過程:
分析慢的原因:
發放的時候循環發放,導致發放的紅包一多的時候要循環幾萬次,而且每次插入都是new一個對象,然后往里面set數據,最后調用 jdbcTemplate插入。
優化思路 :
-
-
-
jdbcTemplate 有個batchUpdate的api,可以通過這個api完成批處理
- 通過多線程拆分大的單元,類似於 jdk的 forkJoin,然后每個線程處理一批,最后的結果通過回調統一歸並。
-
-
拆分List代碼片段:
抽取出一個公共的接口,用於調用具體的處理方法。
public interface I2pTask<T, E> { T execute(List e, Map<String, Object> params); }
由於發放紅包需要實時展示給運營人員看,所以需要有回調處理函數,可以將不同結果的線程收集起來統一給主線程返回,但jdk的Callable又滿足不了,所以得自己新建一個類重寫Callable,這個類只負責調度處理任務以及返回執行任務結果。
public class I2pHanderCallable<E> implements Callable<ResultBean> { private static Logger logger = LoggerFactory.getLogger(HandleCallable.class); // 線程名稱 private String threadName = ""; // 需要處理的數據 private List<E> data; // 輔助參數 private Map<String, Object> params; // 具體執行任務 private I2pTask<ResultBean<String>, E> task; public HandleCallable(String threadName, List<E> data, Map<String, Object> params, ITask<ResultBean<String>, E> task) { this.threadName = threadName; this.data = data; this.params = params; this.task = task; } @Override public ResultBean<List<ResultBean<String>>> call() throws Exception { // 該線程中所有數據處理返回結果 ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance(); if (data != null && data.size() > 0) { logger.info("線程:{},共處理:{}個數據,開始處理......", threadName, data.size()); // 返回結果集 List<ResultBean<String>> resultList = new ArrayList<>(); resultList.add(task.execute(data, params)); /*resultList.add(task.execute(data, params));*/ // 循環處理每個數據 /* for (int i = 0; i < data.size(); i++) { // 需要執行的數據 E e = data.get(i); // 將數據執行結果加入到結果集中 resultList.add(task.execute(e, params)); logger.info("線程:{},第{}個數據,處理完成", threadName, (i + 1)); }*/ logger.info("線程:{},共處理:{}個數據,處理完成......", threadName, data.size()); resultBean.setData(resultList); resultBean.setCode(data.size()); } return resultBean; } }
具體的處理線程的類有了,那么還需要考慮來切分任務,所以新建一個 線程工具類,主要業務就是切分任務,創建具體的線程個數,統一收集結果。
public class I2pThreadUtils<T> {
private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class);
// 線程個數,如不賦值,默認為5
private int threadCount = 20;
// 具體業務任務
// private I2pTask<ResultBean<String>, T> task;
// 線程池管理器
private CompletionService<ResultBean> pool = null;
public static I2pThreadUtils newInstance(int threadCount) {
I2pThreadUtils instance = new I2pThreadUtils();
threadCount = threadCount;
instance.setThreadCount(threadCount);
return instance;
}
@SuppressWarnings("rawtypes")
public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) {
// 創建線程池
int num = 0;
ExecutorService threadpool = Executors.newFixedThreadPool(threadCount);
// 根據線程池初始化線程池管理器
pool = new ExecutorCompletionService<ResultBean>(threadpool);
// 開始時間(ms)
long l = System.currentTimeMillis();
// 數據量大小
int length = data.size();
// 每個線程處理的數據個數
int taskCount = length / threadCount;
// 划分每個線程調用的數據
for (int i = 0; i < threadCount; i++) {
// 每個線程任務數據list
List<T> subData = null;
if (i == (threadCount - 1)) {
subData = data.subList(i * taskCount, length);
} else {
subData = data.subList(i * taskCount, (i + 1) * taskCount);
}
// 將數據分配給各個線程
HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task);
// 將線程加入到線程池
pool.submit(execute);
}
// 總的返回結果集
List<ResultBean<String>> result = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
// 每個線程處理結果集
ResultBean<List<ResultBean<String>>> threadResult;
try {
threadResult = pool.take().get();
if(threadResult!=null && threadResult.getData()!=null){
System.out.println("======線程" + i + "執行完畢,返回結果數據:" + threadResult.getCode());
List<ResultBean<String>> list = threadResult.getData();
num+=threadResult.getCode();
}
System.out.println("每個線程處理結果集"+threadResult.getData());
result.addAll(threadResult.getData());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 關閉線程池
threadpool.shutdownNow();
// 執行結束時間
long end_l = System.currentTimeMillis();
logger.info("總耗時:{}ms", (end_l - l));
logger.info("總數量:{}num:", num);
return ResultBean.newInstance().setData(num);
}
public int getThreadCount() {
return threadCount;
}
public void setThreadCount(int threadCount) {
this.threadCount = threadCount;
}
}
以上
