異步httpclient(httpasyncclient)的使用與總結


參考:異步httpclient(httpasyncclient)的使用與總結

1. 前言
應用層的網絡模型有同步與異步。同步意味當前線程是阻塞的,只有本次請求完成后才能進行下一次請求;異步意味着所有的請求可以同時塞入緩沖區,不阻塞當前的線程;

httpclient在4.x之后開始提供基於nio的異步版本httpasyncclient,httpasyncclient借助了Java並發庫和nio進行封裝(雖說NIO是同步非阻塞IO,但是HttpAsyncClient提供了回調的機制,與netty類似,所以可以模擬類似於AIO的效果),其調用方式非常便捷,但是其中也有許多需要注意的地方。

2. pom文件
本文依賴4.1.2,當前最新的客戶端版本是4.1.3maven repository 地址

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.5</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<version>4.4.5</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.2</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
3. 簡單的實例
public class TestHttpClient {
public static void main(String[] args){

RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(50000)
.setSocketTimeout(50000)
.setConnectionRequestTimeout(1000)
.build();

//配置io線程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
setIoThreadCount(Runtime.getRuntime().availableProcessors())
.setSoKeepAlive(true)
.build();
//設置連接池大小
ConnectingIOReactor ioReactor=null;
try {
ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
} catch (IOReactorException e) {
e.printStackTrace();
}
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
connManager.setMaxTotal(100);
connManager.setDefaultMaxPerRoute(100);


final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
setConnectionManager(connManager)
.setDefaultRequestConfig(requestConfig)
.build();


//構造請求
String url = "http://127.0.0.1:9200/_bulk";
HttpPost httpPost = new HttpPost(url);
StringEntity entity = null;
try {
String a = "{ \"index\": { \"_index\": \"test\", \"_type\": \"test\"} }\n" +
"{\"name\": \"上海\",\"age\":33}\n";
entity = new StringEntity(a);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
httpPost.setEntity(entity);

//start
client.start();

//異步請求
client.execute(httpPost, new Back());

while(true){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

static class Back implements FutureCallback<HttpResponse>{

private long start = System.currentTimeMillis();
Back(){
}

public void completed(HttpResponse httpResponse) {
try {
System.out.println("cost is:"+(System.currentTimeMillis()-start)+":"+EntityUtils.toString(httpResponse.getEntity()));
} catch (IOException e) {
e.printStackTrace();
}
}

public void failed(Exception e) {
System.err.println(" cost is:"+(System.currentTimeMillis()-start)+":"+e);
}

public void cancelled() {

}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
4. 幾個重要的參數
4.1 TimeOut(3個)的設置


ConnectTimeout : 連接超時,連接建立時間,三次握手完成時間。
SocketTimeout : 請求超時,數據傳輸過程中數據包之間間隔的最大時間。
ConnectionRequestTimeout : 使用連接池來管理連接,從連接池獲取連接的超時時間。

在實際項目開發過程中,這三個值可根據具體情況設置。

(1) 下面針對ConnectionRequestTimeout的情況進行分析

實驗條件:設置連接池最大連接數為1,每一個異步請求從開始到回調的執行時間在100ms以上;

實驗過程:連續發送2次請求

public class TestHttpClient {
public static void main(String[] args){

RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(50000)
.setSocketTimeout(50000)
.setConnectionRequestTimeout(10)//設置為10ms
.build();

//配置io線程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
setIoThreadCount(Runtime.getRuntime().availableProcessors())
.setSoKeepAlive(true)
.build();
//設置連接池大小
ConnectingIOReactor ioReactor=null;
try {
ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
} catch (IOReactorException e) {
e.printStackTrace();
}
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
connManager.setMaxTotal(1);//最大連接數設置1
connManager.setDefaultMaxPerRoute(1);//per route最大連接數設置1


final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
setConnectionManager(connManager)
.setDefaultRequestConfig(requestConfig)
.build();


//構造請求
String url = "http://127.0.0.1:9200/_bulk";
List<HttpPost> list = new ArrayList<HttpPost>();
for(int i=0;i<2;i++){
HttpPost httpPost = new HttpPost(url);
StringEntity entity = null;
try {
String a = "{ \"index\": { \"_index\": \"test\", \"_type\": \"test\"} }\n" +
"{\"name\": \"上海\",\"age\":33}\n";
entity = new StringEntity(a);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
httpPost.setEntity(entity);
list.add(httpPost);
}

client.start();

for(int i=0;i<2;i++){
client.execute(list.get(i), new Back());
}

while(true){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

static class Back implements FutureCallback<HttpResponse>{

private long start = System.currentTimeMillis();
Back(){
}

public void completed(HttpResponse httpResponse) {
try {
System.out.println("cost is:"+(System.currentTimeMillis()-start)+":"+EntityUtils.toString(httpResponse.getEntity()));
} catch (IOException e) {
e.printStackTrace();
}
}

public void failed(Exception e) {
e.printStackTrace();
System.err.println(" cost is:"+(System.currentTimeMillis()-start)+":"+e);
}

public void cancelled() {

}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
實驗結果 :
第一次請求執行時間在200ms左右
第二請求回調直接拋出TimeOutException


java.util.concurrent.TimeoutException
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:364)
at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:344)
at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:318)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:303)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.releaseConnection(AbstractClientExchangeHandler.java:239)
at org.apache.http.impl.nio.client.MainClientExec.responseCompleted(MainClientExec.java:387)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:168)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:745)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
結果分析:由於連接池大小是1,第一次請求執行后連接被占用(時間在100ms),第二次請求在規定的時間內無法獲取連接,於是直接連接獲取的TimeOutException

(2) 修改ConnectionRequestTimeout

RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(50000)
.setSocketTimeout(50000)
.setConnectionRequestTimeout(1000)//設置為1000ms
.build();
1
2
3
4
5
上述兩次請求正常執行。

下面進一步看一下代碼中拋異常的地方:

 

 

從上面的代碼中可以看到如果要設置永不ConnectionRequestTimeout,只需要將ConnectionRequestTimeout設置為小於0即可,當然后這種設置一定要慎用, 如果處理不當,請求堆積會導致OOM。

4.2 連接池大小的設置


ConnTotal:連接池中最大連接數;
ConnPerRoute(1000):分配給同一個route(路由)最大的並發連接數,route為運行環境機器到目標機器的一條線路,舉例來說,我們使用HttpClient的實現來分別請求 www.baidu.com 的資源和 www.bing.com 的資源那么他就會產生兩個route;

對於上述的實驗,在一定程度上可以通過增大最大連接數來解決ConnectionRequestTimeout的問題!

后續:本文重點在於使用,后續會對源碼進行分析與解讀

HttpAsyncClient 的簡單使用

下載地址:http://hc.apache.org/downloads.cgi

在NetBeans中導入以下jar文件:

1:一次請求:

復制代碼
 public static void oneReuest(){
        final CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        httpClient.start();
        final HttpGet request = new HttpGet("http://www.apache.org/");
        final Future future = httpClient.execute(request, null);
        try {
            HttpResponse response = (HttpResponse) future.get();
            System.out.println("Response:" + response.getStatusLine());
            System.out.println("Shutting down");
        } catch (Exception ex) {
            Logger.getLogger(Httpasyncclient.class.getName()).log(Level.SEVERE, null, ex);
        }finally{
            try {
                httpClient.close();
            } catch (IOException ex) {
                Logger.getLogger(Httpasyncclient.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        
        System.out.println("執行完畢");
    }
復制代碼

2:多次異步請求:

復制代碼
 public static void moreRequest(){
        final RequestConfig requestConfitg = RequestConfig.custom()
                .setSocketTimeout(3000)
                .setConnectTimeout(3000).build();
        
        final CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom()
                .setDefaultRequestConfig(requestConfitg)
                .build();
        
        httpClient.start();
        
        final HttpGet[] requests = new HttpGet[]{
            new HttpGet("http://www.apache.org/"),
            new HttpGet("http://www.baidu.com/"),
            new HttpGet("http://www.oschina.net/")
        };
        
        final CountDownLatch latch = new CountDownLatch(requests.length);
        for(final HttpGet request: requests){
            
                httpClient.execute(request, new FutureCallback(){
                    @Override
                    public void completed(Object obj) {
                       final HttpResponse response = (HttpResponse)obj;
                       latch.countDown();
                       System.out.println(request.getRequestLine() + "->" + response.getStatusLine());
                    }

                    @Override
                    public void failed(Exception excptn) {
                        latch.countDown();
                        System.out.println(request.getRequestLine() + "->" + excptn);
                    }

                    @Override
                    public void cancelled() {
                        latch.countDown();
                        System.out.println(request.getRequestLine() + "cancelled");
                    }
                });
         }       
        
        try {
            latch.await();
            System.out.println("Shutting Down");
        } catch (InterruptedException ex) {
            Logger.getLogger(Httpasyncclient.class.getName()).log(Level.SEVERE, null, ex);
        }finally{
            try {
                httpClient.close();
            } catch (IOException ex) {
                Logger.getLogger(Httpasyncclient.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        System.out.println("Finish!");
    }
復制代碼

運行結果:

復制代碼
run:
GET http://www.baidu.com/ HTTP/1.1->HTTP/1.1 200 OK
GET http://www.oschina.net/ HTTP/1.1->HTTP/1.1 200 OK
GET http://www.apache.org/ HTTP/1.1->HTTP/1.1 200 OK
Shutting Down
Finish!
成功構建 (總時間: 2 秒)

可以看出是異步執行的!不是按照我們傳入的URL參數順序執行的!
復制代碼

 

篇提到了高性能處理的關鍵是異步,而我們當中許多人依舊在使用同步模式的HttpClient訪問第三方Web資源,我認為原因之一是:異步的HttpClient誕生較晚,許多人不知道;另外也可能是大多數Web程序其實不在意這點性能損失了。

而要自己實現一個異步的HttpClient則比較困難,通常都是自己開一個新的工作線程,利用HttpClient的同步去訪問,完成后再回調這種形式,這樣做其實不是真正的異步,因為依舊會有一個線程處於阻塞中,等待着第三方Web資源的返回。

而如今訪問第三方Web資源的情景越來越多,最典型就是使用第三方登錄平台,如QQ或微信等,我們需要訪問騰訊的服務器去驗證登錄者的身份,根據我的經驗,這個過程可能會阻塞好幾秒鍾,可看作是一個“長時間調用”,所以最好要使用異步方式。

OK,廢話少說,要使用異步的HttpClient,請Maven中帶上:

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
            <version>4.1.1</version>
        </dependency>

接下來是一個完整的Demo代碼:

復制代碼
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] argv) {
        CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
        httpclient.start();

        final CountDownLatch latch = new CountDownLatch(1);
        final HttpGet request = new HttpGet("https://www.alipay.com/");

        System.out.println(" caller thread id is : " + Thread.currentThread().getId());

        httpclient.execute(request, new FutureCallback<HttpResponse>() {

            public void completed(final HttpResponse response) {
                latch.countDown();
                System.out.println(" callback thread id is : " + Thread.currentThread().getId());
                System.out.println(request.getRequestLine() + "->" + response.getStatusLine());
                try {
                    String content = EntityUtils.toString(response.getEntity(), "UTF-8");
                    System.out.println(" response content is : " + content);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            public void failed(final Exception ex) {
                latch.countDown();
                System.out.println(request.getRequestLine() + "->" + ex);
                System.out.println(" callback thread id is : " + Thread.currentThread().getId());
            }

            public void cancelled() {
                latch.countDown();
                System.out.println(request.getRequestLine() + " cancelled");
                System.out.println(" callback thread id is : " + Thread.currentThread().getId());
            }

        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            httpclient.close();
        } catch (IOException ignore) {

        }
    }
}
復制代碼

 呃……代碼很簡單,好像也沒什么好說的了,稍作封裝就可以實現如“getJson()”這樣的方法。

也許你還注意到了,這個HttpClient跟同步的版本一樣,直接支持https,但如果網站的證書是自簽的,默認還是不行的,解決方法當然有,但代碼有些麻煩,我覺得還不如直接買張證書來得簡單,如果網站是你管的話。

參考:Java的異步HttpClient

參考:HttpAsyncClient 的簡單使用


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM