異步httpClient(Async HttpClient)


  • 一、簡介
  • 二、mvn依賴
  • 三、客戶端
    •  3.1 官網實例
    •  3.2. 根據官方文檔的介紹,簡單封裝了一個異步HttpClient工具類
    • 3.3 基本原理
  • 四、參考文檔

一、簡介

      HttpClient提供了兩種I/O模型:經典的java阻塞I/O模型和基於Java NIO的異步非阻塞事件驅動I/O模型。

     Java中的阻塞I/O是一種高效、便捷的I/O模型,非常適合並發連接數量相對適中的高性能應用程序。只要並發連接的數量在1000個以下並且連接大多忙於傳輸數據,阻塞I/O模型就可以提供最佳的數據吞吐量性能。然而,對於連接大部分時間保持空閑的應用程序,上下文切換的開銷可能會變得很大,這時非阻塞I/O模型可能會提供更好的替代方案。 異步I/O模型可能更適合於比較看重資源高效利用、系統可伸縮性、以及可以同時支持更多HTTP連接的場景。

二、mvn依賴

      httpclient在4.x之后開始提供基於nio的異步版本httpasyncclient,httpasyncclient借助了Java並發庫和nio進行封裝(雖說NIO是同步非阻塞IO,但是HttpAsyncClient提供了回調的機制,與netty類似,所以可以模擬類似於AIO的效果),其調用方式非常便捷.

<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpasyncclient</artifactId>
  <version>4.1.4</version>
</dependency>

三、客戶端

 3.1 官網實例

CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
try {
    // Start the client
    httpclient.start();

    // Execute request
    final HttpGet request1 = new HttpGet("http://www.apache.org/");
    Future<HttpResponse> future = httpclient.execute(request1, null);
    // and wait until a response is received
    HttpResponse response1 = future.get();
    System.out.println(request1.getRequestLine() + "->" + response1.getStatusLine());

    // One most likely would want to use a callback for operation result
    final CountDownLatch latch1 = new CountDownLatch(1);
    final HttpGet request2 = new HttpGet("http://www.apache.org/");
    httpclient.execute(request2, new FutureCallback<HttpResponse>() {

        public void completed(final HttpResponse response2) {
            latch1.countDown();
            System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine());
        }

        public void failed(final Exception ex) {
            latch1.countDown();
            System.out.println(request2.getRequestLine() + "->" + ex);
        }

        public void cancelled() {
            latch1.countDown();
            System.out.println(request2.getRequestLine() + " cancelled");
        }

    });
    latch1.await();

    // In real world one most likely would also want to stream
    // request and response body content
    final CountDownLatch latch2 = new CountDownLatch(1);
    final HttpGet request3 = new HttpGet("http://www.apache.org/");
    HttpAsyncRequestProducer producer3 = HttpAsyncMethods.create(request3);
    AsyncCharConsumer<HttpResponse> consumer3 = new AsyncCharConsumer<HttpResponse>() {

        HttpResponse response;

        @Override
        protected void onResponseReceived(final HttpResponse response) {
            this.response = response;
        }

        @Override
        protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
            // Do something useful
        }

        @Override
        protected void releaseResources() {
        }

        @Override
        protected HttpResponse buildResult(final HttpContext context) {
            return this.response;
        }

    };
    httpclient.execute(producer3, consumer3, new FutureCallback<HttpResponse>() {

        public void completed(final HttpResponse response3) {
            latch2.countDown();
            System.out.println(request3.getRequestLine() + "->" + response3.getStatusLine());
        }

        public void failed(final Exception ex) {
            latch2.countDown();
            System.out.println(request3.getRequestLine() + "->" + ex);
        }

        public void cancelled() {
            latch2.countDown();
            System.out.println(request3.getRequestLine() + " cancelled");
        }

    });
    latch2.await();

} finally {
    httpclient.close();
}

 3.2. 根據官方文檔的介紹,簡單封裝了一個異步HttpClient工具類

public class AsyncHttpClientUtil {
    private static final Logger logger = LoggerFactory.getLogger(AsyncHttpClientUtil.class);
    //從池中獲取鏈接超時時間(ms)
    private static final int CONNECTION_REQUEST_TIMEOUT = 10000;
    //建立鏈接超時時間(ms)
    private static final int CONNECT_TIMEOUT = 10000;
    //讀取超時時間(ms)
    private static final int SOCKET_TIMEOUT = 5000;
    //連接數
    private static final int MAX_TOTAL = 50;
    private static final int MAX_PER_ROUTE = 50;

    private static final CloseableHttpAsyncClient httpclient;
    private static PoolingNHttpClientConnectionManager poolManager;

    static {
        httpclient=init();
        httpclient.start();
    }

    private static CloseableHttpAsyncClient init() {
        CloseableHttpAsyncClient client=null;
        try {
            //配置io線程
            IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
                    setIoThreadCount(Runtime.getRuntime().availableProcessors())
                    .setSoKeepAlive(true)
                    .build();
            //創建一個ioReactor
            ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
//            poolManager=new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor());
            poolManager=new PoolingNHttpClientConnectionManager(ioReactor);
            //設置連接池大小
            poolManager.setMaxTotal(MAX_TOTAL);
            poolManager.setDefaultMaxPerRoute(MAX_PER_ROUTE);
            // 配置請求的超時設置
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
                    .setConnectTimeout(CONNECT_TIMEOUT)
                    .setSocketTimeout(SOCKET_TIMEOUT)
                    .build();

            client= HttpAsyncClients.custom()
                    .setConnectionManager(poolManager)
                    .setDefaultRequestConfig(requestConfig)
                    .build();
            return client;
        } catch (IOReactorException e) {
            e.printStackTrace();
        }
        return client;
    }

    public static String get(String url, List<NameValuePair> ns) {
        HttpGet httpget;
        URIBuilder uri = new URIBuilder();
        try {
            if (ns!=null){
                uri.setPath(url);
                uri.addParameters(ns);
                httpget = new HttpGet(uri.build());
            }else{
                httpget = new HttpGet(url);
            }

            // One most likely would want to use a callback for operation result
            httpclient.execute(httpget, new FutureCallback<HttpResponse>() {

                public void completed(final HttpResponse response) {
                    System.out.println(httpget.getRequestLine() + "->" + response.getStatusLine());
                    try {
                        System.out.println("當前請求狀態:"+poolManager.getTotalStats()+", response="+EntityUtils.toString(response.getEntity()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                public void failed(final Exception ex) {
                    System.out.println(httpget.getRequestLine() + "->" + ex);
                }

                public void cancelled() {
                    System.out.println(httpget.getRequestLine() + " cancelled");
                }

            });

        }catch (Exception e){
            logger.error("[發送get請求失敗]URL:{},異常:",uri.getUserInfo(), e);
        }
        return null;
    }

    public static void main(String[] args) {
        for (int i=0; i<10;i++){
            System.out.println("第" + i +"次:");
            get("http://httpbin.org/get",null);
        }
    }
}

3.3 基本原理

 HTTPAysncClient 最后使用的是 InternalHttpAsyncClient,在 InternalHttpAsyncClient 中有個 ConnectionManager,這個就是我們管理連接的管理器。

而在 httpAsync 中只有一個實現那就是 PoolingNHttpClientConnectionManager。這個連接管理器中有兩個我們比較關心的,一個是 Reactor,一個是 Cpool:

 

 

  • Reactor:所有的 Reactor 這里都是實現了 IOReactor 接口。在 PoolingNHttpClientConnectionManager 中會有擁有一個 Reactor,那就是 DefaultConnectingIOReactor,這個 DefaultConnectingIOReactor,負責處理 Acceptor。

在 DefaultConnectingIOReactor 有個 excutor 方法,生成 IOReactor 也就是我們圖中的 BaseIOReactor,進行 IO 的操作。

  • CPool:在 PoolingNHttpClientConnectionManager 中有個 CPool,主要是負責控制我們連接,我們上面所說的 maxTotal 和 defaultMaxPerRoute,都是由其進行控制。

如果每個路由有滿了,它會斷開最老的一個鏈接;如果總共的 total 滿了,它會放入 leased 隊列,釋放空間的時候就會將其重新連接。

 

關於Reactor可參考: https://www.cnblogs.com/doit8791/p/7461479.html

四、參考文檔

  1. http://hc.apache.org/httpcomponents-asyncclient-4.1.x/index.html
  2. http://hc.apache.org/httpcomponents-asyncclient-4.1.x/httpasyncclient/xref/index.html
  3. http://ifeve.com/httpclient-async-call/
  4. https://blog.csdn.net/ouyang111222/article/details/78884634


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM