ES:JAVA訪問並操作ES(基礎篇)


pom.xml

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3     <modelVersion>4.0.0</modelVersion>
 4 
 5     <groupId>study.es</groupId>
 6     <artifactId>study.es</artifactId>
 7     <version>0.0.1-SNAPSHOT</version>
 8     <packaging>jar</packaging>
 9 
10     <name>study.es</name>
11     <url>http://maven.apache.org</url>
12 
13     <properties>
14         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15         <!-- https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_maven_repository.html -->
16         <es.version>7.5.1</es.version>
17     </properties>
18 
19     <dependencies>
20         <!-- ES客戶端請求 -->
21         <dependency>
22             <groupId>org.elasticsearch.client</groupId>
23             <artifactId>transport</artifactId>
24             <version>${es.version}</version>
25         </dependency>
26         
27         <!-- 用於json轉換 -->
28         <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
29         <dependency>
30             <groupId>com.alibaba</groupId>
31             <artifactId>fastjson</artifactId>
32             <version>1.2.68</version>
33         </dependency>
34        
35     </dependencies>
36 </project>

 

EsTool.java

package com.study.es;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

/**
 * 訪問ES的工具類
 * @see 命令行參考https://github.com/elastic/elasticsearch
 *
 */
public class EsTool 
{
    /**
     * 查看ES路徑:elasticsearch-7.5.1\config\elasticsearch.yml cluster.name
     */
    private static final String CLUSTER_NAME = "my-application";
    
    private static EsTool esTool;
    
    private static TransportClient transportClient;
    
    public static EsTool getInstance() {
        if (null == esTool) {
            esTool = new EsTool();
        }
        return esTool;
    }
    
    /**
     * 減少頻繁獲取連接,定義一個變量存放連接
     * @return
     */
    @SuppressWarnings("deprecation")
    public TransportClient getClient() {
        if (null == transportClient) {
            transportClient = getNewClient();
        }
        return transportClient;
    }
    
    public static void main( String[] args ) throws IOException
    {
        String operate = "search";
        if ("addIndicesMapping".equals(operate)) {
            // 添加索引
            String jsonString = "{\"properties\":{\"author\":{\"type\":\"keyword\"},\"title\":{\"type\":\"text\"},\"content\":{\"type\":\"text\"},\"price\":{\"type\":\"integer\"},\"date\":{\"type\":\"date\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}";
            // 執行成功后顯示:----------Add mapping success----------true
            getInstance().createIndexAndMapping("indices_test",jsonString);
        } else if ("addDocument".equals(operate)) {
            // Add id success,version is :1
            getInstance().addIndexDocument("indices_test", "_doc");
        } else if ("addOrUpdateDocument".equals(operate)) {
            // bulk success
            getInstance().bulkIndexDocument("indices_test", "_doc");
        } else if ("deleteById".equals(operate)) {
            getInstance().deleteById("indices_test", "_doc", "id_003");
        } else if ("batchDeleteByIds".equals(operate)) {
            List<String> ids = new ArrayList<String>();
            ids.add("id_001");
            ids.add("id_002");
            getInstance().batchDeleteByIds("indices_test", "_doc", ids);
        } else if ("updateDocument".equals(operate)) {
            // result is OK   == id指_id
            getInstance().updateDocument("indices_test", "_doc", "TNZDUHEB_rQdj7R3LnO4", null);
        } else if ("updateDocumentPrepare".equals(operate)) {
            // result is UPDATED   == id指_id
            getInstance().updateDocumentPrepare("indices_test", "_doc", "TNZDUHEB_rQdj7R3LnO4", null);
        } else if ("searchByIndex".equals(operate)) {
            // id指_id
            getInstance().searchByIndex("indices_test", "_doc", "TNZDUHEB_rQdj7R3LnO4");
        } else if ("queryAll".equals(operate)) {
            // ..."totalHits":{"value":3,"relation":"EQUAL_TO"},"maxScore":1.0}
            getInstance().queryAll("indices_test");
        } else if ("search".equals(operate)) {
            // 查詢全部
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            getInstance().searchQuery("indices_test", queryBuilder);
            //以下內容僅僅為插敘條件格式
            // Span First  
            /*QueryBuilder queryBuilder = QueryBuilders.spanFirstQuery(  
                    QueryBuilders.spanTermQuery("title", "title"), 1);*/
            
            /*QueryBuilder queryBuilder =QueryBuilders.spanNearQuery(QueryBuilders.spanTermQuery("title", "title"),1000)  
            .addClause(QueryBuilders.spanTermQuery("title", "title_001"))
            .addClause(QueryBuilders.spanTermQuery("title", "title_002"))  
            .addClause(QueryBuilders.spanTermQuery("title", "title_003"));*/
            // ...
        }
    }
    
