創建ES連接
// 初始化api客戶端 public static RestHighLevelClient client = new RestHighLevelClient( RestClient.builder(new HttpHost(url,port, "http")));
將需要插入的數據轉化成Map<String, Object>類型
單條數據插入的方法
/** * 增加索引oalogincount的當前數據 * @param list */ public static void addDataOaloginData(List<Map<String, Object>> list) { for (int i = 0; i < list.size(); i++) { Map<String, Object> oaloginCountVo = list.get(i); log.info(oaloginCountVo); IndexRequest request = new IndexRequest( INDEX, //索引 INDEX, // mapping type oaloginCountVo.get("_id")+""); //文檔id oaloginCountVo.remove("_id"); //因為在插入數據時,不需要id,所以在這里將id remove掉 request.source(oaloginCountVo); /* request.routing("routing"); //設置routing值 request.timeout(TimeValue.timeValueSeconds(1)); //設置主分片等待時長 request.version(2); //設置版本號 */ request.setRefreshPolicy("wait_for"); //設置重刷新策略 request.opType(DocWriteRequest.OpType.CREATE); //操做類別 //四、發送請求 IndexResponse indexResponse = null; try { // 同步方式 indexResponse = client.index(request); } catch(Exception e) { log.error("索引異常", e); } //五、處理響應 if(indexResponse != null) { String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); Result result = indexResponse.getResult(); } } }
因為單條插入每一條都需要連接es,所以不太建議,建議直接批量插入
public static void addDataOaloginDataAll(List<Map<String, Object>> list) { BulkRequest request = new BulkRequest(); for (int i = 0; i < list.size(); i++) { Map<String, Object> oaloginCountVo = list.get(i); String _id = (String) oaloginCountVo.get("_id"); oaloginCountVo.remove("_id"); request.add(new IndexRequest(INDEX, INDEX, _id) .source(oaloginCountVo)); //文檔id //三、發送請求 同步請求 } try { BulkResponse bulkResponse = client.bulk(request); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }