Elasticsearch Client Source Code Analysis


Note: this article is by the blogger Chloneda (personal blog | cnblogs | Github | Gitee | Zhihu)

Note: original link https://www.cnblogs.com/chloneda/p/es-clients.html

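Preface

Today let's talk about the types of Elasticsearch clients. As we know, Elasticsearch is a distributed technology for searching and analyzing massive amounts of data. It can serve as the search engine for e-commerce sites, portals, enterprise IT systems, and many other scenarios, and it can also be used for near-real-time analysis of massive data sets.

However, Elasticsearch releases new versions quickly, which means compatibility problems arise easily during upgrades. That leads to today's discussion of the kinds of Elasticsearch clients and how to use them.

Types of ES clients

There are three official ES clients: TransportClient, Java Low Level REST Client, and Java High Level REST Client. The official documentation describes them as follows: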

TransportClient

We plan on deprecating the TransportClient in Elasticsearch 7.0 and removing it completely in 8.0.

Java Low Level REST Client

the official low-level client for Elasticsearch. It allows to communicate with an Elasticsearch cluster through http. Leaves requests marshalling and responses un-marshalling to users. It is compatible with all Elasticsearch versions.

Java High Level REST Client

the official high-level client for Elasticsearch. Based on the low-level client, it exposes API specific methods and takes care of requests marshalling and responses un-marshalling.

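In other words, TransportClient is slated for deprecation and removal in upcoming versions, and the official recommendation is to use the Java High Level REST Client.

Why is that? Two questions are involved:

  • Why will future versions drop the TransportClient?
  • What are the advantages of the Java Low/High Level REST Clients?

No rush, let's look at these two questions one at a time!

Using the clients

Each client needs its dependencies. They are declared here once and will not be repeated later.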

<!-- elasticsearch core -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<!-- low level rest client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<!-- high level rest client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>

Note: TransportClient is to be removed completely in 8.0!

TransportClient

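Sample code for initializing the TransportClient:

import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
// PreBuiltTransportClient comes from the org.elasticsearch.client:transport artifact
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public TransportClient initTransportClient(String esClusterName, String host, String port) throws UnknownHostException {
	Settings settings = Settings.builder()
			.put("cluster.name", esClusterName)
			.put("client.transport.sniff", true)
			.build();
	// TransportAddress takes an int port, so the String argument is parsed first
	TransportClient client = new PreBuiltTransportClient(settings)
			.addTransportAddress(new TransportAddress(InetAddress.getByName(host), Integer.parseInt(port)));

	return client;
}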

Java Low Level REST Client

Sample code for initializing the RestClient:

public RestClient initRestClient(String host, int port) {
	RestClientBuilder builder = RestClient.builder(new HttpHost(host,
			port, "http"));
	Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
	builder.setDefaultHeaders(defaultHeaders);
	RestClient restClient = builder.build();
	return restClient;
}

Java High Level REST Client

Sample code for initializing the RestHighLevelClient:

public RestHighLevelClient restHighLevelClient(List<String> hostArray) {
	// Build the HttpHost array holding the host and port of each ES node ("host:port" entries)
	HttpHost[] httpHostArray = new HttpHost[hostArray.size()];
	for (int i = 0; i < hostArray.size(); i++) {
		String item = hostArray.get(i);
		httpHostArray[i] = new HttpHost(item.split(":")[0], 
				Integer.parseInt(item.split(":")[1]), 
				"http");
	}
	// Create the RestHighLevelClient
	return new RestHighLevelClient(RestClient.builder(httpHostArray));
}
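
The loop above calls item.split(":") twice and assumes every entry is a well-formed "host:port" string. As a minimal sketch of a stricter parser, the class below validates each entry before it would be handed to HttpHost. HostPortParser and HostPort are hypothetical names invented for this illustration; they are not part of the Elasticsearch client, and the sketch uses only the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not an Elasticsearch class.
final class HostPortParser {

    /** Immutable host/port pair, a stand-in for the values passed to HttpHost. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses one "host:port" entry, splitting on the last colon and validating the port. */
    static HostPort parse(String item) {
        int idx = item.lastIndexOf(':');
        if (idx <= 0 || idx == item.length() - 1) {
            throw new IllegalArgumentException("expected host:port, found [" + item + "]");
        }
        String host = item.substring(0, idx).trim();
        if (host.isEmpty()) {
            throw new IllegalArgumentException("empty host in [" + item + "]");
        }
        int port;
        try {
            port = Integer.parseInt(item.substring(idx + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port should be a number, found [" + item + "]", e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range in [" + item + "]");
        }
        return new HostPort(host, port);
    }

    /** Parses every entry, failing fast on the first malformed one. */
    static List<HostPort> parseAll(List<String> items) {
        List<HostPort> result = new ArrayList<>();
        for (String item : items) {
            result.add(parse(item));
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parseAll(Arrays.asList("127.0.0.1:9200", "es-node-2:9201"))) {
            System.out.println(hp.host + " -> " + hp.port);
        }
    }
}
```

Each parsed pair could then be turned into new HttpHost(hp.host, hp.port, "http"), exactly as the original loop does.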

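That covers the sample code for initializing the three clients. Now let's dig into the client code and see how they differ underneath.

Under the hood of the clients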

TransportClient

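TransportClient has been part of Elasticsearch since its inception. It is a special client because it talks to Elasticsearch over the transport protocol (TCP); if the client and the Elasticsearch cluster it connects to are not on the same version, compatibility problems arise. For this reason it is to be removed completely in 8.0!

So we will not dig into the internals of TransportClient here.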

Java Low Level REST Client

In 2016, Elastic released a low-level REST client. It is based on the well-known Apache HTTP client and can communicate over HTTP with an Elasticsearch cluster of any version.

Let's look at the code of RestClient:

package org.elasticsearch.client;

public class RestClient implements Closeable {

    // other non-essential fields omitted ...

    // The first constructor parameter of RestClient is CloseableHttpAsyncClient, a class from the Apache HTTP client; requests are sent through it as well

    RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders, HttpHost[] hosts, String pathPrefix, RestClient.FailureListener failureListener) {
        this.client = client;
        this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
        this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
        this.failureListener = failureListener;
        this.pathPrefix = pathPrefix;
        this.setHosts(hosts);
    }

    // other non-essential code omitted ...

    public void performRequestAsync(String method, String endpoint, Map<String, String> params, HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, ResponseListener responseListener, Header... headers) {
        try {
            Objects.requireNonNull(params, "params must not be null");
            Map<String, String> requestParams = new HashMap(params);
            String ignoreString = (String)requestParams.remove("ignore");
            Object ignoreErrorCodes;
            if (ignoreString == null) {
                if ("HEAD".equals(method)) {
                    ignoreErrorCodes = Collections.singleton(404);
                } else {
                    ignoreErrorCodes = Collections.emptySet();
                }
            } else {
                String[] ignoresArray = ignoreString.split(",");
                ignoreErrorCodes = new HashSet();
                if ("HEAD".equals(method)) {
                    ((Set)ignoreErrorCodes).add(404);
                }

                for (String ignoreCode : ignoresArray) {
                    try {
                        ((Set)ignoreErrorCodes).add(Integer.valueOf(ignoreCode));
                    } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
                    }
                }
            }

            URI uri = buildUri(this.pathPrefix, endpoint, requestParams);
            HttpRequestBase request = createHttpRequest(method, uri, entity);
            this.setHeaders(request, headers);
            RestClient.FailureTrackingResponseListener failureTrackingResponseListener = new RestClient.FailureTrackingResponseListener(responseListener);
            long startTime = System.nanoTime();
            this.performRequestAsync(startTime, this.nextHost(), request, (Set)ignoreErrorCodes, httpAsyncResponseConsumerFactory, failureTrackingResponseListener);
        } catch (Exception var18) {
            responseListener.onFailure(var18);
        }

    }

    // other non-essential code omitted ...

    private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
        // Map the HTTP method name to the matching Apache HTTP request class
        switch (method.toUpperCase(Locale.ROOT)) {
        case "DELETE":
            return addRequestBody(new HttpDeleteWithEntity(uri), entity);
        case "GET":
            return addRequestBody(new HttpGetWithEntity(uri), entity);
        case "HEAD":
            return addRequestBody(new HttpHead(uri), entity);
        case "OPTIONS":
            return addRequestBody(new HttpOptions(uri), entity);
        case "PATCH":
            return addRequestBody(new HttpPatch(uri), entity);
        case "POST":
            HttpPost httpPost = new HttpPost(uri);
            addRequestBody(httpPost, entity);
            return httpPost;
        case "PUT":
            return addRequestBody(new HttpPut(uri), entity);
        case "TRACE":
            return addRequestBody(new HttpTrace(uri), entity);
        default:
            throw new UnsupportedOperationException("http method not supported: " + method);
        }
    }

}

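As the code above shows, the first constructor parameter of RestClient is CloseableHttpAsyncClient, a class from the Apache HTTP client. In other words, RestClient is built on Apache HTTP (CloseableHttpAsyncClient itself ships in the httpasyncclient artifact, while request classes such as HttpPost come from httpclient). Here is the Apache HTTP dependency: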

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>${http.version}</version>
</dependency>
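
The ignore handling at the top of performRequestAsync is easy to lose in the decompiled listing, so here is a minimal standalone sketch of the same rule: HEAD requests always tolerate 404, and the comma-separated "ignore" parameter adds further status codes. IgnoreCodes and resolve are hypothetical names made up for this illustration; the sketch uses only the JDK and is not the library's own code.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical standalone class mirroring the ignore-handling branch shown above;
// it is not part of the Elasticsearch client library.
final class IgnoreCodes {

    /**
     * Resolves the set of HTTP status codes that should not be treated as errors.
     *
     * @param method       upper-case HTTP method name, e.g. "GET" or "HEAD"
     * @param ignoreString value of the "ignore" request parameter, or null if absent
     */
    static Set<Integer> resolve(String method, String ignoreString) {
        Set<Integer> codes = new HashSet<>();
        // HEAD requests always tolerate 404, as in the listing above.
        if ("HEAD".equals(method)) {
            codes.add(404);
        }
        if (ignoreString != null) {
            // Like the listing, entries are not trimmed: "404, 409" is rejected.
            for (String code : ignoreString.split(",")) {
                try {
                    codes.add(Integer.valueOf(code));
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException(
                            "ignore value should be a number, found [" + ignoreString + "] instead", e);
                }
            }
        }
        return Collections.unmodifiableSet(codes);
    }

    public static void main(String[] args) {
        System.out.println(resolve("HEAD", null));
        System.out.println(resolve("GET", "404,409"));
    }
}
```

Separating this rule from the request-sending code makes it clear that "ignore" is consumed by the client itself: the listing removes it from the parameter map before the URI is built.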

Java High Level REST Client

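On top of that, Elastic released a high-level REST client based on the low-level client; it takes care of request marshalling and response unmarshalling.

Let's look at the underlying code of RestHighLevelClient!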

package org.elasticsearch.client;

public class RestHighLevelClient {
    private final RestClient client;
    private final NamedXContentRegistry registry;

    public RestHighLevelClient(RestClient restClient) {
        this(restClient, Collections.emptyList());
    }

    // much code omitted here

    // Most methods of this class end up calling performRequestAndParseEntity below, so we focus on its call chain

    protected <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request, CheckedFunction<Req, Request, IOException> requestConverter, CheckedFunction<XContentParser, Resp, IOException> entityParser, Set<Integer> ignores, Header... headers) throws IOException {
        return this.performRequest(request, requestConverter, (response) -> {
            return this.parseEntity(response.getEntity(), entityParser);
        }, ignores, headers);
    }

    protected <Req extends ActionRequest, Resp> Resp performRequest(Req request, CheckedFunction<Req, Request, IOException> requestConverter, CheckedFunction<Response, Resp, IOException> responseConverter, Set<Integer> ignores, Header... headers) throws IOException {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            throw validationException;
        } else {
            Request req = (Request)requestConverter.apply(request);

            Response response;
            try {
            
                // this.client is the RestClient: the call ends up in RestClient, which means RestHighLevelClient is built on RestClient
                
                response = this.client.performRequest(req.getMethod(), req.getEndpoint(), req.getParameters(), req.getEntity(), headers);
            } catch (ResponseException var13) {
                ResponseException e = var13;
                if (ignores.contains(var13.getResponse().getStatusLine().getStatusCode())) {
                    try {
                        return responseConverter.apply(e.getResponse());
                    } catch (Exception var11) {
                        throw this.parseResponseException(var13);
                    }
                }

                throw this.parseResponseException(var13);
            }

            try {
                return responseConverter.apply(response);
            } catch (Exception var12) {
                throw new IOException("Unable to parse response body for " + response, var12);
            }
        }
    }

}

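With the code and comments above, things should quickly become clear!

In fact, the earlier questions now have their answers: TransportClient is being dropped mainly because of compatibility concerns, whereas the two REST clients handle compatibility well.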
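
To make the layering concrete, here is a minimal sketch of the delegation pattern seen in performRequest: a high-level wrapper converts a typed request, hands it to a low-level client, and converts the raw response back. Every type here (LayeredClientSketch, LowLevel, HighLevel, RawResponse) is a hypothetical stand-in written with the JDK only. One simplification to note: the real RestClient throws a ResponseException on error statuses, whereas this sketch checks the status code in the wrapper.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;

// All types here are hypothetical stand-ins, not Elasticsearch classes.
final class LayeredClientSketch {

    /** Stand-in for the low-level client: sends a raw request, returns a raw response. */
    interface LowLevel {
        RawResponse perform(String method, String endpoint, String body);
    }

    static final class RawResponse {
        final int status;
        final String body;

        RawResponse(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    /** Stand-in for the high-level client: wraps one low-level call with converters. */
    static final class HighLevel {
        private final LowLevel client;

        HighLevel(LowLevel client) {
            this.client = client;
        }

        <Req, Resp> Resp perform(Req request,
                                 Function<Req, String> requestConverter,
                                 Function<RawResponse, Resp> responseConverter,
                                 Set<Integer> ignores) {
            // Marshalling: turn the typed request into an endpoint for the low-level client.
            String endpoint = requestConverter.apply(request);
            RawResponse response = client.perform("GET", endpoint, null);
            // Error statuses fail unless the caller listed them in "ignores".
            if (response.status >= 400 && !ignores.contains(response.status)) {
                throw new IllegalStateException("request failed with status " + response.status);
            }
            // Unmarshalling: turn the raw response into the typed result.
            return responseConverter.apply(response);
        }
    }

    public static void main(String[] args) {
        // A fake low-level client that only knows document 1.
        LowLevel fake = (method, endpoint, body) -> "/books/_doc/1".equals(endpoint)
                ? new RawResponse(200, "{\"found\":true}")
                : new RawResponse(404, "{\"found\":false}");
        HighLevel high = new HighLevel(fake);
        Boolean exists = high.perform("2", id -> "/books/_doc/" + id,
                r -> r.status == 200, Collections.singleton(404));
        System.out.println("document 2 exists: " + exists);
    }
}
```

Because the wrapper only depends on the LowLevel interface, anything specific to the wire protocol stays in one place, which is the property the REST clients rely on for compatibility.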

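Summary

As for the Elasticsearch client question, the official ES site already states it clearly; here we walked through the code to see what lies underneath and deepen our understanding.

So RestHighLevelClient is built on RestClient, and RestClient in turn is built on the Apache HTTP client. On the client side, this lets Elasticsearch unify the underlying interface across languages such as Java, Python, PHP, and JavaScript; at the same time, using the REST API hides the differences between versions.

This is also a reminder that, during the transition period of a code upgrade, handling the relationship between the new client and the old client well reduces later maintenance work!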

End!