    /**
     * 根據不同的條件查詢
     * @throws Exception
     */
    public void searchQuery(String index, QueryBuilder queryBuilder) {
        SearchResponse response = getClient().prepareSearch(index).setQuery(queryBuilder).get();
        for (SearchHit searchHit : response.getHits()) {
            System.out.println(searchHit);
        }
    }

    /**
     * 根據索引、類型、id獲取記錄
     * @param index
     * @param type
     * @param id
     */
    public void searchByIndex(String index, String type, String id) {
        GetResponse response = getClient().prepareGet(index, type, id).execute().actionGet();
        String json = response.getSourceAsString();
        if (null != json) {
            System.out.println(json);
        } else {
            System.out.println("no result");
        }
    }
    
    /**
     * 修改內容
     * @throws Exception
     */
    public boolean updateDocumentPrepare(String index, String type, String id, XContentBuilder source) {
        XContentBuilder endObject;
        try {
            // 修改后的內容
            endObject = XContentFactory.jsonBuilder().startObject().field("author","test_prepare_001").endObject();
            UpdateResponse response = getClient().prepareUpdate(index, type, id).setDoc(endObject).get();
            System.out.println("result is " + response.getResult().name());
            return "UPDATED".equals(response.getResult().name());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return false;
    }
    
    /**
     * 修改內容
     * @return
     */
    @SuppressWarnings("deprecation")
    public boolean updateDocument(String index, String type, String id, XContentBuilder source) {
        Date time = new Date();
        // 創建修改請求
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index(index);
        updateRequest.type(type);
        updateRequest.id(id);
        try {
            // 根據實際需要調整方法參數source里的值
            updateRequest.doc(XContentFactory.jsonBuilder().startObject().field("author", "author001").field("title", "title001")
                    .field("content", "content001")
                    .field("date", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time)).endObject());
            UpdateResponse response = getClient().update(updateRequest).get();
            System.out.println("result is " + response.status().name());
            return "OK".equals(response.status().name());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return false;
    }
    
    /**
     * 根據id批量刪除
     * @param index
     * @param type
     * @param ids
     * @return
     */
    public boolean batchDeleteByIds(String index, String type, List<String> ids) {
        if (null == ids || ids.isEmpty()) {
            System.out.println("ids is require");
            return true;
        }
        BulkRequestBuilder builder = getClient().prepareBulk();
        for (String id : ids) {
            builder.add(getClient().prepareDelete(index, type, id).request());
        }
        BulkResponse bulkResponse = builder.get();
        System.out.println(bulkResponse.status());
        if (bulkResponse.hasFailures()) {
            System.out.println("has failed, " + bulkResponse.status().name());
            return false;
        }
        return true;
    }
    
    /**
     * 根據索引名稱、類型和id刪除記錄
     * @param indexName
     * @param type
     * @param id
     */
    public void deleteById(String indexName, String type, String id) {
        DeleteResponse dResponse = getClient().prepareDelete(indexName, type, id).execute().actionGet();
        if ("OK".equals(dResponse.status().name())) {
            System.out.println("delete id success");
        } else {
            System.out.println("delete id failed : " + dResponse.getResult().toString());
        }
    }
    
    /**
     * 刪除某個索引下所有數據
     * @param indexName
     * @return
     * @see 刪除不存在的索引時,記錄實際情況,默認返回成功
     */
    public boolean deleteAllIndex(String indexName) {
        if (null == indexName || "".equals(indexName.trim())) {
            System.out.println("Error: index name is require.");
            return false;
        }
        //如果傳人的indexName不存在會出現異常.可以先判斷索引是否存在:
        IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(indexName);
        IndicesExistsResponse inExistsResponse = getClient().admin().indices()
                .exists(inExistsRequest).actionGet();
        if (inExistsResponse.isExists()) {
            AcknowledgedResponse response = getClient().admin().indices().prepareDelete(indexName)
                    .execute().actionGet();
            System.out.println("delete index date, result is " + response.isAcknowledged());
            return response.isAcknowledged();
        } else {
            System.out.println("index is not existed");
        }
        return true;
    }
    
    /**
     * 查詢索引下的全部數據
     * @param index
     * @param type
     */
    public void queryAll(String index) {
        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        SearchResponse response = getClient().prepareSearch(index).setQuery(queryBuilder).get();
        
        SearchHits resultHits = response.getHits();
        System.out.println(JSONObject.toJSON(resultHits));
    }
    
    /**
     * 添加或者修改ES里的數據
     * @param index
     * @param type
     */
    public void bulkIndexDocument(String index, String type) {
        BulkRequestBuilder bulkRequest = getClient().prepareBulk();
        Date time = new Date();
        try {
            bulkRequest.add(getClient().prepareIndex(index, type, "id_002")
                    .setSource(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("id","id_002")
                            .field("author","author_002")
                            .field("title","titile_002")
                            .field("content","content_002")
                            .field("price","20")
                            .field("date",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
                            .endObject()
                    )
            );

            bulkRequest.add(getClient().prepareIndex(index, type, "id_003")
                    .setSource(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("id","id_003")
                            .field("author","author_003")
                            .field("title","title_003")
                            .field("content","content_003")
                            .field("price","30")
                            .field("date",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
                            .endObject()
                            )
                    );
            
            BulkResponse bulkResponse = bulkRequest.get();
            if (bulkResponse.hasFailures()) {
                // process failures by iterating through each bulk response item
                System.out.println("bulk has failed and token " + bulkResponse.getTook());
            } else {
                System.out.println("bulk success");
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    /**
     * 根據索引添加數據
     * @param index
     * @param type
     */
    public void addIndexDocument(String index, String type) {
        Date time = new Date();
        IndexResponse response = null;
        try {
            response = getInstance().getClient().prepareIndex(index, type)
            .setSource(XContentFactory.jsonBuilder()
            // 以下內容可以封裝成一個對象,然后重新解析成如下格式(方法多加一個參數,建議使用反射方式改成通用方法)
            .startObject()
                .field("id", "id_001")
                .field("author", "author_001")
                .field("title", "title_001")
                .field("content", "content_001")
                .field("price", "10")
                .field("date", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
            .endObject())
            .get();
            System.out.println("Add id success,version is :" + response.getVersion());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    /**
     * 創建索引和mapping
     * @param indiceName
     * @throws Exception
     */
    public boolean createIndexAndMapping(String indiceName, String json) {
        if (null == indiceName || "".equals(indiceName.trim())) {
            System.out.println("indice is required");
            return false;
        }
        String content = "content";
        CreateIndexRequestBuilder cib = getClient().admin().indices().prepareCreate(indiceName);
        XContentBuilder builderMapping = generateMappingBuilder(json);
        cib.addMapping(content, builderMapping);
        CreateIndexResponse res = cib.execute().actionGet();
        if (res.isAcknowledged()) {
            System.out.println("----------Add mapping success----------" + res.isAcknowledged());
        } else {
            System.out.println("----------Add mapping failed-----------" + res.isAcknowledged());
        }
        return res.isAcknowledged();
    }
    
    /**
     * 根據json動態構造mapping索引對應的XContentBuilder
     * @param objJson
     * @param builder
     * @param isBegin 是否是開始位置
     */
    private XContentBuilder generateMappingBuilder(Object object) {
        XContentBuilder builder = null;
        try {
            builder = XContentFactory.jsonBuilder();
            JSONObject jsonObj = null;
            if (object instanceof String) {
                jsonObj = JSON.parseObject((String) object);
            }
            // json對象
            generateMappingBuilder(jsonObj, builder, true);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            System.out.println("get json builder error");
        }
        return builder;
    }
    
    /**
     * 根據json對象動態構造mapping索引對應的XContentBuilder
     * @param objJson
     * @param builder
     * @param isBegin 是否是開始位置
     */
    private void generateMappingBuilder(Object objJson, XContentBuilder builder, boolean isBegin) {
        try {
            // #builder構造,需要添加一個開始"{"
            if (isBegin) {
                builder.startObject();
            }
            // json數組
            if (objJson instanceof JSONArray) {
                JSONArray objArray = (JSONArray) objJson;
                for (int i = 0; i < objArray.size(); i++) {
                    generateMappingBuilder(objArray.get(i), builder, false);
                }
            }
            // json對象
            else if (objJson instanceof JSONObject) {
                JSONObject jsonObject = (JSONObject) objJson;
                Iterator<String> it = jsonObject.keySet().iterator();
                while (it.hasNext()) {
                    String key = it.next().toString();
                    Object object = jsonObject.get(key);

                    // builder:key;這里區分object和普通的屬性(冒號前認為為對象,冒號后為屬性)
                    if (!key.equals("type") && !key.equals("format")) {
                        builder.startObject(key);
                        // System.out.println("==" + key);
                    }

                    // json數組
                    if (object instanceof JSONArray) {
                        JSONArray objArray = (JSONArray) object;
                        generateMappingBuilder(objArray, builder, false);
                    }
                    // json對象
                    else if (object instanceof JSONObject) {
                        generateMappingBuilder((JSONObject) object, builder, false);
                    }
                    // 其他
                    else {
                        builder.field(key, object.toString());
                        // System.out.println("====" + key + "," + object.toString());
                    }
                }
                // #builder構造,需要添加一個結束"}"
                builder.endObject();
                // System.out.println("==");
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            System.out.println("generate mapping builder failed");
        }
    }
    
    /**
     * 
     * @return
     * @throws IOException 
     */
    private XContentBuilder generateMapping() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder = builder
                // #builer開始"{"
                .startObject()
                    .startObject("properties") //設置之定義字段
                        .startObject("author")
                            .field("type","keyword") //設置數據類型
                        .endObject()
                        .startObject("title")
                            .field("type","text")
                        .endObject()
                        .startObject("content")
                            .field("type","text")
                        .endObject()
                        .startObject("price")
                            .field("type","integer")
                        .endObject()
                        .startObject("date")
                            .field("type","date")  //設置Date類型
                            .field("format","yyyy-MM-dd HH:mm:ss") //設置Date的格式
                        .endObject()
                    .endObject()
                // #builer結束"}"
                .endObject();
        return builder;
    }
    
    /**
     * 獲取訪問ES的連接
     * @return
     */
    @SuppressWarnings({ "deprecation", "resource" })
    private TransportClient getNewClient() {
         TransportClient client = null;
        try {
            Settings settings = Settings.builder().put("cluster.name", CLUSTER_NAME)
                    // 開啟嗅探功能(即自動檢測集群內其他的節點和新加入的節點);或者全部用addTransportAddress添加,如下:
                    .put("client.transport.sniff", true).build();    
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
        } catch (UnknownHostException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            System.out.println("get host error");
        }
        return client;
    }
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM