使用Java Low Level REST Client操作elasticsearch


Java REST客戶端有兩種風格:

Java低級別REST客戶端(Java Low Level REST Client,以后都簡稱低級客戶端算了,難得碼字):Elasticsearch的官方low-level客戶端。 它允許通過http與Elasticsearch集群進行通信。 不會對請求進行編碼和響應解碼。 它與所有Elasticsearch版本兼容。
Java高級REST客戶端(Java High Level REST Client,以后都簡稱高級客戶端):Elasticsearch的官方high-level客戶端。 基於low-level客戶端,它公開了API特定的方法,並負責處理。

低級客戶端 的功能包括:

  • 依賴最小
  • 所有可用節點,會負載平衡
  • 在節點故障和響應特定狀態碼的情況下會進行故障轉移
  • 連接失敗會進行處罰(失敗的節點是否重試,取決於連續失敗的次數,失敗次數越多,客戶端等待的時間越長)
  • 持久連接
  • 跟蹤記錄請求和響應的日志
  • 可選的自動發現群集節點

低級客戶端 快速入門

Java API文檔在這里可以找到。低級客戶端托管在Maven Central上。所需的最低Java版本是1.7。低級客戶端與Elasticsearch的發布周期相同。發布的第一個版本為5.0.0-alpha4。客戶端版本和與之通信的Elasticsearch版本沒有任何關系,可以替換客戶端版本為你想要的任何版本。低級客戶端與所有Elasticsearch版本兼容。

Maven Repository  

Maven 配置

下面是使用maven作為依賴管理器配置依賴項。 將以下內容添加到您的pom.xml文件中:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.2.3</version>
</dependency>

Gradle 配置

 下面是使用gradle作為依賴項管理器來配置依賴項。在您的build.gradle中添加以下內容:

dependencies {
    compile 'org.elasticsearch.client:elasticsearch-rest-client:6.2.3'
}

Dependencies

低級客戶端在內部使用Apache Http Async Client發送http請求。 它依賴於以下部件,即the async http client及其自身的傳遞依賴:

  • org.apache.httpcomponents:httpasyncclient
  • org.apache.httpcomponents:httpcore-nio
  • org.apache.httpcomponents:httpclient
  • org.apache.httpcomponents:httpcore
  • commons-codec:commons-codec
  • commons-logging:commons-logging

Shading

為了避免版本沖突,依賴需要shaded(翻譯為被隱藏不知合不合理)和打包到一個單獨的jar文件中。(該操作也被稱作"uber JAR"或"fat JAR",是一種可執行的Jar包。FatJar和普通的jar不同在於它包含了依賴的jar包。)
對依賴進行隱藏需要取其內容(資源文件和java類文件),然后在放到jar文件之前會對一些包進行重命名。該操作可以使用第三方的插件,比如Gradle 和 Maven來完成。感興趣的參照這兒。

請注意,隱藏一個JAR也是有缺點的。 例如,隱藏 Commons Logging 層,意味着也需要對依賴的第三方日志進行隱藏。

Maven 配置

下面是使用Maven Shade插件的配置。將以下內容添加到您的pom中。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <relocations>
                            <relocation>
                                <pattern>org.apache.http</pattern>
                                <shadedPattern>hidden.org.apache.http</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>org.apache.logging</pattern>
                                <shadedPattern>hidden.org.apache.logging</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>org.apache.commons.codec</pattern>
                                <shadedPattern>hidden.org.apache.commons.codec</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>org.apache.commons.logging</pattern>
                                <shadedPattern>hidden.org.apache.commons.logging</shadedPattern>
                            </relocation>
                        </relocations>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Gradle 配置

下面是使用Gradle ShadowJar插件的配置。在您的 build.gradle 中添加以下內容。

shadowJar {
    relocate 'org.apache.http', 'hidden.org.apache.http'
    relocate 'org.apache.logging', 'hidden.org.apache.logging'
    relocate 'org.apache.commons.codec', 'hidden.org.apache.commons.codec'
    relocate 'org.apache.commons.logging', 'hidden.org.apache.commons.logging'
}

初始化

RestClient實例可以通過RestClientBuilder類創建,通過RestClient 的 builder(HttpHost ...)靜態方法創建。 唯一需要的參數是客戶端將與之通信的一個或多個主機,如下所示:

RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9200, "http"),
        new HttpHost("localhost", 9201, "http")).build();

RestClient類是線程安全的,理想情況下與使用它的應用程序具有相同的生命周期。當不再需要時關閉它是非常重要的,這樣它所使用的所有資源以及底層http客戶端實例及其線程都可以得到釋放。

restClient.close();

RestClientBuilder還允許在構建RestClient實例時可選地設置以下配置參數:

       //配置可選參數
        RestClientBuilder builder = RestClient.builder(
                new HttpHost("localhost", 9200, "http"));
        Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
        //設置每個請求需要發送的默認headers,這樣就不用在每個請求中指定它們。
        builder.setDefaultHeaders(defaultHeaders);
        // 設置應該授予的超時時間,以防對相同的請求進行多次嘗試。默認值是30秒,與默認socket超時時間相同。
        // 如果自定義socket超時時間,則應相應地調整最大重試超時時間。
        builder.setMaxRetryTimeoutMillis(10000);
        builder.setFailureListener(new RestClient.FailureListener() {
            @Override
            public void onFailure(HttpHost host) {
                //設置一個監聽程序,每次節點發生故障時都會收到通知,這樣就可以采取相應的措施。
                //Used internally when sniffing on failure is enabled.(這句話沒搞懂啥意思)
            }
        });
        builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                //設置允許修改默認請求配置的回調
                // (例如,請求超時,身份驗證或org.apache.http.client.config.RequestConfig.Builder允許設置的任何內容)
                return requestConfigBuilder.setSocketTimeout(10000);
            }
        });
        builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                //設置允許修改http客戶端配置的回調
                // (例如,通過SSL的加密通信,或者org.apache.http.impl.nio.client.HttpAsyncClientBuilder允許設置的任何內容)
                return httpClientBuilder.setProxy(new HttpHost("proxy", 9000, "http"));
            }
        });

執行請求

 一旦創建了RestClient,就可以調用performRequest或performRequestAsync方法來發送請求。

performRequest方法是同步的,直接返回響應,這意味着客戶端將被阻塞並等待響應返回。

performRequestAsync方法返回void,並接受一個ResponseListener作為參數,這意味着它們是異步執行的。當請求完成或失敗時,監聽器將被通知。

發送同步請求

    //方式1:只提供謂詞和終節點,這兩個參數是必需要的參數
        Response response = restClient.performRequest("GET", "/");

        //方式2:提供謂詞和終節點以及一些查詢字符串參數來發送請求
        Map<String, String> params = Collections.singletonMap("pretty", "true");
        response = restClient.performRequest("GET", "/", params);

        //方式3:提供謂詞和終節點以及可選查詢字符串參數和org.apache.http.HttpEntity對象中包含的請求主體來發送請求
        params = Collections.emptyMap();
        String jsonString = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        //為HttpEntity指定ContentType非常重要,因為它將用於設置Content-Type請求頭,以便Elasticsearch可以正確解析內容。
        HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
        response = restClient.performRequest("PUT", "/posts/doc/1", params, entity);

        //方式4:提供謂詞,終節點,可選查詢字符串參數,可選請求主體
        // 以及用於為每個請求嘗試創建org.apache.http.nio.protocol.HttpAsyncResponseConsumer回調實例的可選工廠來發送請求。
        // 控制響應正文如何從客戶端的非阻塞HTTP連接進行流式傳輸。
        // 如果未提供,則使用默認實現,將整個響應主體緩存在堆內存中,最大為100 MB。
        params = Collections.emptyMap();
        HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory consumerFactory =
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024);
        response = restClient.performRequest("GET", "/posts/_search", params, null, consumerFactory);

 發送異步請求

    //方式1: 提供謂詞,終節點和響應監聽器來發送異步請求,一旦請求完成,就會通知響應監聽器,這三個參數是必需要的參數
        ResponseListener responseListener = new ResponseListener() {
            @Override
            public void onSuccess(Response response) {
                // 定義請求成功執行時需要做的事情
            }
            @Override
            public void onFailure(Exception exception) {
                // 定義請求失敗時需要做的事情,即每當發生連接錯誤或返回錯誤狀態碼時做的操作。
            }
        };
        restClient.performRequestAsync("GET", "/", responseListener);

        //方式2: 提供謂詞,終節點,一些查詢字符串參數和響應監聽器來發送異步請求
        Map<String, String>  params = Collections.singletonMap("pretty", "true");
        restClient.performRequestAsync("GET", "/", params, responseListener);

        //方式3:提供謂詞,終節點,可選查詢字符串參數,
        // org.apache.http.HttpEntity對象中包含的請求主體以及在請求完成后通知響應偵聽器 來發送異步請求
        String jsonString = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        NStringEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
        restClient.performRequestAsync("PUT", "/posts/doc/1", params, entity, responseListener);

        //方式4:提供謂詞,終節點,可選查詢字符串參數,可選請求主體
        // 以及用於為每個請求嘗試創建org.apache.http.nio.protocol.HttpAsyncResponseConsumer回調實例的可選工廠 來發送異步請求。
        // 控制響應正文如何從客戶端的非阻塞HTTP連接進行流式傳輸。
        // 如果未提供,則使用默認實現,將整個響應主體緩存在堆內存中,最大為100 MB。
        HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory  consumerFactory =
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024);
        restClient.performRequestAsync("GET", "/posts/_search", params, null, consumerFactory, responseListener);

接下來是一個發送異步請求的基本示例:

final CountDownLatch latch = new CountDownLatch(documents.length);
        for (int i = 0; i < documents.length; i++) {
            restClient.performRequestAsync(
                    "PUT",
                    "/posts/doc/" + i,
                    Collections.<String, String>emptyMap(),
                    //此處假設文檔已存在 HttpEntity數組里
                    documents[i],
                    new ResponseListener() {
                        @Override
                        public void onSuccess(Response response) {
                            //處理返回的響應內容
                            latch.countDown();
                        }

                        @Override
                        public void onFailure(Exception exception) {
                            // 由於通信錯誤或帶有指示錯誤的狀態碼的響應,用於處理返回的異常
                            latch.countDown();
                        }
                    }
            );
        }
        latch.await();

上面列出的每個方法都支持通過Header 可變參數發送請求頭,源碼如下:

一個header時, 如下例所示:

Response response = restClient.performRequest("GET", "/", new BasicHeader("header", "value"));

 多個header時,如下所示:

Header[] headers = {
        new BasicHeader("header1", "value1"),
        new BasicHeader("header2", "value2")
};
restClient.performRequestAsync("GET", "/", responseListener, headers);

獲取響應

 Response對象(由同步performRequest方法返回或由ResponseListener的onSuccess(Response)中的參數接收),包裝從http客戶端返回的響應對象並公開一些其他的信息。

 RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")).build();
        Response response = restClient.performRequest("GET", "/");
        RequestLine requestLine = response.getRequestLine();//關於已執行請求的信息
        HttpHost host = response.getHost();//返回響應的主機
        int statusCode = response.getStatusLine().getStatusCode();//響應狀態行,可以從中獲取狀態碼
        Header[] headers = response.getHeaders();// 獲取響應頭
        String header=response.getHeader("content-type");// 獲取指定名稱的響應頭
        String responseBody = EntityUtils.toString(response.getEntity());//響應體包含在org.apache.http.HttpEntity對象中

執行請求時,會在以下情況中引發異常(異步時在ResponseListener#onFailure(Exception)中作為參數接收到該異常):

  1. IOException,通信問題(例如SocketTimeoutException)
  2. ResponseException,返回了一個響應,但是它的狀態碼表明是錯誤的(不是2xx)。 ResponseException是一個有效的http響應,因此它暴露了其相應的Response對象,可以訪問返回的響應。

對於返回404狀態代碼的HEAD請求,不會引發ResponseException,因為它是預期的HEAD響應,它只是表示找不到資源。 

Response response = restClient.performRequest("HEAD", "/s");//不會拋異常

除非ignore參數包含404,否則所有其他HTTP方法(例如GET)都會為404響應拋出ResponseException。

ignore是一個特殊的客戶端參數,它不會發送到Elasticsearch,且包含以逗號分隔的錯誤狀態碼列表。 它允許控制是否應將某些錯誤狀態碼視為預期響應而不是異常。

這對於get api來說很有用,因為它可以在缺少文檔時返回404,在這種情況下,響應主體將不會包含錯誤,而是通常的get api響應,只是沒有文檔,因為它沒有找到。

注意,低級客戶端不會序列化或反序列化json。用戶可以自由使用他們喜歡的庫。

底層的Apache Async Http Client附帶不同的org.apache.http.HttpEntity實現,允許以不同格式(流,字節數組,字符串等)提供請求體。

至於讀取響應體,HttpEntity的getContent方法很方便,它會返回來自先前緩沖的響應體的InputStream。作為一種替代方法,可以提供一個自定義org.apache.http.nio.protocol.HttpAsyncResponseConsumer來控制如何讀取和緩沖字節。

 日志

The Java REST client使用了和Apache Async Http Client相同的日志庫:Apache Commons Logging,
它支持許多流行的日志實現。 用於啟用日志記錄功能的java包分別是客戶端本身的org.elasticsearch.client,以及嗅探器的org.elasticsearch.client.sniffer。

也可以啟用以curl格式來記錄每個請求和相應的響應。 這使得調試時非常方便。

例如在需要手動執行請求以檢查它是否仍然產生相同的響應時。 為跟蹤器包啟用日志記錄以打印出此類日志。 請注意,該類型的日志記錄非常昂貴,不應始終在生產環境中啟用,而只是在需要時暫時使用。

官方文檔:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-overview.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM