並發查詢ElasticSearch, 根據分片來實現


並發查詢ES,根據分片的個數來設置並發

  1. 獲取索引的主分片數量 (index.number_of_shards)
  2. 按分片數量設置並發線程數, 每個分片對應一個線程
  3. 每個線程通過 preference=_shards:N 只訪問自己負責的分片, 並使用 scroll 全量讀取該分片的數據.

直連分片的這種方式有可能會導致ES集群壓力增加,只能適用於低頻、需要快速導出數據的場景,不能過度依賴.

所用到的依賴:

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.2</version>
        </dependency>
    </dependencies>

代碼:


import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

/**
 * Exports every document of one Elasticsearch index by scrolling each shard
 * on its own thread.
 *
 * <p>Flow: read {@code index.number_of_shards}, start one worker per shard,
 * pin each worker to its shard with the {@code _shards:N} search preference,
 * and let it run an independent scroll until the shard is exhausted.
 *
 * <p>Querying shards directly like this can put noticeable load on the
 * cluster; it is meant for infrequent bulk exports only.
 */
public class ShardQuery {

    /** Elasticsearch host; placeholder value, replace with a real address. */
    private static final String ES_HOST = "ip";
    /** Elasticsearch HTTP port; 9200 is the usual default, adjust as needed. */
    private static final int    ES_PORT = 9200;

    private static final String                   index   = "index_name";
    // Running total of records read from ES (test-only code; remove in production).
    private static final AtomicLong               count   = new AtomicLong(0);
    // Handle of the periodic progress printer (test-only code; remove in production).
    private static ScheduledFuture<?>             scheduledFuture;
    private static final ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);

    /**
     * Creates a client without authentication.
     *
     * @return a new client; the caller is responsible for closing it
     */
    private static RestHighLevelClient newEsClient() {
        HttpHost host = new HttpHost(ES_HOST, ES_PORT);
        RestClientBuilder restClientBuilder = RestClient.builder(host);
        return new RestHighLevelClient(restClientBuilder);
    }

    /**
     * Variant with username/password authentication.
     */
//    private static RestHighLevelClient newEsClient() {
//        HttpHost host = new HttpHost(ES_HOST, ES_PORT);
//        RestClientBuilder restClientBuilder = RestClient.builder(host);
//        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//        credentialsProvider.setCredentials(AuthScope.ANY,
//            new UsernamePasswordCredentials("username", "passwd"));
//        restClientBuilder.setHttpClientConfigCallback((httpClientBuilder) -> {
//            httpClientBuilder.disableAuthCaching();
//            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//        });
//        return new RestHighLevelClient(restClientBuilder);
//    }

    /**
     * Starts one scroll worker per shard, waits for all of them and prints the
     * total number of records read.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        countPrinter();

        ExecutorService exec = null;
        try {
            int shards = getShardsNum(index);
            exec = Executors.newFixedThreadPool(shards);
            CountDownLatch countDownLatch = new CountDownLatch(shards);

            for (int i = 0; i < shards; i++) {
                final int shardNo = i;
                exec.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            scrollShard(shardNo, shards);
                        } catch (IOException | RuntimeException e) {
                            // A failed shard must not take the other workers down.
                            e.printStackTrace();
                        } finally {
                            // Always count down, otherwise main would wait forever
                            // after a failed search/scroll call.
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            countDownLatch.await();
            System.out.println("最終計數器, 一共讀取記錄數: " + count.toString());
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can see the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            if (exec != null) {
                exec.shutdownNow();
            }
        } finally {
            scheduledFuture.cancel(true);
            // Cancelling the future alone leaves the scheduler's non-daemon
            // thread alive, which would keep the JVM from exiting.
            service.shutdownNow();
            if (exec != null) {
                exec.shutdown();
            }
        }
    }

    /**
     * Scrolls through all documents of a single shard and counts them.
     *
     * <p>The client is closed and the scroll context is cleared even when a
     * search or scroll request fails.
     *
     * @param shardNo zero-based shard number this worker is responsible for
     * @param shards  total number of shards of the index
     * @throws IOException if a search/scroll request or closing the client fails
     */
    private static void scrollShard(int shardNo, int shards) throws IOException {
        Scroll scroll = new Scroll(TimeValue.timeValueSeconds(30));
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.scroll(scroll);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        searchSourceBuilder.sort("_doc", SortOrder.ASC);
        searchSourceBuilder.size(5); // Test value; 1000 ~ 10000 is suggested for production.
        searchRequest.source(searchSourceBuilder);

        if (shards > 1) {
            // Shard preference; only applied when the index has more than one shard.
            searchRequest.preference("_shards:" + shardNo);
        }

        System.out.println(String.format("啟動線程%s, 編號:%d", Thread.currentThread().getName(), shardNo));

        try (RestHighLevelClient client = newEsClient()) {
            String scrollId = null;
            try {
                SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
                scrollId = response.getScrollId();
                SearchHit[] hits = response.getHits().getHits();
                while (hits.length != 0) {
                    for (SearchHit hit : hits) {
                        String data = hit.getSourceAsString();
                        System.out.println(shardNo + " 查詢數據: " + data);
                        // TODO: forward the record to the target sink / datasource here.
                        count.getAndIncrement();
                    }

                    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                    searchScrollRequest.scroll(scroll);
                    response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                    scrollId = response.getScrollId();
                    hits = response.getHits().getHits();
                }
            } finally {
                if (scrollId != null) {
                    clearScrollQuietly(client, scrollId);
                }
            }
        }
    }

    /**
     * Releases a scroll context; a failure is only reported, because the
     * context expires on its own once the scroll keep-alive elapses.
     *
     * @param client   client used to send the request
     * @param scrollId id of the scroll context to release
     */
    private static void clearScrollQuietly(RestHighLevelClient client, String scrollId) {
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        try {
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        } catch (IOException | RuntimeException e) {
            e.printStackTrace();
        }
    }

    /**
     * Looks up the number of primary shards of an index.
     *
     * @param indexName name of the index to inspect
     * @return the value of {@code index.number_of_shards}, or 1 when the
     *         request fails with an {@link IOException} or the setting is not
     *         returned for {@code indexName}
     */
    private static int getShardsNum(String indexName) {
        GetSettingsRequest settingsRequest = new GetSettingsRequest().indices(indexName);
        settingsRequest.names("index.number_of_shards");
        // try-with-resources closes the client on the failure path as well.
        try (RestHighLevelClient client = newEsClient()) {
            GetSettingsResponse settingsResponse = client
                .indices()
                .getSettings(settingsRequest, RequestOptions.DEFAULT);
            String numberOfShards = settingsResponse
                .getSetting(indexName, "index.number_of_shards");
            if (numberOfShards == null) {
                System.err.println("index.number_of_shards not found for " + indexName
                    + ", falling back to 1");
                return 1;
            }
            return Integer.parseInt(numberOfShards);
        } catch (IOException e) {
            e.printStackTrace();
            return 1;
        }
    }

    /**
     * Starts a task that prints the current record count once per second
     * (test-only progress output).
     */
    private static void countPrinter() {
        scheduledFuture = service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + "\t當前count: " + count.get());
            }
        }, 0, 1, TimeUnit.SECONDS);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM