Ship 對象類
/**
 * Mutable data holder for one ship record, carrying two string attributes.
 *
 * <p>NOTE(review): {@code mmsi} is presumably the vessel's Maritime Mobile
 * Service Identity and {@code utc} presumably a UTC timestamp; neither the
 * meaning nor the expected format is established by this class — confirm
 * against the code that populates it.
 */
public class Ship {

    /** Vessel identifier; stored verbatim, never validated here. */
    private String mmsi;

    /** Time value as text; stored verbatim, never validated here. */
    private String utc;

    // ---- readers ----

    /**
     * @return the stored {@code mmsi} value, or {@code null} if none was set
     */
    public String getMmsi() {
        return this.mmsi;
    }

    /**
     * @return the stored {@code utc} value, or {@code null} if none was set
     */
    public String getUtc() {
        return this.utc;
    }

    // ---- writers ----

    /**
     * @param mmsi value to store; {@code null} is accepted
     */
    public void setMmsi(String mmsi) {
        this.mmsi = mmsi;
    }

    /**
     * @param utc value to store; {@code null} is accepted
     */
    public void setUtc(String utc) {
        this.utc = utc;
    }
}
批量添加
public static void insertAllBulk(List<Ship> dataList,String indexName) { TransportClient client = null; try { client = connectionPool.getConnection(); BulkRequestBuilder builder = client.prepareBulk(); //請求命令數量 int z = 0; long l2 = System.currentTimeMillis(); int n = dataList.size(); if (n != 0) { //遍歷列表,每條數據生成一條請求命令 for (int i = 0; i < n; i++) { Ship data = dataList.remove(0); if (data != null) { String utc = data.getUtc(); if (utc == null || utc.equals("")) { continue; } else { IndexRequestBuilder request = client.prepareIndex(indexName, "_doc").setSource( XContentFactory.jsonBuilder() .startObject() .field("mmsi", data.getMmsi()) .field("utc", utc) .endObject() ); //添加到批量請求 builder.add(request); z++; } } } //命令數不為零 if (z != 0) { long l3 = System.currentTimeMillis(); logger2.info("生成請求耗時:" + (l3 - l2)); //執行批量請求,獲取結果 BulkResponse bulkItemResponses = builder.get(); if (bulkItemResponses.hasFailures()) { logger2.error(bulkItemResponses.buildFailureMessage()); } long l4 = System.currentTimeMillis(); logger2.info("執行並返回結果耗時:" + (l4 - l3)); long millis = bulkItemResponses.getTook().getMillis(); dataList.clear(); logger2.info("船完成批量導入ES" + n + "個,耗時" + millis); } } } catch (Exception e) { e.printStackTrace(); logger2.error("insertBulk 錯誤", e); } finally { connectionPool.releaseConnection(client); } }