前言:
前幾天有運營部門人員反應派發紅包很慢,經常出現504或者無響應,於是由我這邊進行一個優化后,發放速度由原來的超時或者1分鍾變為幾秒。
發放流程:
活動后台導入一個xls表格,大概2W左右條,經過后台的篩選處理等邏輯后會循環調用插入數據庫的代碼。
優化過程:
分析慢的原因:
發放的時候循環發放,導致發放的紅包一多的時候要循環幾萬次,而且每次插入都是new一個對象,然后往里面set數據,最后調用 jdbcTemplate插入。
優化思路 :
-
-
-
jdbcTemplate 有個batchUpdate的api,可以通過這個api完成批處理
- 通過多線程拆分大的單元,類似於 jdk的 forkJoin,然后每個線程處理一批,最后的結果通過回調統一歸並。
-
-
拆分List代碼片段:
抽取出一個公共的接口,用於調用具體的處理方法。
public interface I2pTask<T, E> { T execute(List e, Map<String, Object> params); }
由於發放紅包需要實時展示給運營人員看,所以需要有回調處理函數,可以將不同結果的線程收集起來統一給主線程返回,但jdk的Callable又滿足不了,所以得自己新建一個類重寫Callable,這個類只負責調度處理任務以及返回執行任務結果。
public class I2pHanderCallable<E> implements Callable<ResultBean> { private static Logger logger = LoggerFactory.getLogger(HandleCallable.class); // 線程名稱 private String threadName = ""; // 需要處理的數據 private List<E> data; // 輔助參數 private Map<String, Object> params; // 具體執行任務 private I2pTask<ResultBean<String>, E> task; public HandleCallable(String threadName, List<E> data, Map<String, Object> params, ITask<ResultBean<String>, E> task) { this.threadName = threadName; this.data = data; this.params = params; this.task = task; } @Override public ResultBean<List<ResultBean<String>>> call() throws Exception { // 該線程中所有數據處理返回結果 ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance(); if (data != null && data.size() > 0) { logger.info("線程:{},共處理:{}個數據,開始處理......", threadName, data.size()); // 返回結果集 List<ResultBean<String>> resultList = new ArrayList<>(); resultList.add(task.execute(data, params)); /*resultList.add(task.execute(data, params));*/ // 循環處理每個數據 /* for (int i = 0; i < data.size(); i++) { // 需要執行的數據 E e = data.get(i); // 將數據執行結果加入到結果集中 resultList.add(task.execute(e, params)); logger.info("線程:{},第{}個數據,處理完成", threadName, (i + 1)); }*/ logger.info("線程:{},共處理:{}個數據,處理完成......", threadName, data.size()); resultBean.setData(resultList); resultBean.setCode(data.size()); } return resultBean; } }
具體的處理線程的類有了,那么還需要考慮來切分任務,所以新建一個 線程工具類,主要業務就是切分任務,創建具體的線程個數,統一收集結果。
public class I2pThreadUtils<T> { private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class); // 線程個數,如不賦值,默認為5 private int threadCount = 20; // 具體業務任務 // private I2pTask<ResultBean<String>, T> task; // 線程池管理器 private CompletionService<ResultBean> pool = null; public static I2pThreadUtils newInstance(int threadCount) { I2pThreadUtils instance = new I2pThreadUtils(); threadCount = threadCount; instance.setThreadCount(threadCount); return instance; } @SuppressWarnings("rawtypes") public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) { // 創建線程池 int num = 0; ExecutorService threadpool = Executors.newFixedThreadPool(threadCount); // 根據線程池初始化線程池管理器 pool = new ExecutorCompletionService<ResultBean>(threadpool); // 開始時間(ms) long l = System.currentTimeMillis(); // 數據量大小 int length = data.size(); // 每個線程處理的數據個數 int taskCount = length / threadCount; // 划分每個線程調用的數據 for (int i = 0; i < threadCount; i++) { // 每個線程任務數據list List<T> subData = null; if (i == (threadCount - 1)) { subData = data.subList(i * taskCount, length); } else { subData = data.subList(i * taskCount, (i + 1) * taskCount); } // 將數據分配給各個線程 HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task); // 將線程加入到線程池 pool.submit(execute); } // 總的返回結果集 List<ResultBean<String>> result = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { // 每個線程處理結果集 ResultBean<List<ResultBean<String>>> threadResult; try { threadResult = pool.take().get(); if(threadResult!=null && threadResult.getData()!=null){ System.out.println("======線程" + i + "執行完畢,返回結果數據:" + threadResult.getCode()); List<ResultBean<String>> list = threadResult.getData(); num+=threadResult.getCode(); } System.out.println("每個線程處理結果集"+threadResult.getData()); result.addAll(threadResult.getData()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 關閉線程池 threadpool.shutdownNow(); // 執行結束時間 long end_l = System.currentTimeMillis(); logger.info("總耗時:{}ms", (end_l - l)); logger.info("總數量:{}num:", num); return ResultBean.newInstance().setData(num); } public int getThreadCount() { return threadCount; } public void setThreadCount(int threadCount) { this.threadCount = threadCount; } }
以上