/**
查詢出所有需要調用接口的數據,使用多線程把每條數據作為參數調用接口
**/
public void execute() throws UnsupportedEncodingException, InterruptedException, ExecutionException {
//所有數據條數,不查詢總條數了,提高效率
int count = 905471;
//每次查詢1000條
int limit = 1000;
//當前查詢的位置
int index = 1;
while (index <= count) {
//開8個線程可能會出現多線程處理阻塞,
ExecutorService threadPool = Executors.newFixedThreadPool(8);
CompletionService<ResultVo> cs = new ExecutorCompletionService<ResultVo>( threadPool);
List<ResultVo> resultVoList=new LinkedList<>();
List<Map> list;
//分頁查詢數據
if (index + limit < count) {
list = dataRepository.queryData(index, limit);
} else {
list = dataRepository.queryData(index, count - index + 1);
}
//每條數據拿去調用接口
for (Map map : list) {
cs.submit(new Callable<ResultVo>() {
@Override
public ResultVo call() throws Exception {
return excuteInterface(map);
}
});
}
//依次取得多線程調用接口返回的參數
for (int i = 0; i < list.size(); i++) {
ResultVo resultVo = cs.take().get();
resultVoList.add(resultVo);
}
//返回 的數據插入到數據庫保存
dataRepository.insert(resultVoList);
index += limit;
}
}
/**
每條數據調用接口的方法
**/
public ResultVo excuteInterface(Map map) throws Exception{
String postJson = JSONObject.toJSONString(map);
String content = HttpUtil.post(postJson, "接口地址");
//轉換編碼防止亂碼
String contentUtf8 = new String(content.getBytes("ISO8859-1"), "UTF-8");
ResultVo resultVo = new ResultVo();
if (ValidateUtil.isEmpty(content) || "ERROR".equals(content)) {
resultVo.setParam(postJson); //入參
resultVo.setLog(Util.NETWORK_ERROR);
} else {
resultVo = JSON.parseObject(contentUtf8, ResultVo.class);
resultVo.setParam(postJson); //入參
}
return resultVo;