Working with Elasticsearch from Java


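  Note that Elasticsearch (ES) exposes its HTTP service on port 9200 and its TCP transport on port 9300. A Java TransportClient therefore connects to port 9300, while the REST clients connect to the HTTP port 9200.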

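0. Introduction

  A Java project can talk to ES through the ES clients TransportClient and RestClient; a Spring Boot project can also use Spring Data Elasticsearch (recent versions of which wrap the REST client internally).

1. TransportClient
  TransportClient is the Java client object for Elasticsearch. It connects to an Elasticsearch cluster remotely over the transport protocol, using TCP port 9300 by default. The transport client does not join the cluster; it simply sends requests to nodes in the cluster.

2. REST Client
  The Java REST client comes in two flavors:
  Java Low Level REST Client: the low-level Elasticsearch client. It communicates with an Elasticsearch cluster over HTTP. The API does not encode or decode data; that is left to the user. It is compatible with all Elasticsearch versions.
  Java High Level REST Client: the official high-level Elasticsearch client. It is built on the low-level client, and its API takes care of encoding requests and decoding responses.
  Elastic planned to deprecate TransportClient in Elasticsearch 7.0 and to remove it completely in 8.0, so the Java High Level REST Client is the recommended choice in practice. The REST client performs operations by sending HTTP requests, so serialized Java requests are no longer needed.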
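
Because the REST clients speak plain HTTP, every operation ends up as a request to a URL on the HTTP port (9200 by default). The following is a minimal sketch using only the JDK; the class `EsRestUri` and its method are hypothetical helpers for illustration, not part of any Elasticsearch library. It builds the typeless document endpoint `/{index}/_doc/{id}`.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper (not an Elasticsearch class): builds REST URLs for the HTTP port.
final class EsRestUri {
    private EsRestUri() {
    }

    // Typeless document endpoint: http://{host}:{port}/{index}/_doc/{id}
    static URI document(String host, int port, String index, String id) {
        try {
            // The multi-argument URI constructor percent-encodes characters that are
            // illegal in a path, such as spaces (a '/' inside the id is left as-is).
            return new URI("http", null, host, port, "/" + index + "/_doc/" + id, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot build document URI", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(document("localhost", 9200, "accounts", "1"));
    }
}
```

A GET request to such a URL returns the same JSON that the curl and Kibana examples in this post show.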

 

Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

 1. pom configuration

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.qlq</groupId>
    <artifactId>esclient</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>7.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>
</project>

Create log4j2.properties under the resources directory:

appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %marker%m%n

rootLogger.level = info
rootLogger.appenderRef.console.ref = console

 

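2. API test (based on TransportClient)

The API is used as follows:

1. Create the client

Without custom settings (each addTransportAddress call registers one node to contact):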

// on startup

TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300));

If the cluster has a custom name, pass it in the settings:

Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName").build();
TransportClient client = new PreBuiltTransportClient(settings);
//Add transport addresses and do something with the client...

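When operating through TransportClient you can specify the index, the type, and the document ID, for example:

Specifying the index when searching (prepareSearch() takes varargs, so several indices can be searched at once; if none is given, all indices are searched):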

        SearchRequestBuilder srb1 = client.prepareSearch("orders").setQuery(QueryBuilders.queryStringQuery("qiao"))
                .setSize(1);

 

Specifying the type when indexing (if no ID is given, ES generates one):

IndexResponse response = client.prepareIndex("orders", "order").setSource(builder).get();

2. Perform operations

  Run CRUD operations.

3. Close the client

// on shutdown

client.close();

 

0. Preparation

1. Change the cluster name in config/elasticsearch.yml under the ES installation directory

cluster.name: my-application

2. Start two nodes

elasticsearch.bat -Ehttp.port=9200 -Epath.data=E:/data/0
elasticsearch.bat -Ehttp.port=19200 -Epath.data=E:/data/1

3. Check the nodes

Open: http://localhost:9200/_cluster/stats?pretty

1. Document API

1. Index a document

 There are four ways to build the document data:

Manually (aka do it yourself) using native byte[] or as a String

Using a Map that will be automatically converted to its JSON equivalent

Using a third party library to serialize your beans such as Jackson

Using built-in helpers XContentFactory.jsonBuilder()

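Approach 1: build the document with the ES helper

    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Date;

    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    private static void createDocument() throws UnknownHostException, IOException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        // Build the document
        // Approach 1: use the ES helper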
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("username", "qiaozhi")
                .field("fullname", "喬治").field("created", new Date()).field("deleted", false).endObject();
        IndexResponse response = client.prepareIndex("accounts", "person", "1").setSource(builder).get();

        // Approach 2: build the JSON yourself
        // IndexResponse response = client.prepareIndex("twitter", "_doc")
        // .setSource(json, XContentType.JSON)
        // .get();

        // Print the details of the index response
        // Index name
        String _index = response.getIndex();
        System.out.println("_index " + _index);
        // Type name
        String _type = response.getType();
        System.out.println("_type " + _type);
        // Document ID (generated or not)
        String _id = response.getId();
        System.out.println("_id " + _id);
        // Version (if it's the first time you index this document, you will
        // get: 1)
        long _version = response.getVersion();
        System.out.println("_version " + _version);
        // REST status of the operation (CREATED for a new document, OK for an overwrite)
        RestStatus status = response.status();
        System.out.println("status " + status);

        // on shutdown
        client.close();
    }

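Result (the version is 2 and the status is OK rather than CREATED, which indicates that a document with ID 1 already existed and was overwritten by this run):

_index accounts
_type person
_id 1
_version 2
status OK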

 

Approach 2: build the JSON by hand; without an explicit ID, ES generates one

    private static void createDocument() throws UnknownHostException, IOException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

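        // Build the document by hand (note: "deleted" is sent as the string "false", not as a boolean)
        String json = "{" + "\"username\":\"zhangsan\"," + "\"fullname\":\"張三\"," + "\"deleted\":\"false\","
                + "\"created\":\"2020-02-05\"" + "}";
        // Index the document without specifying an ID
        IndexResponse response = client.prepareIndex("accounts", "person").setSource(json, XContentType.JSON).get();

        // Print the details of the index response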
        // Index name
        String _index = response.getIndex();
        System.out.println("_index " + _index);
        // Type name
        String _type = response.getType();
        System.out.println("_type " + _type);
        // Document ID (generated or not)
        String _id = response.getId();
        System.out.println("_id " + _id);
        // Version (if it's the first time you index this document, you will
        // get: 1)
        long _version = response.getVersion();
        System.out.println("_version " + _version);
        // REST status of the operation (CREATED for a new document, OK for an overwrite)
        RestStatus status = response.status();
        System.out.println("status " + status);

        // on shutdown
        client.close();
    }

Result:

_index accounts
_type person
_id nvyXyXMB58D4pLOfTCzx
_version 1
status CREATED

 

Query it from Kibana:

GET accounts/person/nvyXyXMB58D4pLOfTCzx

#! Deprecation: [types removal] Specifying types in document get requests is deprecated, use the /{index}/_doc/{id} endpoint instead.
{
  "_index" : "accounts",
  "_type" : "person",
  "_id" : "nvyXyXMB58D4pLOfTCzx",
  "_version" : 1,
  "_seq_no" : 7,
  "_primary_term" : 3,
  "found" : true,
  "_source" : {
    "username" : "zhangsan",
    "fullname" : "張三",
    "deleted" : "false",
    "created" : "2020-02-05"
  }
}
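
Hand-concatenated JSON such as the string in Approach 2 breaks as soon as a value contains a quote or a backslash, and it is easy to store a boolean as a string by accident (note `"deleted" : "false"` in the output above). As a rough sketch of the "do it yourself" approach, assuming a hypothetical helper class `FlatJson` that is not part of Elasticsearch and only handles flat maps, the string could be produced like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not an Elasticsearch class): serializes a flat map to a JSON object string.
final class FlatJson {
    private FlatJson() {
    }

    static String toJson(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(e.getKey())).append("\":");
            Object v = e.getValue();
            if (v == null || v instanceof Number || v instanceof Boolean) {
                // null, numbers and booleans are written without quotes
                // (assumption: no NaN or infinite values are passed in)
                sb.append(v);
            } else {
                sb.append('"').append(escape(v.toString())).append('"');
            }
        }
        return sb.append('}').toString();
    }

    // Escapes quotes, backslashes and control characters as JSON requires.
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("username", "zhangsan");
        doc.put("deleted", false);
        doc.put("created", "2020-02-05");
        System.out.println(toJson(doc));
    }
}
```

The resulting string can be passed to setSource(json, XContentType.JSON) exactly as in the example above. For nested objects or arrays, a library such as Jackson (the third option in the list of approaches) is the safer choice.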

 

Note: indexing a document whose ID already exists replaces the stored document and increments its version, for example:

        // Build the document
        String json = "{" + "\"username\":\"zhangsan2\"," + "\"fullname\":\"張三2\"," + "\"deleted\":\"false\","
                + "\"created\":\"2020-02-05\"" + "}";
        // Index the document under an existing ID
        IndexResponse response = client.prepareIndex("accounts", "person", "nvyXyXMB58D4pLOfTCzx")
                .setSource(json, XContentType.JSON).get();

Result:

_index accounts
_type person
_id nvyXyXMB58D4pLOfTCzx
_version 2
status OK

 

The document as seen in Kibana:

#! Deprecation: [types removal] Specifying types in document get requests is deprecated, use the /{index}/_doc/{id} endpoint instead.
{
  "_index" : "accounts",
  "_type" : "person",
  "_id" : "nvyXyXMB58D4pLOfTCzx",
  "_version" : 2,
  "_seq_no" : 8,
  "_primary_term" : 3,
  "found" : true,
  "_source" : {
    "username" : "zhangsan2",
    "fullname" : "張三2",
    "deleted" : "false",
    "created" : "2020-02-05"
  }
}

Note: checking that the document can be read from both nodes of the cluster

$ curl http://localhost:9200/accounts/person/nvyXyXMB58D4pLOfTCzx
{"_index":"accounts","_type":"person","_id":"nvyXyXMB58D4pLOfTCzx","_version":2,"_seq_no":8,"_primary_term":3,"found":true,"_source":{"username":"zhangsan2","fullname":"張三2","deleted":"false","created":"2020-02-05"}}

$ curl http://localhost:19200/accounts/person/nvyXyXMB58D4pLOfTCzx
{"_index":"accounts","_type":"person","_id":"nvyXyXMB58D4pLOfTCzx","_version":2,"_seq_no":8,"_primary_term":3,"found":true,"_source":{"username":"zhangsan2","fullname":"張三2","deleted":"false","created":"2020-02-05"}}

 

2. Get a document

    private static void getDocument() throws UnknownHostException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        // Get by ID
        GetResponse response = client.prepareGet("accounts", "person", "nvyXyXMB58D4pLOfTCzx").get();

        // Print the details of the get response
        // Index name
        String _index = response.getIndex();
        System.out.println("_index " + _index);
        // Type name
        String _type = response.getType();
        System.out.println("_type " + _type);
        // Document ID (generated or not)
        String _id = response.getId();
        System.out.println("_id " + _id);
        // Version (if it's the first time you index this document, you will
        // get: 1)
        long _version = response.getVersion();
        System.out.println("_version " + _version);
        // Read the stored source
        String sourceAsString = response.getSourceAsString();
        System.out.println("sourceAsString " + sourceAsString);
        Map<String, Object> sourceAsMap = response.getSourceAsMap();
        System.out.println("sourceAsMap " + sourceAsMap);

        // on shutdown
        client.close();
    }

Result:

_index accounts
_type person
_id nvyXyXMB58D4pLOfTCzx
_version 2
sourceAsString {"username":"zhangsan2","fullname":"張三2","deleted":"false","created":"2020-02-05"}
sourceAsMap {deleted=false, created=2020-02-05, fullname=張三2, username=zhangsan2}

 

Multi-get:

  First create one more document, this time of type order. (Note that an index may not contain more than one type, so it goes into a separate index, orders.)

$ curl http://localhost:9200/orders/order/HSJgznMBk9PkhN4HiuEb
{"_index":"orders","_type":"order","_id":"HSJgznMBk9PkhN4HiuEb","_version":1,"_seq_no":0,"_primary_term":1,"found":true,"_source":{"desc":"測試訂單","createTime":"2020-08-08T14:01:37.160Z","deleted":false,"username":"zhangsan2"}}

The multi-get call looks like this:

    private static void batchSelect() throws Exception {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        // Fetch documents from several indices in one request (add() accepts a variable number of IDs)
        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                .add("accounts", "person", "nvyXyXMB58D4pLOfTCzx")
                .add("orders", "order", "HSJgznMBk9PkhN4HiuEb", "otherId").get();
        for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
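            GetResponse response = itemResponse.getResponse();
            // getResponse() is null when this item failed; isExists() is false for an unknown ID
            if (response != null && response.isExists()) {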
                String json = response.getSourceAsString();
                System.out.println(json);
            }
        }

        // on shutdown
        client.close();
    }

Result (the ID otherId does not exist, so only two documents are printed):

{"username":"zhangsan2","fullname":"修改后","deleted":"false","created":"2020-02-05"}
{"desc":"測試訂單","createTime":"2020-08-08T14:01:37.160Z","deleted":false,"username":"zhangsan2"}

 

3. Delete a document

    private static void deleteDoc() throws UnknownHostException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        // Delete by ID
        DeleteResponse response = client.prepareDelete("accounts", "person", "nvyXyXMB58D4pLOfTCzx").get();

        // Print the details of the delete response
        // Index name
        String _index = response.getIndex();
        System.out.println("_index " + _index);
        // Type name
        String _type = response.getType();
        System.out.println("_type " + _type);
        // Document ID (generated or not)
        String _id = response.getId();
        System.out.println("_id " + _id);
        // Version of the delete operation (a delete also increments the
        // document version)
        long _version = response.getVersion();
        System.out.println("_version " + _version);

        // on shutdown
        client.close();
    }

Result:

_index accounts
_type person
_id nvyXyXMB58D4pLOfTCzx
_version 3

 

Querying again after the delete:

$ curl http://localhost:9200/accounts/person/nvyXyXMB58D4pLOfTCzx
{"_index":"accounts","_type":"person","_id":"nvyXyXMB58D4pLOfTCzx","found":false}

 

Documents can also be deleted by query:

        BulkByScrollResponse response = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
                .filter(QueryBuilders.matchQuery("fullname", "修改后")).source("accounts").get();
        long deleted = response.getDeleted();

 

4. Update a document

Suppose this document already exists:

$ curl http://localhost:9200/accounts/person/nvyXyXMB58D4pLOfTCzx
{"_index":"accounts","_type":"person","_id":"nvyXyXMB58D4pLOfTCzx","_version":1,"_seq_no":10,"_primary_term":4,"found":true,"_source":{"username":"zhangsan2","fullname":"張三2","deleted":"false","created":"2020-02-05"}}

(1) Re-indexing a document under the same ID replaces it

(2) Using UpdateRequest

        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("accounts");
        updateRequest.type("person");
        updateRequest.id("nvyXyXMB58D4pLOfTCzx");
        updateRequest.doc(XContentFactory.jsonBuilder().startObject().field("fullname", "修改后").endObject());
        UpdateResponse updateResponse = client.update(updateRequest).get();
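
The difference between the two options is that re-indexing replaces the whole stored document, whereas UpdateRequest.doc(...) sends only the changed fields and ES merges them into the existing source. The following plain-Java sketch simulates that merge for top-level fields only; the class `PartialUpdateDemo` is a hypothetical illustration, no Elasticsearch classes are involved, and the field values are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java simulation of a partial update; no Elasticsearch classes are involved.
final class PartialUpdateDemo {
    private PartialUpdateDemo() {
    }

    // Returns a copy of source in which every field of partial is added or overwritten.
    // Assumption: only top-level fields are handled (a nested object is replaced, not merged).
    static Map<String, Object> applyPartial(Map<String, Object> source, Map<String, Object> partial) {
        Map<String, Object> merged = new LinkedHashMap<>(source);
        merged.putAll(partial);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("username", "zhangsan2");
        source.put("fullname", "old name");

        Map<String, Object> partial = new LinkedHashMap<>();
        partial.put("fullname", "new name");

        // username is kept, fullname is overwritten
        System.out.println(applyPartial(source, partial));
    }
}
```

With the UpdateRequest above, the stored document likewise keeps username, deleted and created and only changes fullname.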

 

 2. Search API

 First prepare ten test documents:

    private static void createDocument() throws UnknownHostException, IOException {
        // on startup
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));

        for (int i = 0; i < 10; i++) {
            // Build the document
            // with the ES helper (XContentFactory.jsonBuilder)
            XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("desc", "測試訂單" + i)
                    .field("createTime", new Date()).field("deleted", false).field("username", "zhangsan").endObject();
            IndexResponse response = client.prepareIndex("orders", "order").setSource(builder).get();

            // Print the details of the index response
            // Index name
            String _index = response.getIndex();
            System.out.println("_index " + _index);
            // Type name
            String _type = response.getType();
            System.out.println("_type " + _type);
            // Document ID (generated or not)
            String _id = response.getId();
            System.out.println("_id " + _id);
            // Version (if it's the first time you index this document, you will
            // get: 1)
            long _version = response.getVersion();
            System.out.println("_version " + _version);
            // REST status of the operation (CREATED for a new document, OK for an overwrite)
            RestStatus status = response.status();
            System.out.println("status " + status);
        }

        // on shutdown
        client.close();
    }

1. Scroll query, similar to paging

        QueryBuilder qb = QueryBuilders.termQuery("username", "zhangsan");

        SearchResponse scrollResp = client.prepareSearch("orders")
                .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(new TimeValue(60000)).setQuery(qb)
                .setSize(3).get();
        // Scroll until no hits are returned

        int startPage = 1;
        do {
            // Print a page header (the label means "start of page")
            System.out.println("開始分頁===" + (startPage++));
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                // Handle the hit...
                String sourceAsString = hit.getSourceAsString();
                System.out.println(sourceAsString);
            }
            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute()
                    .actionGet();
        } while (scrollResp.getHits().getHits().length != 0); 

Result:

開始分頁===1
{"desc":"測試訂單0","createTime":"2020-08-09T02:31:18.435Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單1","createTime":"2020-08-09T02:31:19.455Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單2","createTime":"2020-08-09T02:31:19.655Z","deleted":false,"username":"zhangsan"}
開始分頁===2
{"desc":"測試訂單3","createTime":"2020-08-09T02:31:19.819Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單4","createTime":"2020-08-09T02:31:20.312Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單5","createTime":"2020-08-09T02:31:20.402Z","deleted":false,"username":"zhangsan"}
開始分頁===3
{"desc":"測試訂單6","createTime":"2020-08-09T02:31:20.536Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單7","createTime":"2020-08-09T02:31:20.667Z","deleted":false,"username":"zhangsan"}
{"desc":"測試訂單8","createTime":"2020-08-09T02:31:20.866Z","deleted":false,"username":"zhangsan"}
開始分頁===4
{"desc":"測試訂單9","createTime":"2020-08-09T02:31:21.137Z","deleted":false,"username":"zhangsan"}
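
The scroll loop keeps requesting the next batch until an empty batch comes back, which is why ten documents with setSize(3) produce four pages (3 + 3 + 3 + 1). The plain-Java sketch below simulates only that batching and termination logic on an in-memory list; the class `ScrollSimulation` is hypothetical and does not call Elasticsearch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of scroll batching; no Elasticsearch classes are involved.
final class ScrollSimulation {
    private ScrollSimulation() {
    }

    // Splits hits into consecutive batches of at most size items and stops at the
    // first empty batch, mirroring the termination condition of the scroll loop.
    static <T> List<List<T>> batches(List<T> hits, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> pages = new ArrayList<>();
        int from = 0;
        List<T> batch = nextBatch(hits, from, size);
        while (!batch.isEmpty()) {
            pages.add(batch);
            from += batch.size();
            batch = nextBatch(hits, from, size);
        }
        return pages;
    }

    // Stands in for one scroll round trip: returns the next slice, or an empty list at the end.
    private static <T> List<T> nextBatch(List<T> hits, int from, int size) {
        int to = Math.min(hits.size(), from + size);
        return new ArrayList<>(hits.subList(from, to));
    }

    public static void main(String[] args) {
        List<Integer> hits = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        List<List<Integer>> pages = batches(hits, 3);
        for (int i = 0; i < pages.size(); i++) {
            System.out.println("page " + (i + 1) + ": " + pages.get(i));
        }
    }
}
```

In a real scroll, each round trip also renews the scroll timeout (60 seconds in the example above), and the scroll context holds resources on the cluster until it expires or is cleared, so it is worth clearing it once the loop has finished.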

 

2. MultiSearch: run several searches in one request (if no index is specified, all indices are searched)

        SearchRequestBuilder srb1 = client.prepareSearch().setQuery(QueryBuilders.queryStringQuery("qiao")).setSize(1);
        SearchRequestBuilder srb2 = client.prepareSearch().setQuery(QueryBuilders.matchQuery("username", "zhangsan2"))
                .setSize(1);
        MultiSearchResponse sr = client.prepareMultiSearch().add(srb1).add(srb2).get();

        // You will get all individual responses from
        // MultiSearchResponse#getResponses()
        for (MultiSearchResponse.Item item : sr.getResponses()) {
            SearchResponse response = item.getResponse();
            // getResponse() is null when this individual search failed
            if (response == null) {
                continue;
            }
            SearchHits hits = response.getHits();
            for (SearchHit hit : hits) {
                System.out.println(hit.getSourceAsString());
            }
        }

Result:

{
  "name": "zhi",
  "lastName": "qiao",
  "job": "enginee"
}
{"desc":"測試訂單","createTime":"2020-08-08T14:01:37.160Z","deleted":false,"username":"zhangsan2"}

 

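3. Query DSL and aggregations

  Covered in the next post.

 

Note: an example using the Java High Level REST Client follows. It needs the org.elasticsearch.client:elasticsearch-rest-high-level-client dependency, which the pom above does not include.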

    private static void restClientQuery() throws IOException {
        // on startup
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 19200, "http")));

        MultiSearchRequest request = new MultiSearchRequest();
        SearchRequest firstSearchRequest = new SearchRequest();
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchQuery("name", "qiao"));
        firstSearchRequest.source(searchSourceBuilder);
        request.add(firstSearchRequest);
        SearchRequest secondSearchRequest = new SearchRequest();
        searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchQuery("username", "zhangsan2"));
        secondSearchRequest.source(searchSourceBuilder);
        request.add(secondSearchRequest);

        // Execute the multi-search and print the results
        MultiSearchResponse sr = client.msearch(request, RequestOptions.DEFAULT);
        // You will get all individual responses from
        // MultiSearchResponse#getResponses()
        for (MultiSearchResponse.Item item : sr.getResponses()) {
            SearchResponse response = item.getResponse();
            // getResponse() is null when this individual search failed
            if (response == null) {
                continue;
            }
            SearchHits hits = response.getHits();
            for (SearchHit hit : hits) {
                System.out.println(hit.getSourceAsString());
            }
        }

        // on shutdown
        client.close();
    }

Result:

{"username":"zhangsan2","fullname":"張三2","sex":1,"userid":3,"birth":"2020-08-09T09:29:44.832Z"}
{"username":"zhangsan2","fullname":"張三2","createTime":"2020-08-09T04:11:42.547Z","deleted":false,"sex":1}
{"amount":2,"createTime":"2020-08-09T11:09:07.789Z","description":"訂單描述2","orderid":3,"ordernum":"order2","username":"zhangsan2"}
{"amount":7,"createTime":"2020-08-09T11:09:13.823Z","description":"訂單描述7","orderid":8,"ordernum":"order7","username":"zhangsan2"}

 

Chinese API documentation: https://www.wenjiangs.com/doc/auwvbcpq1

 

