Flink errors when writing to Elasticsearch: OOM and abnormally closed connections


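Recently a project at my company required Flink to perform frequent real-time inserts into Elasticsearch. While writing to Elasticsearch, the job hit an out-of-memory (OOM) error as well as an error caused by the connection being closed abnormally.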

The errors were as follows:
1. Caused by: java.lang.IllegalStateException: I/O reactor has been shut down (the connection was closed abnormally).
2. java.lang.OutOfMemoryError: Direct buffer memory (out-of-memory error).

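Let's tackle the first exception, the closed connection. Many posts online say it happens because close was called on the ES client: the client is shut down before its requests have completed, which leaves later connections unusable and raises this exception.

However, my code initially used the native Elasticsearch 7.12 client to run the insert requests and never called close, so the exception may have had a different cause. Later I switched to Flink's wrapper, which needs to be closed manually.

Of course, before tackling this problem, make sure the code itself runs correctly; otherwise some other exception may be what shuts the connection down.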

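To resolve this exception we did the following:

Use Flink's ElasticsearchSink wrapper instead of the native ES client to run the insert requests. (The native client might work too, but in our tests the Flink wrapper behaved better and was less error-prone.)

Then set the parameters: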

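1. Set the timeouts: requestBuilder.setConnectTimeout(60000); requestBuilder.setSocketTimeout(60000); Both timeouts are set to one minute here (the complete listing below sets the connect timeout to 5000 ms instead).

2. Set the bulk flush limits and the flush interval: esSinkBuilder.setBulkFlushMaxActions(1); esSinkBuilder.setBulkFlushMaxSizeMb(1); esSinkBuilder.setBulkFlushInterval(1);//flush interval set to 1 millisecond.

3. Set the number of I/O threads:

IOReactorConfig.custom().setIoThreadCount(5).build();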

esSinkBuilder.setFailureHandler(new RetryRequestFailureHandler());//handles failed Elasticsearch requests
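
The bulk flush settings in step 2 act as independent triggers: buffered requests are sent as soon as any one limit is reached. The following is a minimal plain-Java sketch of that behaviour, not the connector's actual implementation. `BulkBufferSketch` and its methods are hypothetical names introduced for illustration, and the interval trigger is left out for brevity. It shows why a maximum of 1 action turns every record into its own bulk request.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of count/size flush triggers; not the real bulk processor. */
final class BulkBufferSketch {
    private final int maxActions;
    private final long maxBytes;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private int flushCount = 0;

    BulkBufferSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Buffers one document and flushes as soon as either limit is reached. */
    void add(String jsonDocument) {
        buffer.add(jsonDocument);
        bufferedBytes += jsonDocument.getBytes(StandardCharsets.UTF_8).length;
        if (buffer.size() >= maxActions || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    /** Stands in for sending one bulk request; does nothing if the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushCount++;
        buffer.clear();
        bufferedBytes = 0;
    }

    int flushCount() {
        return flushCount;
    }

    /** Counts the simulated bulk requests needed for the given number of small records. */
    static int flushesFor(int records, int maxActions) {
        BulkBufferSketch sketch = new BulkBufferSketch(maxActions, 1024L * 1024L); // 1 MB size limit
        for (int i = 0; i < records; i++) {
            sketch.add("{\"id\":" + i + "}");
        }
        sketch.flush(); // send whatever is still buffered
        return sketch.flushCount();
    }

    public static void main(String[] args) {
        System.out.println("maxActions=1   -> " + flushesFor(1000, 1) + " bulk requests");
        System.out.println("maxActions=500 -> " + flushesFor(1000, 500) + " bulk requests");
    }
}
```

With the values used in this article, every record is sent on its own, which favours latency over throughput. Whether larger limits would suit a given job depends on the workload and on how much delay is acceptable.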

Here a request is set up each time the sink runs, so it has to be closed: if(build!=null)build.close();

Elasticsearch 7.12 is used here with login authentication enabled.
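
For orientation, user name and password authentication of this kind typically ends up on the wire as an HTTP Basic `Authorization` header. The sketch below only shows the shape of that header in plain Java; `BasicAuthHeaderSketch` is a hypothetical helper and the credentials are placeholders, not values from this project.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper showing the shape of an HTTP Basic Authorization header value. */
final class BasicAuthHeaderSketch {

    /** Returns "Basic " followed by base64("user:password"). */
    static String headerValue(String user, String password) {
        String pair = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // "elastic" / "changeme" are placeholder credentials for illustration only.
        System.out.println("Authorization: " + headerValue("elastic", "changeme"));
    }
}
```

Because the value is only encoded, not encrypted, credentials sent this way are readable by anyone who can observe plain HTTP traffic.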

The complete code is as follows:

  `//operator is the Flink data stream
    SingleOutputStreamOperator<JSONObject> operator;
    //Elasticsearch addresses (placeholder host names)
    List<HttpHost> esAddresses = ESSinkUtil.getEsAddresses("localhost1:9200,localhost2:9200,localhost3:9200,localhost4:9200,localhost5:9200");
    //getEsAddresses helper method (defined in ESSinkUtil)
    public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException {
    String[] hostList = hosts.split(",");
    List<HttpHost> addresses = new ArrayList<>();
    for (String host : hostList) {
        if (host.startsWith("http")) {
            URL url = new URL(host);
            addresses.add(new HttpHost(url.getHost(), url.getPort()));
        } else {
            String[] parts = host.split(":", 2);
            if (parts.length > 1) {
                addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1])));
            } else {
                throw new MalformedURLException("invalid elasticsearch hosts format");
            }
        }
    }
    return addresses;
}

    //Elasticsearch insert request
    ESSinkUtil.addSink(esAddresses, 1, 8, operator, new ElasticsearchSinkFunction<JSONObject>() {
            @Override
            public void process(JSONObject metric, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                requestIndexer.add(Requests.indexRequest()
                        .index(INDEX)
                        .id(metric.getString("id")) //IndexRequest.id(...) takes a String
                        .source(JSON.toJSONString(metric), XContentType.JSON));
            }
        });

    //Elasticsearch sink using Flink's wrapper
    public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
                               SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) {
        try {
            ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
            esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);//maximum number of actions per bulk request
            esSinkBuilder.setBulkFlushMaxSizeMb(1);//maximum bulk size in MB
            esSinkBuilder.setBulkFlushInterval(1);//flush interval in milliseconds
            esSinkBuilder.setFailureHandler(new RetryRequestFailureHandler());//handles failed Elasticsearch requests

            //custom HTTP client configuration
            esSinkBuilder.setRestClientFactory(new RestClientFactory() {
                @Override
                public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                    final CredentialsProvider credentialsProvider =new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(USER, PASSWORD));
                    restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {

                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                            //httpClientBuilder.disableAuthCaching();
                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            return httpClientBuilder.setDefaultIOReactorConfig(
                                    IOReactorConfig.custom()
                                            .setIoThreadCount(5)//use 5 I/O threads
                                            .build());
                        }
                    }).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback(){
                        @Override
                        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestBuilder) {
                            requestBuilder.setConnectTimeout(5000);//connect timeout in milliseconds
                            requestBuilder.setSocketTimeout(60000);
                            requestBuilder.setConnectionRequestTimeout(10000);
                            return requestBuilder;
                        }
                    });
                }
            });
            //todo:xpack security
            ElasticsearchSink<T> build = esSinkBuilder.build();
            data.addSink(build).setParallelism(parallelism);
            if (build!=null) build.close();//build must be closed once it has been used
        } catch (Exception e) {
            e.printStackTrace();
        }
  }

  //handles failed Elasticsearch requests
  public class RetryRequestFailureHandler implements ActionRequestFailureHandler {

      public RetryRequestFailureHandler() {
      }

      @Override
      public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {
          if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
              requestIndexer.add(new ActionRequest[]{actionRequest});
          } else {
              if (ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent()) {
                  return;
              } else {
                  Optional<IOException> exp = ExceptionUtils.findThrowable(throwable, IOException.class);
                  if (exp.isPresent()) {
                      IOException ioExp = exp.get();
                      if (ioExp != null && ioExp.getMessage() != null && ioExp.getMessage().contains("max retry timeout")) {
                           return;
                      }
                  }
              }
              throw throwable;
          }
      }
  }

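  //below: creating the client connection with the native Elasticsearch client
  private static RestHighLevelClient client; //assumed static field holding the shared client; not shown in the original listing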
  public static RestHighLevelClient client(){
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(USER, PASSWORD));  //ES user name and password (the default user name is elastic)
    //create the ES client with user name and password
    try {
        if (null == client){
            client = new RestHighLevelClient(RestClient.builder(new HttpHost(PRODUCE_HOST,PORT,SCHEMA)
                    ,new HttpHost(PRODUCE_HOST2,PORT,SCHEMA),new HttpHost(PRODUCE_HOST3,PORT,SCHEMA),new HttpHost(PRODUCE_HOST4,PORT,SCHEMA)
                    ,new HttpHost(PRODUCE_HOST5,PORT,SCHEMA))
                    //async HTTP client connection settings
                    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        @Override
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                            //httpClientBuilder.disableAuthCaching();
                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            return httpClientBuilder.setDefaultIOReactorConfig(
                                    IOReactorConfig.custom()
                                            .setIoThreadCount(5)
                                            .build());

                        }
                    })
                    .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback(){
                        @Override
                        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestBuilder) {
                            requestBuilder.setConnectTimeout(5000);
                            requestBuilder.setSocketTimeout(60000);
                            requestBuilder.setConnectionRequestTimeout(10000);
                            return requestBuilder;
                        }
                    })
            );
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return client;
}`
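
The RetryRequestFailureHandler in the listing above decides what to do with a failed request by searching the exception's cause chain: re-queue on a rejected execution, silently skip on a socket timeout or a "max retry timeout" I/O error, and rethrow everything else. The plain-Java sketch below mirrors that decision logic without any Flink or Elasticsearch dependency. `FailureClassifierSketch`, `Decision`, and `RejectedSketchException` (a stand-in for `EsRejectedExecutionException`) are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Optional;

/** Hypothetical, dependency-free mirror of the handler's decision logic. */
final class FailureClassifierSketch {

    enum Decision { RETRY, DROP, FAIL }

    /** Stand-in for Elasticsearch's EsRejectedExecutionException. */
    static final class RejectedSketchException extends RuntimeException {
        RejectedSketchException(String message) {
            super(message);
        }
    }

    /** Walks the cause chain and returns the first throwable of the given type, if any. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable root, Class<T> type) {
        Throwable current = root;
        // The depth bound guards against pathological cyclic cause chains.
        for (int depth = 0; current != null && depth < 64; depth++) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    /** Same branch order as the handler: rejected, then socket timeout, then retry-timeout I/O error. */
    static Decision classify(Throwable failure) {
        if (findThrowable(failure, RejectedSketchException.class).isPresent()) {
            return Decision.RETRY; // the handler re-adds the request to the indexer
        }
        if (findThrowable(failure, SocketTimeoutException.class).isPresent()) {
            return Decision.DROP; // the handler returns without rethrowing
        }
        Optional<IOException> io = findThrowable(failure, IOException.class);
        if (io.isPresent() && io.get().getMessage() != null
                && io.get().getMessage().contains("max retry timeout")) {
            return Decision.DROP;
        }
        return Decision.FAIL; // the handler rethrows, failing the sink
    }

    public static void main(String[] args) {
        System.out.println(classify(new RuntimeException(new RejectedSketchException("queue full"))));
        System.out.println(classify(new RuntimeException(new SocketTimeoutException("read timed out"))));
        System.out.println(classify(new IllegalStateException("unexpected")));
    }
}
```

One consequence worth keeping in mind: in the DROP branches the failed request is neither retried nor reported, so those records are lost. Whether that is acceptable depends on the delivery guarantees the job needs.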

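Next comes the OOM problem. Our starting point was to adjust the parallelism in the code so that enough slots are available, provisionally set to 8, and then to increase the memory in the environment configuration. Generally speaking, an out-of-memory error often points to a memory leak.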
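
It may help to remember that `Direct buffer memory` refers to off-heap memory used by direct NIO buffers, which the JVM limits separately from the heap (via `-XX:MaxDirectMemorySize`), so raising the heap alone may not change anything. As a rough diagnostic sketch, plain Java can report how much direct buffer memory is currently in use through the standard `BufferPoolMXBean`. `DirectMemoryProbe` is a hypothetical helper name, and the 8 MB allocation is only there to make the effect visible.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

/** Hypothetical diagnostic helper that reads the JVM's direct buffer pool usage. */
final class DirectMemoryProbe {

    /** Returns the bytes held by the "direct" buffer pool, or -1 if the JVM does not expose it. */
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directBytesUsed();
        // Allocate 8 MB off-heap; this counts against the direct memory limit, not the heap.
        ByteBuffer buffer = ByteBuffer.allocateDirect(8 * 1024 * 1024);
        long after = directBytesUsed();
        System.out.println("direct pool before: " + before + " bytes");
        System.out.println("direct pool after:  " + after + " bytes (buffer capacity " + buffer.capacity() + ")");
    }
}
```

Logging this value periodically inside a task would show whether direct memory keeps growing until the limit is hit. In Flink 1.10 and later the limit is derived from the task manager's off-heap and network memory settings (for example `taskmanager.memory.task.off-heap.size`), so that is probably the setting to look at when "increasing memory" for this particular error.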

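Another possibility is that the code itself throws too many exceptions, which makes the program fail. Review the code, find the places where exceptions can occur, and fix them.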

The next step is to set a restart strategy for Flink.
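
The article does not say which strategy or values were used. In Flink versions of that era a fixed-delay strategy can be set in code with `env.setRestartStrategy(RestartStrategies.fixedDelayRestart(attempts, delay))`. To show what such a strategy means without depending on Flink, here is a plain-Java sketch of the semantics: rerun a failing job after a fixed pause, up to a maximum number of restarts. `FixedDelayRestartSketch` and the numbers used are assumptions for illustration only.

```java
import java.util.concurrent.Callable;

/** Hypothetical illustration of fixed-delay restart semantics; not Flink's implementation. */
final class FixedDelayRestartSketch {

    /**
     * Runs the job; after a failure waits delayMillis and runs it again,
     * giving up and rethrowing once maxRestarts restarts have been used.
     */
    static <T> T runWithRestarts(Callable<T> job, int maxRestarts, long delayMillis) throws Exception {
        int restarts = 0;
        while (true) {
            try {
                return job.call();
            } catch (Exception failure) {
                if (restarts >= maxRestarts) {
                    throw failure; // restart budget exhausted: the job fails for good
                }
                restarts++;
                Thread.sleep(delayMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] runs = {0};
        String result = runWithRestarts(() -> {
            if (++runs[0] < 3) {
                throw new IllegalStateException("simulated failure on run " + runs[0]);
            }
            return "succeeded on run " + runs[0];
        }, 3, 100L);
        System.out.println(result);
    }
}
```

A restart strategy only hides transient failures. If the underlying cause persists, such as direct memory being exhausted again after every restart, the job will fail once the restart budget is used up.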

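Once the steps above were in place, Flink's errors disappeared. Before that the job could never get running: as soon as it was submitted to the UI it turned red and failed.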

Writing this up took some effort, so if it solved your problem, don't forget to leave a comment and a like.

