ship对象类
public class Ship { private String mmsi; private String utc; public String getMmsi() { return mmsi; } public void setMmsi(String mmsi) { this.mmsi = mmsi; } public String getUtc() { return utc; } public void setUtc(String utc) { this.utc = utc; } }
批量添加
public static void insertAllBulk(List<Ship> dataList,String indexName) { TransportClient client = null; try { client = connectionPool.getConnection(); BulkRequestBuilder builder = client.prepareBulk(); //请求命令数量 int z = 0; long l2 = System.currentTimeMillis(); int n = dataList.size(); if (n != 0) { //遍历列表,每条数据生成一条请求命令 for (int i = 0; i < n; i++) { Ship data = dataList.remove(0); if (data != null) { String utc = data.getUtc(); if (utc == null || utc.equals("")) { continue; } else { IndexRequestBuilder request = client.prepareIndex(indexName, "_doc").setSource( XContentFactory.jsonBuilder() .startObject() .field("mmsi", data.getMmsi()) .field("utc", utc) .endObject() ); //添加到批量请求 builder.add(request); z++; } } } //命令数不为零 if (z != 0) { long l3 = System.currentTimeMillis(); logger2.info("生成请求耗时:" + (l3 - l2)); //执行批量请求,获取结果 BulkResponse bulkItemResponses = builder.get(); if (bulkItemResponses.hasFailures()) { logger2.error(bulkItemResponses.buildFailureMessage()); } long l4 = System.currentTimeMillis(); logger2.info("执行并返回结果耗时:" + (l4 - l3)); long millis = bulkItemResponses.getTook().getMillis(); dataList.clear(); logger2.info("船完成批量导入ES" + n + "个,耗时" + millis); } } } catch (Exception e) { e.printStackTrace(); logger2.error("insertBulk 错误", e); } finally { connectionPool.releaseConnection(client); } }