/** 查询出所有需要调用接口的数据,使用多线程把每条数据作为参数调用接口 **/ public void execute() throws UnsupportedEncodingException, InterruptedException, ExecutionException { //所有数据条数,不查询总条数了,提高效率 int count = 905471; //每次查询1000条 int limit = 1000; //当前查询的位置 int index = 1; while (index <= count) { //开8个线程可能会出现多线程处理阻塞, ExecutorService threadPool = Executors.newFixedThreadPool(8); CompletionService<ResultVo> cs = new ExecutorCompletionService<ResultVo>( threadPool); List<ResultVo> resultVoList=new LinkedList<>(); List<Map> list; //分页查询数据 if (index + limit < count) { list = dataRepository.queryData(index, limit); } else { list = dataRepository.queryData(index, count - index + 1); } //每条数据拿去调用接口 for (Map map : list) { cs.submit(new Callable<ResultVo>() { @Override public ResultVo call() throws Exception { return excuteInterface(map); } }); } //依次取得多线程调用接口返回的参数 for (int i = 0; i < list.size(); i++) { ResultVo resultVo = cs.take().get(); resultVoList.add(resultVo); } //返回 的数据插入到数据库保存 dataRepository.insert(resultVoList); index += limit; } } /** 每条数据调用接口的方法 **/ public ResultVo excuteInterface(Map map) throws Exception{ String postJson = JSONObject.toJSONString(map); String content = HttpUtil.post(postJson, "接口地址"); //转换编码防止乱码 String contentUtf8 = new String(content.getBytes("ISO8859-1"), "UTF-8"); ResultVo resultVo = new ResultVo(); if (ValidateUtil.isEmpty(content) || "ERROR".equals(content)) { resultVo.setParam(postJson); //入参 resultVo.setLog(Util.NETWORK_ERROR); } else { resultVo = JSON.parseObject(contentUtf8, ResultVo.class); resultVo.setParam(postJson); //入参 } return resultVo;