Netty http client 編寫總結


Apache http client 有兩個問題,第一個是 apache http client 是阻塞式的讀取 Http request, 異步讀寫網絡數據性能更好些。第二個是當 client 到 server 的連接中斷時,http client 無法感知到這件事的發生,需要開發者主動的輪訓校驗,發 keep alive 或者 heart beat 消息,而 netty 可以設置回調函數,確保網絡連接中斷時有邏輯來 handle

使用 Netty 編寫 Http client,也有一些問題。首先是 netty 是事件驅動的,邏輯主要基於回調函數。數據包到來了也好,網絡連接中斷也好,都要通過寫回調函數確定這些事件來臨后的后續操作。沒有人喜歡回調函數,Future 是 scala 里討人喜歡的特性,它能把常規於語言里通過回調才能解決的問題通過主動調用的方式來解決,配合 map, flatmap, for 甚至 async,scala 里可以做到完全看不到回調函數。所以用 netty 做 client 第一個問題是如何把 回調函數搞成主動調用的函數。第二點是 長連接,一個 channel 不能發了一個消息就關閉了,每次發消息都要經過 http 三次握手四次揮手效率太低了,最好能重用 channel。第三個是 thread-safe,這個一開始並沒有考慮到,后來發現這個是最難解決的問題。當然 netty 作為一個比較底層的包,用它來實現一些高層的接口是比較費時費力的,有很多事情都要手動去做。我花了四五天的時間,沒有解決這幾個問題,只留下一些經驗,供以后參考(見后面的 update)。

回調函數變主動調用函數

netty 的操作都是基於回調函數的
消息到達時的邏輯

    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

        if (msg instanceof HttpContent) {
            HttpContent content = (HttpContent) msg;

            if (content instanceof HttpContent) {
                sendFullResponse(ctx, content);
            } else {
                log.error("content is not http content");
            }
        }
    }

到 server 的連接建立后創建 channel 的邏輯

        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new HttpClientCodec());
                p.addLast(new HttpContentDecompressor());
                p.addLast(new HttpObjectAggregator(512 * 1024));
                p.addLast(new ResponseHandler());
            }
        });

這是我就希望有一個像 scala Future/Promise 一樣的東西,幫我把回調函數轉成主動調用函數,這是 scala 的一個例子

	Promise promise = Promise[HttpContent]
	def channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
		HttpContent content = (HttpContent) msg
		promise.success(content)
	}
	
	//somewhere else
	promise.future.map(content => println("content has been recieved in client"))

可以說有了 promise,我們接收到 httpContent 以后的事情就都能用主動調用的方式來寫了,雖然不完全像普通的 java 代碼那樣簡單,需要加一些組合子,但是已經夠好了。

Java 里沒有 promise,需要自己實現,參考了別人的代碼,發現 CountDownLatch 是實現 promise 的關鍵。setComplete 和 await 是最重要的兩個函數,一個設置 CountDownLatch,一個等待 CountDownLatch。

    private boolean setComplete(ResultHolder holder) {
        log.info("set complete");

        if (isDone()) {
            return false;
        }

        synchronized (this) {
            if (isDone()) {
                return false;
            }

            this.result = holder;
            if (this.complteLatch != null) {

                log.info("set complete time: " + System.currentTimeMillis());
                this.complteLatch.countDown();
            } else {
                log.info("completeLatch is null at the time: " + System.currentTimeMillis());
            }
        }
        return true;
    }
    
    
    public TaskFuture await() throws InterruptedException {
    	if (isDone()) {
            return this;
        }

        synchronized (this) {
            if (isDone()) {
                return this;
            }

            if (this.complteLatch == null) {
                log.info("await time: " + System.currentTimeMillis());
                this.complteLatch = new CountDownLatch(1);
            }
        }

        this.complteLatch.await();
        return this;
    }

有了 Promise 以后就能把回調函數轉為主動調用的函數了。雖然沒有組合子,但是已經夠好了,起碼 await 函數能夠保證開發者拿到 HttpContent 后能夠像正常的 java 代碼一樣操縱這個值。

public TaskPromise executeInternal(HttpRequest httpRequest)

重用 channel

根據上面那一節,得到了這個函數

    public TaskPromise executeInternal(HttpRequest httpRequest) {
        final TaskPromise promise = new DefaultTaskPromise();

        log.info("new created promise hashcode is " + promise.hashCode());

        Channel channel = channelFuture.channel();
        channel.pipeline().get(ResponseHandler.class).setResponseHandler(promise);

        channel.writeAndFlush(httpRequest).addListener((ChannelFutureListener) future -> {
            if(future.isSuccess()) {
                log.info("write success");
             }
        });

public class ResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {

    Logger log = LoggerFactory.getLogger(getClass());

    private TaskPromise promise;

    public void setResponseHandler(TaskPromise promise) {
        this.promise = promise;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
        log.info("channel read0 returned");
        promise.setSuccess(new NettyHttpResponse(ctx, msg));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
        log.info("exception caught in response handler");
        this.promise.setFailure(cause);
    }

}

每次調用 executeInternal 都創建一個 promise 將此 promise 放到 ResponseHandler 注冊一下,然后將 promise 句柄當做返回值。channel.pipeline().get(xxx).set(yyy) 是在 SO 找到的,看起來像個黑科技。這個函數看起來可以滿足需求了。
實際上不然,它不是線程安全的。當兩個線程同時調用 executeInternal 時,可能會同時 setResponseHandler,導致第一個 promise 被沖掉,然后兩個線程持有同一個 promise,一個 promise 只能被 setComplete 一次,第二次時會 exception。假如把 executeInernal 寫成同步的,線程安全問題仍在,因為只要是在一個請求返回來之前設置了 promise,第一個 promise 總是會被沖掉的。看起來這是一個解決不了的問題。

在 github 看了很多別人的代碼,發現大家都沒認真研究線程安全的問題,或者一個 channel 只發一個消息。查閱了一些資料,了解到InboundHandler 的執行是原子的,不用擔心線程安全問題,但這對我也沒什么幫助。找到 AsyncRestTemplate 的底層實現, Netty4ClientHttpRequest,我覺得它想做的事情跟我很像,但不過它好像是每個 channel 只發一個消息。因為每次發新的消息,Bootstrap 都會調用 connect 函數。

	@Override
	protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
		final SettableListenableFuture<ClientHttpResponse> responseFuture =
				new SettableListenableFuture<ClientHttpResponse>();

		ChannelFutureListener connectionListener = new ChannelFutureListener() {
			@Override
			public void operationComplete(ChannelFuture future) throws Exception {
				if (future.isSuccess()) {
					Channel channel = future.channel();
					channel.pipeline().addLast(new RequestExecuteHandler(responseFuture));
					FullHttpRequest nettyRequest = createFullHttpRequest(headers);
					channel.writeAndFlush(nettyRequest);
				}
				else {
					responseFuture.setException(future.cause());
				}
			}
		};

		this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);

		return responseFuture;
	}

如果 bootstrap 能夠緩存住以前的連接,那么他就是我想要的東西了,但是我循環了 executeInternal 十次,發現建立了十個到 Server 的連接,也就說它並沒有重用 channel

update:

上一次寫總結時還卡在一個解決不了的並發問題上,當初的並發問題實際上可以寫成 how concurrent response mapping to request. 在 Stackoverflow 和中文論壇上有人討論過這個問題,從他們的討論中看的結論是:

在 Netty 里,channel 是 multiplex 的,但是返回的 Response 不會自動映射到發出的 Request 上,Netty 本身沒有這種能力,為了達到這個效果,需要在應用層做一些功夫。一般有兩種做法

  • 如果 Client, Server 都由開發者掌控,那么 client 和 server 可以在交互協議上添加 requestId field, request 和 response 都有 requestId 標識。client 端每發送一個 request 后,就在本地記錄 (requestId, Future[Response]) 這么一個 pair, 當 response 返回后,根據 requestId 找到對應的 future, 填充 future
  • 當 server 端不由開發者掌控時,channel 只能被動接受沒有狀態的 response,沒有其他信息可供 client 分辨它對應的是那個 request, 此時就只能使用 sync 模式發送消息了,這樣能夠保證 response 對應着的就是正在等待它的那個 request. 使用這種方法就失掉了並發的特性,但是可以創建一個 channel pool, 提供一定的並發性

對於有些不需要 response, request 對應關系的服務,channel 的寫法可以保持原始的回調函數,比如 heartbeat 服務就可以可以這么寫。

源碼鏈接https://github.com/sangszhou/NettyHttpClient

做了個簡單的 benchmark, 發現比 apache http client 慢了 2~3 倍,目前還不確定性能瓶頸的位置。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM