/**
查询出所有需要调用接口的数据,使用多线程把每条数据作为参数调用接口
**/
public void execute() throws UnsupportedEncodingException, InterruptedException, ExecutionException {
//所有数据条数,不查询总条数了,提高效率
int count = 905471;
//每次查询1000条
int limit = 1000;
//当前查询的位置
int index = 1;
while (index <= count) {
//开8个线程可能会出现多线程处理阻塞,
ExecutorService threadPool = Executors.newFixedThreadPool(8);
CompletionService<ResultVo> cs = new ExecutorCompletionService<ResultVo>( threadPool);
List<ResultVo> resultVoList=new LinkedList<>();
List<Map> list;
//分页查询数据
if (index + limit < count) {
list = dataRepository.queryData(index, limit);
} else {
list = dataRepository.queryData(index, count - index + 1);
}
//每条数据拿去调用接口
for (Map map : list) {
cs.submit(new Callable<ResultVo>() {
@Override
public ResultVo call() throws Exception {
return excuteInterface(map);
}
});
}
//依次取得多线程调用接口返回的参数
for (int i = 0; i < list.size(); i++) {
ResultVo resultVo = cs.take().get();
resultVoList.add(resultVo);
}
//返回 的数据插入到数据库保存
dataRepository.insert(resultVoList);
index += limit;
}
}
/**
每条数据调用接口的方法
**/
public ResultVo excuteInterface(Map map) throws Exception{
String postJson = JSONObject.toJSONString(map);
String content = HttpUtil.post(postJson, "接口地址");
//转换编码防止乱码
String contentUtf8 = new String(content.getBytes("ISO8859-1"), "UTF-8");
ResultVo resultVo = new ResultVo();
if (ValidateUtil.isEmpty(content) || "ERROR".equals(content)) {
resultVo.setParam(postJson); //入参
resultVo.setLog(Util.NETWORK_ERROR);
} else {
resultVo = JSON.parseObject(contentUtf8, ResultVo.class);
resultVo.setParam(postJson); //入参
}
return resultVo;