曹工雜談:花了兩天時間,寫了一個netty實現的http客戶端,支持同步轉異步和連接池(1)--核心邏輯講解


背景

先說下寫這個的目的,其實是好奇,dubbo是怎么實現同步轉異步的,然后了解到,其依賴了請求中攜帶的請求id來完成這個連接復用;然后我又發現,redisson這個redis客戶端,底層也是用的netty,那就比較好奇了:netty是異步的,上層是同步的,要拿結果的,同時呢,redis協議也不可能按照redisson的要求,在請求和響應里攜帶請求id,那,它是怎么實現同步轉異步的呢,異步結果回來后,又是怎么把結果對應上的呢?

對redisson debug調試了long long time之后(你們知道的,多線程不好調試),大概理清了思路,基本就是:連接池 的思路。比如,我要訪問redis:

  1. 我會先去連接池里拿一個連接(其實是一個netty的socketChannel),然后用這個連接,去發起請求。
  2. 上層新建一個promise(可寫的future,熟悉completablefuture的可以秒懂,不熟悉的話,可以理解為一個阻塞隊列,你去取東西,取不到,阻塞;生產者往隊列放一個東西,你就不再阻塞了,且拿到了東西),把發送請求的任務交給下層的netty channel后,將promise設置為netty channel的一個attribute,然后在這個promise上阻塞等待
  3. 下層的netty channel向redis 服務器發起請求
  4. netty接收到redis 服務器的響應后,從channel中取到第二步設置的attribute,獲取到promise,此時,相當於拿到了鎖,然后打開鎖,並把結果設置到promise中
  5. 主線程被第四步喚醒后,拿到結果並返回。

其實問題的關鍵是,第二步的promise傳遞,要設置為channel的一個attribute,不然的話,響應回來后,也不知道把響應給誰。

理清了redisson的基本思路后,我想到了很早之前,面試oppo,二面的面試官就問了我一個問題:寫過類似代理的中間件沒有?(因為當時面試的是中間件部門)

然后我說沒有,然后基本就涼了。

其實,中間件最主要的要求,尤其是代理這種,一方面接收請求,一方面還得作為客戶端去發起請求,發起請求這一步,很容易變成性能瓶頸,不少實現里,這一步都是直接使用http client這類同步請求的工具(也是支持異步的,只是同步更常見),所以我也一直想寫一個netty這種異步的客戶端,同時還能同步轉異步的,不能同步轉異步,應用場景就比較受限了。

實現思路

源碼給懶得看文字的同學:

https://gitee.com/ckl111/pooled-netty-http-client.git

扯了這么多,我說下我這個http client的思路,和上面那個redisson的差不多,我這邊的場景也是作為一個中間件,要訪問的后端服務就幾個,比如要訪問http://192.168.19.102:8080下的若干服務,我這邊是啟動時候,就會去建一個連接池(直接配置commons pool2的池化參數,我這里配置的是,2個連接),連接池好了后,netty 的channel已經是ok的了,如下所示:

這每一個長連接,是包在我們的一個核心的數據結構里的,叫NettyClient。

核心的屬性,其實主要下面兩個:

//要連接的host和端口
private HostAndPortConfig config;

/**
 * 當前使用的channel
 */
Channel channel;

NettyClient的初始化

構造函數

構造函數如下:

public NettyClient(HostAndPortConfig config) {
    this.config = config;

}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class HostAndPortConfig {

    private String host;

    private Integer port;

}

夠簡單吧,先不考慮連接池,最開始測試的時候,我就是這樣,直接new對象的。

public static void main(String[] args) {
        HostAndPortConfig config = new HostAndPortConfig("192.168.19.102", 8080);
        NettyClient client = new NettyClient(config);
        client.initConnection();
        NettyHttpResponse response = client.doPost("http://192.168.19.102:8080/BOL_WebService/xxxxx.do",
                JSONObject.toJSONString(new Object()));
        if (response == null) {
            return;
        }

        System.out.println(response.getBody());
    }

初始化連接

上面的測試代碼,new完對象后,開始初始化連接。


    public void initConnection() {
        log.info("initConnection starts...");

        Bootstrap bootstrap;
        //1.創建netty所需的bootstrap配置
        bootstrap = createBootstrap(config);
        //2.發起連接
        ChannelFuture future = bootstrap.connect(config.getHost(), config.getPort());
        log.info("current thread:{}", Thread.currentThread().getName());
        //3.等待連接成功
        boolean ret = future.awaitUninterruptibly(2000, MILLISECONDS);

        boolean bIsSuccess = ret && future.isSuccess();
        if (!bIsSuccess) {
            //4.不成功拋異常
            bIsConnectionOk = false;
            log.error("host config:{}",config);
            throw new RuntimeException("連接失敗");
        }
		//5.走到這里,說明成功了,新的channle賦值給field
        cleanOldChannelAndCancelReconnect(future, channel);

        bIsConnectionOk = true;
    }

這里初始化連接是直接同步等待的,如果失敗,直接拋異常。第5步里,主要是把新的channel賦值給當前對象的一個field,同時,關閉舊的channle之類的。

private void cleanOldChannelAndCancelReconnect(ChannelFuture future, Channel oldChannel) {
    /**
     * 連接成功,關閉舊的channel,再用新的channel賦值給field
     */
    try {
        if (oldChannel != null) {
            try {
                log.info("Close old netty channel " + oldChannel);
                oldChannel.close();
            } catch (Exception e) {
                log.error("e:{}", e);
            }
        }
    } finally {
        /**
         * 新channel覆蓋field
         */
        NettyClient.this.channel = future.channel();
        NettyClient.this.bIsConnectionOk = true;
        log.info("connection is ok,new channel:{}", NettyClient.this.channel);
        if (NettyClient.this.scheduledFuture != null) {
            log.info("cancel scheduledFuture");
            NettyClient.this.scheduledFuture.cancel(true);
        }
    }
}

netty client中,涉及的出站handler

這里說下前面的bootstrap的構造,如下:

private Bootstrap createBootstrap(HostAndPortConfig config) {
    Bootstrap bootstrap = new Bootstrap()
            .channel(NioSocketChannel.class)
            .group(NIO_EVENT_LOOP_GROUP);

    bootstrap.handler(new CustomChannelInitializer(bootstrap, config, this));
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    return bootstrap;
}

handler 鏈,主要在CustomChannelInitializer類中。

protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();

    // http客戶端編解碼器,包括了客戶端http請求編碼,http響應的解碼
    pipeline.addLast(new HttpClientCodec());

    // 把多個HTTP請求中的數據組裝成一個
    pipeline.addLast(new HttpObjectAggregator(65536));

    // 用於處理大數據流
    pipeline.addLast(new ChunkedWriteHandler());

    /**
     * 重連handler
     */
    pipeline.addLast(new ReconnectHandler(nettyClient));

    /**
     * 發送業務數據前,進行json編碼
     */
    pipeline.addLast(new HttpJsonRequestEncoder());


    pipeline.addLast(new HttpResponseHandler());

}

其中,出站時(即客戶端向外部write時),涉及的handler如下:

  1. HttpJsonRequestEncoder,把業務對象,轉變為httpRequest
  2. HttpClientCodec,把第一步傳給我們的httpRequest,編碼為bytebuf,交給channel發送

簡單說下HttpJsonRequestEncoder,這個是我自定義的:

/**
 * http請求發送前,使用該編碼器進行編碼
 *
 * 本來是打算在這里編碼body為json,感覺沒必要,直接上移到工具類了
 */
public class HttpJsonRequestEncoder extends
        MessageToMessageEncoder<NettyHttpRequest> {

    final static String CHARSET_NAME = "UTF-8";

    final static Charset UTF_8 = Charset.forName(CHARSET_NAME);


    @Override
    protected void encode(ChannelHandlerContext ctx, NettyHttpRequest nettyHttpRequest,
                          List<Object> out) {
        // 1. 這個就是要最終傳遞出去的對象
        FullHttpRequest request = null;
        if (nettyHttpRequest.getHttpMethod() == HttpMethod.POST) {
            ByteBuf encodeBuf = Unpooled.copiedBuffer((CharSequence) nettyHttpRequest.getBody(), UTF_8);
            request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
                    HttpMethod.POST, nettyHttpRequest.getUri(), encodeBuf);

            HttpUtil.setContentLength(request, encodeBuf.readableBytes());
        } else if (nettyHttpRequest.getHttpMethod() == HttpMethod.GET) {
            request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
                    HttpMethod.GET, nettyHttpRequest.getUri());
        } else {
            throw new RuntimeException();
        }
	    //2. 填充header
        populateHeaders(ctx, request);

        out.add(request);
    }
    
    private void populateHeaders(ChannelHandlerContext ctx, FullHttpRequest request) {
        /**
         * headers 設置
         */
        HttpHeaders headers = request.headers();
        headers.set(HttpHeaderNames.HOST, ctx.channel().remoteAddress().toString().substring(1));
        headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);

        headers.set(HttpHeaderNames.CONTENT_TYPE,
                "application/json");
        /**
         * 設置我方可以接收的
         */
        headers.set(HttpHeaderNames.ACCEPT_ENCODING,
                HttpHeaderValues.GZIP.toString() + ','
                        + HttpHeaderValues.DEFLATE.toString());

        headers.set(HttpHeaderNames.ACCEPT_CHARSET,
                "utf-8,ISO-8859-1;q=0.7,*;q=0.7");
        headers.set(HttpHeaderNames.ACCEPT_LANGUAGE, "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7");
        headers.set(HttpHeaderNames.ACCEPT, "*/*");
        /**
         * 設置agent
         */
        headers.set(HttpHeaderNames.USER_AGENT,
                "Netty xml Http Client side");
    }
 }

netty client涉及的入站handler

  1. HttpClientCodec和HttpObjectAggregator,主要是將bytebuf,轉變為io.netty.handler.codec.http.FullHttpResponse 類型的對象
  2. HttpResponseHandler,我們的業務handler
/**
 * http請求響應的處理器
 */
@Slf4j
public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception {
        String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8);

        NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s);
        // 1.
        NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get();

        log.info("req url:{},params:{},resp:{}",
                nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(),
                nettyHttpRequestContext.getNettyHttpRequest().getBody(),
                nettyHttpResponse);
        // 2.
        Promise<NettyHttpResponse> promise = nettyHttpRequestContext.getDefaultPromise();
        promise.setSuccess(nettyHttpResponse);
    }
}
  1. 1處代碼,主要從channel中,根據key,獲取當前的請求相關信息
  2. 2處代碼,從當前請求中,拿到promise,設置結果,此時,會喚醒主線程。

netty client 發起http post調用

說完了netty client,我們再說說調用的過程:

public NettyHttpResponse doPost(String url, Object body) {
    NettyHttpRequest request = new NettyHttpRequest(url, body);
    return doHttpRequest(request);
}

    private static final DefaultEventLoop NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP =  new DefaultEventLoop(null, new NamedThreadFactory("NettyResponsePromiseNotify"));


private NettyHttpResponse doHttpRequest(NettyHttpRequest request) {
    // 1 
    Promise<NettyHttpResponse> defaultPromise = NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP.newPromise();
    // 2
    NettyHttpRequestContext context = new NettyHttpRequestContext(request, defaultPromise);
    channel.attr(CURRENT_REQ_BOUND_WITH_THE_CHANNEL).set(context);
    // 3
    ChannelFuture channelFuture = channel.writeAndFlush(request);
    channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
        @Override
        public void operationComplete(Future<? super Void> future) throws Exception {
            System.out.println(Thread.currentThread().getName() + " 請求發送完成");
        }
    });
	// 4 
    return get(defaultPromise);
}

上面我已經標注了幾個數字,分別講一下:

  1. 新建一個promise,可以理解為一把可以我們手動完成的鎖(一般主線程在這個鎖上等待,在另一個線程去完成)
  2. 把鎖和其他請求信息,一起放到channle里
  3. 使用channle發送數據
  4. 同步等待

第四步等待的get方法如下:

public <V> V get(Promise<V> future) {
    // 1.
    if (!future.isDone()) {
        CountDownLatch l = new CountDownLatch(1);
        future.addListener(new GenericFutureListener<Future<? super V>>() {
            @Override
            public void operationComplete(Future<? super V> future) throws Exception {
                log.info("received response,listener is invoked");
                if (future.isDone()) {
                    // 2
                    // promise的線程池,會回調該listener
                    l.countDown();
                }
            }
        });

        boolean interrupted = false;
        if (!future.isDone()) {
            try {
                // 3
                l.await(4, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.error("e:{}", e);
                interrupted = true;
            }

        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
	//4 
    if (future.isSuccess()) {
        return future.getNow();
    }
    log.error("wait result time out ");
    return null;
}
  1. 如果promise的狀態還是沒有完成,則我們new了一個閉鎖
  2. 加了一個listner在promise上面,別人操作這個promise,這個listener會被回調,回調邏輯:將閉鎖打開
  3. 主線程,在閉鎖上等待
  4. 主線程,走到這里,說明已經等待超時,或者已經完成,可以獲取結果並返回

什么地方會修改promise

前面我們提到了,在response的handler中:

/**
 * http請求響應的處理器
 */
@Slf4j
public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception {
        String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8);

        NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s);
        // 1.
        NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get();

        log.info("req url:{},params:{},resp:{}",
                nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(),
                nettyHttpRequestContext.getNettyHttpRequest().getBody(),
                nettyHttpResponse);
        // 2.
        Promise<NettyHttpResponse> promise = nettyHttpRequestContext.getDefaultPromise();
        promise.setSuccess(nettyHttpResponse);
    }
}

其中,2處,修改promise,此時就會回調前面說的那個listenr,打開閉鎖,主線程也因此得以繼續執行:

public <V> V get(Promise<V> future) {
    if (!future.isDone()) {
        CountDownLatch l = new CountDownLatch(1);
        future.addListener(new GenericFutureListener<Future<? super V>>() {
            @Override
            public void operationComplete(Future<? super V> future) throws Exception {
                log.info("received response,listener is invoked");
                if (future.isDone()) {
                    // io線程會回調該listener
                    l.countDown();
                }
            }
        });
        .....
    }

總結

本篇的大致思路差不多就是這樣了,主要邏輯在於同步轉異步那一塊。

還有些沒講到的,后面再講,大概還有2個部分。

  1. 斷線重連
  2. commons pool實現連接池。

代碼我放在:

https://gitee.com/ckl111/pooled-netty-http-client.git


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM