ES 常用java api


java rest client 有兩種:

  1、Java Low Level REST Client :用於Elasticsearch的官方低層客戶端。它允許通過http與Elasticsearch集群通信。葉子請求編組,響應反編組給用戶。它兼容所有的Elasticsearch版本。

  2、Java High Level REST Client :Elasticsearch的官方高級客戶端。它基於底層客戶端,公開API特定的方法,處理請求編組和響應反編組。

 

一、Java Low Level REST Client

  1、先引入jar包

  <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>6.6.0</version>
    </dependency>

  2、編寫代碼

public class LowLevelRestClientTest {
    
    //RequestOptions類包含請求的一些部分,這些部分應該在同一個應用程序中的多個請求之間共享。你可以創建一個單實例,並在所有請求之間共享:
    private static final RequestOptions COMMON_OPTIONS;
    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        builder.addHeader("Authorization", "kyle " + "TOKEN"); 
        builder.setHttpAsyncResponseConsumerFactory(           
            new HttpAsyncResponseConsumerFactory
                .HeapBufferedResponseConsumerFactory(200*1024*1024));
        COMMON_OPTIONS = builder.build();
    }
    
    public static RestClient buildRestClient(){
        //創建RestClientBuilder
                RestClientBuilder builder = RestClient.builder(
                        new HttpHost("localhost",9201,"http"),
                        new HttpHost("localhost",9202,"http"),
                        new HttpHost("localhost",9203,"http")
                        );
                //在創建restClient的同時,設置每個請求需要發送的默認頭文件,以避免在每個請求中指定它們
                Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
                builder.setDefaultHeaders(defaultHeaders);
                //設置應該遵守的超時,以防對同一個請求進行多次嘗試,默認為3000ms.
                builder.setMaxRetryTimeoutMillis(1000);
                //設置一個監聽器,用來在節點發生故障的時候,采取相應的操作。
                builder.setFailureListener(new RestClient.FailureListener(){
                    @Override
                    public void onFailure(Node node) {
                        super.onFailure(node);
                        //doSomeThing();
                    }
                });
                //將節點選擇器設置為用於過濾客戶機將發送請求到的節點之間的節點,這些節點被設置為客戶機本身。
                //這對於防止在啟用嗅探時將請求發送到專用的主節點非常有用。默認情況下,客戶機向每個配置的節點發送請求
                builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
                //設置一個回調函數,允許修改默認的請求配置
                /*builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                    
                    @Override
                    public Builder customizeRequestConfig(Builder arg0) {
                        return null;
                    }
                });*/
                
                //設置一個回調,允許修改http客戶機配置
                /*builder.setHttpClientConfigCallback(new HttpClientConfigCallback(){

                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder arg0) {
                        return null;
                    }
                    
                });*/
                
                //創建restClient,中間的那些配置也可以不設置
                RestClient restClient = builder.build();
                
                return restClient;
    }
    
    public static void request(RestClient restClient) throws ParseException, IOException{
        //創建一個請求組
                Request request = new Request("GET","/_search");
                //為請求添加一些參數
                request.addParameter("pretty", "true");
                
                //請求的主體設置為任何HttpEntity,
                //為HttpEntity指定的ContentType非常重要,因為它將用於設置content - type頭部,以便Elasticsearch能夠正確解析內容。
                //request.setEntity(new NStringEntity("{\"json\":\"text\"}",ContentType.APPLICATION_JSON));
                
                //還可以將其設置為一個字符串,該字符串將默認為application/json的ContentType。
                request.setJsonEntity("{\"query\": {\"match\": {\"address\": \"Street\"}}}");
                
                //非必須
                request.setOptions(COMMON_OPTIONS);
                
                //發送一個同步的請求,線程會阻塞
                Response response = restClient.performRequest(request);
                
                //發送異步請求,然后使用監聽器來對返回結果進行處理
                /*restClient.performRequestAsync(request, new ResponseListener() {
                    
                    @Override
                    public void onSuccess(Response resp) {
                        System.out.println("成功");
                    }
                    
                    @Override
                    public void onFailure(Exception arg0) {
                        System.out.println("失敗");
                    }
                });*/
                
                //有關已執行請求的信息
                RequestLine requestLine = response.getRequestLine();
                //返回響應的主機
                HttpHost host = response.getHost();
                //響應狀態行,您可以從中檢索狀態代碼
                int statusCode = response.getStatusLine().getStatusCode();
                //響應頭
                Header[] headers = response.getHeaders();
                
                String responseBody = EntityUtils.toString(response.getEntity());
                
                System.out.println(requestLine.getUri());
                System.out.println(host);
                System.out.println(statusCode);
                System.out.println(headers);
                System.out.println(responseBody);
                
    }
    

    public static void main(String[] args) throws IOException {
        RestClient restClient = buildRestClient();
        request(restClient);
        //關閉restClient
        restClient.close();
    }

  

  3、RequestConfigCallback 和 HttpClientConfigCallback 的一些常用的配置

  RequestConfigCallback和HttpClientConfigCallback允許Apache Async Http客戶機公開的任何定制。這些回調使修改客戶機的某些特定行為成為可能,而無需覆蓋使用RestClient初始化的所有其他默認配置。本節描述一些常見的場景,這些場景需要對底層Java REST客戶機進行額外的配置。

  3.1、身份驗證

final CredentialsProvider credentialsProvider =
    new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
    new UsernamePasswordCredentials("user", "password"));

RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            //httpClientBuilder.disableAuthCaching(); ①
            return httpClientBuilder
                .setDefaultCredentialsProvider(credentialsProvider);
        }
    });

  ①可以禁用搶占式身份驗證,這意味着每個請求都將在沒有授權頭的情況下發送,以查看它是否被接受,並且在接收到HTTP 401響應后,它將使用基本身份驗證頭重新發送完全相同的請求。

  3.2、默認情況下,Apache Http Async客戶機啟動一個dispatcher線程和連接管理器使用的多個工作線程,以及本地檢測到的處理器的數量(取決於Runtime.getRuntime(). availableprocessors()返回的是什么)。線程數可以修改如下:

RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder.setDefaultIOReactorConfig(
                IOReactorConfig.custom()
                    .setIoThreadCount(1)
                    .build());
        }
    });

  3.3、設置鏈接超時或者socket超時

RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200))
    .setRequestConfigCallback(
        new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(
                    RequestConfig.Builder requestConfigBuilder) {
                return requestConfigBuilder
                    .setConnectTimeout(5000)
                    .setSocketTimeout(60000);
            }
        })
    .setMaxRetryTimeoutMillis(60000);

  3.4、加密傳輸

KeyStore truststore = KeyStore.getInstance("jks");
try (InputStream is = Files.newInputStream(keyStorePath)) {
    truststore.load(is, keyStorePass.toCharArray());
}
SSLContextBuilder sslBuilder = SSLContexts.custom()
    .loadTrustMaterial(truststore, null);
final SSLContext sslContext = sslBuilder.build();
RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200, "https"))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder.setSSLContext(sslContext);
        }
    });

 4、嗅探器(Sniffer):允許從運行的elasticsearch集群中自動發現節點,並將其設置到現有的RestClient實例中。 默認情況下,它將使用Nodes Info api來檢索屬於集群的節點,並使用jackson解析響應的json數據。假如集群中有100個節點,如果我們全部用手寫進代碼那樣麻煩而且容易出錯。這時候就可以使用嗅探器來自動發現節點。

  4.1、先引入jar包

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client-sniffer</artifactId>
    <version>6.6.0</version>
</dependency>

  4.2、sinffer的創建和關閉

RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9201, "http"))
    .build();
//創建Sniffer 並設置一分鍾更新一次,默認為5分鍾
Sniffer sniffer = Sniffer.builder(restClient)
    .setSniffIntervalMillis(60000).build();


//sniffer需要在restClient之前關閉
sniffer.close();
restClient.close();

  4.3、還可以啟用故障嗅探功能,這意味着在每次故障之后,節點列表將立即更新,而不是在接下來的普通嗅探輪中更新。在這種情況下,需要首先創建一個SniffOnFailureListener,並在創建RestClient時提供它。同樣,在稍后創建嗅探器之后,它需要與相同的SniffOnFailureListener實例相關聯,該實例將在每次失敗時得到通知,並使用嗅探器執行所述的額外嗅探。

SniffOnFailureListener sniffOnFailureListener =
    new SniffOnFailureListener();
RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9201))
    .setFailureListener(sniffOnFailureListener) 
    .build();
Sniffer sniffer = Sniffer.builder(restClient)
    .setSniffAfterFailureDelayMillis(30000) 
    .build();
sniffOnFailureListener.setSniffer(sniffer);

  4.4、使用https進行連接

RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9201, "http"))
        .build();
NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
        restClient,
        ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,
        ElasticsearchNodesSniffer.Scheme.HTTPS);
Sniffer sniffer = Sniffer.builder(restClient)
        .setNodesSniffer(nodesSniffer).build();

  4.5、同樣,也可以定制sniffRequestTimeout,默認值為1秒。是超時參數作為一個查詢字符串參數當調用節點信息提供api,所以當超時過期在服務器端,一個有效的響應仍返回雖然可能只包含節點的一個子集,是集群的一部分,那些在那之前已經做出了回應。

RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9201, "http"))
    .build();
NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
    restClient,
    TimeUnit.SECONDS.toMillis(5),
    ElasticsearchNodesSniffer.Scheme.HTTP);
Sniffer sniffer = Sniffer.builder(restClient)
    .setNodesSniffer(nodesSniffer).build();

 

 二、Java High Level REST Client 

  Java高級REST客戶機在Java低級REST客戶機之上工作。它的主要目標是公開API特定的方法,這些方法接受請求對象作為參數並返回響應對象,以便請求編組和響應反編組由客戶機本身處理。 可以同步或異步調用每個API。同步方法返回一個響應對象,而異步方法(名稱以async后綴結尾)則需要一個偵聽器參數,一旦響應或錯誤為r,偵聽器參數(在低級客戶機管理的線程池上)就會被通知

  1、兼容性

    1.1、Java高級REST客戶機需要Java 1.8,並且依賴於Elasticsearch core項目。客戶端版本與客戶端開發的Elasticsearch版本相同

    1.2、高級客戶機保證能夠與運行在相同主版本和更大或更小版本上的Elasticsearch節點通信。它不需要處於與之通信的Elasticsearch節點相同的次要版本中,因為它是向前兼容的,這意味着它支持與后來版本的Elasticsearch通信。

但是向前版本通信可能會有不兼容的。例如在6.1和6.0之間,如果6.1客戶機支持一些api的新請求體字段,而這些api是6.0節點所不知道的。

    1.3、建議當elasticsearch集群版本升級后,最好是將客戶端版本也升級到該版本。

  2、引入jar包

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.6.0</version>
</dependency>

  3、編寫代碼

public class HighLeveRestClientTest {

    public static void main(String[] args) throws IOException {
        //創建高級rest客戶端
        /*高級客戶端將在內部創建用於基於提供的構建器執行請求的低級客戶端。這個低級客戶機維護一個連接池,並啟動一些線程,
         * 所以當您真正完成高級客戶機的操作時,應該關閉它,然后關閉內部低級客戶機來釋放這些資源。
         * */
        //高級rest客戶端和低級客戶端一樣設置requestOptions,具體參考低級客戶端。
        RestHighLevelClient restClient = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9201),
                        new HttpHost("127.0.0.1", 9202),
                        new HttpHost("127.0.0.1", 9203)
                        )
                );
        
        //構建請求,有很多種構建請求的方式,這里只列舉一種
        IndexRequest request = new IndexRequest(
                "posts", 
                "doc",  
                "1");   
        String jsonString = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        request.source(jsonString, XContentType.JSON);
        
        //執行請求
        IndexResponse indexResponse = restClient.index(request, RequestOptions.DEFAULT);
        
        //處理返回
        String index = indexResponse.getIndex();
        String type = indexResponse.getType();
        String id = indexResponse.getId();
        long version = indexResponse.getVersion();
        System.out.println(index);
        System.out.println(type);
        System.out.println(id);
        System.out.println(version);
        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
            System.out.println("創建");
        } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            System.out.println("更新");
        }
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure :
                    shardInfo.getFailures()) {
                String reason = failure.reason(); 
                System.out.println(reason);
            }
        }
        //關閉客戶端
        restClient.close();
    }

}

 

由於后續的api太多,懶得記錄了,上個鏈接吧 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM