上一次寫了關於《FunctionalInterface~一個批量處理數據的類》和《Future和Callable實現大任務的並行處理》的文章,本講主要結合實際應用,來封裝一個集合並行處理組件,我們的集合分為數據庫查詢出現的分頁集合;還有一個是內存的集合,今天主要說一下內存集合的並行處理。
場景介紹
- 有一個比較耗時的工作,將top 400的用戶的行為信息統計
- 統計的信息來自很多業務,很多服務,不能使用聚合直接計算
- 這些業務統計的時間,大概每個人平均需要1秒
- 這些用戶的各種類型,彼此獨立,沒有關系
如何設計
如果直接順序寫代碼,那1萬的用戶,需要400秒的時間,這是我們不能接受的,我們使用並行編程8秒就把它搞定。
如何實現
- 400的集合,進行拆分,每100個為一組,分為4組(4頁)
- 對每100個集合進行拆分,每2個為1組,將100個分成了50組
- 對50組數據,開50個線程並行處理,結果為2行完成
- 400的信息,分成了4頁,每頁2秒,一共8秒
代碼實現
/**
* 數據集並行處理工具
*/
public class DataHelper {
/**
* 並行處理線程數字
*/
static final int THREAD_COUNT = 50;
/**
* 單線程中處理的集合的長度,50個線程,每個線程處理2條,如果處理時間為1S,則需要2S的時間.
*/
static final int INNER_LIST_LENGTH = 2;
static Logger logger = LoggerFactory.getLogger(DataHelper.class);
/**
* 大集合拆分.
*
* @param list
* @param len
* @param <T>
* @return
*/
private static <T> List<List<T>> splitList(List<T> list, int len) {
if (list == null || list.size() == 0 || len < 1) {
return null;
}
List<List<T>> result = new ArrayList<List<T>>();
int size = list.size();
int count = (size + len - 1) / len;
for (int i = 0; i < count; i++) {
List<T> subList = list.subList(i * len, ((i + 1) * len > size ? size : len * (i + 1)));
result.add(subList);
}
return result;
}
/**
* 並行處理.
*
* @param list 大集合
* @param pageSize 單頁數據大小
* @param consumer 處理程序
* @param <T>
*/
public static <T> void fillDataByPage(List<T> list,
int pageSize,
Consumer<T> consumer) {
List<List<T>> innerList = new ArrayList<>();
splitList(list, pageSize).forEach(o -> innerList.add(o));
int totalPage = innerList.size();
AtomicInteger i = new AtomicInteger();
innerList.forEach(items -> {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
i.getAndIncrement();
Collection<BufferInsert<T>> bufferInserts = new ArrayList<>();
splitList(items, INNER_LIST_LENGTH).forEach(o -> {
bufferInserts.add(new BufferInsert(o, consumer));
});
try {
executor.invokeAll(bufferInserts);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
logger.info("【當前數據頁:{}/{}】", i.get(), totalPage);
});
}
/**
* 多線程並發處理數據.
*
* @param <T>
*/
static class BufferInsert<T> implements Callable<Integer> {
/**
* 要處理的數據列表.
*/
List<T> items;
/**
* 處理程序.
*/
Consumer<T> consumer;
public BufferInsert(List<T> items, Consumer<T> consumer) {
this.items = items;
this.consumer = consumer;
}
@Override
public Integer call() {
for (T item : items) {
this.consumer.accept(item);
}
return 1;
}
}
}
調用代碼
/**
* 8秒處理400個任務,每個任務執行時間為1S,並行的威力
*/
@Test
public void test() {
List<Integer> sumList = new ArrayList<>();
for (int i = 0; i < 400; i++) {
sumList.add(i);
}
StopWatch stopWatch = new StopWatch();
stopWatch.start();
DataHelper.fillDataByPage(sumList, 100, (o) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
stopWatch.stop();
System.out.println("time:" + stopWatch.getTotalTimeMillis());
}
結果截圖