背景
先說下寫這個的目的,其實是好奇,dubbo是怎么實現同步轉異步的,然后了解到,其依賴了請求中攜帶的請求id來完成這個連接復用;然后我又發現,redisson這個redis客戶端,底層也是用的netty,那就比較好奇了:netty是異步的,上層是同步的,要拿結果的,同時呢,redis協議也不可能按照redisson的要求,在請求和響應里攜帶請求id,那,它是怎么實現同步轉異步的呢,異步結果回來后,又是怎么把結果對應上的呢?
對redisson debug調試了long long time之后(你們知道的,多線程不好調試),大概理清了思路,基本就是:連接池 的思路。比如,我要訪問redis:
- 我會先去連接池里拿一個連接(其實是一個netty的socketChannel),然后用這個連接,去發起請求。
- 上層新建一個promise(可寫的future,熟悉completablefuture的可以秒懂,不熟悉的話,可以理解為一個阻塞隊列,你去取東西,取不到,阻塞;生產者往隊列放一個東西,你就不再阻塞了,且拿到了東西),把發送請求的任務交給下層的netty channel后,將promise設置為netty channel的一個attribute,然后在這個promise上阻塞等待
- 下層的netty channel向redis 服務器發起請求
- netty接收到redis 服務器的響應后,從channel中取到第二步設置的attribute,獲取到promise,此時,相當於拿到了鎖,然后打開鎖,並把結果設置到promise中
- 主線程被第四步喚醒后,拿到結果並返回。
其實問題的關鍵是,第二步的promise傳遞,要設置為channel的一個attribute,不然的話,響應回來后,也不知道把響應給誰。
理清了redisson的基本思路后,我想到了很早之前,面試oppo,二面的面試官就問了我一個問題:寫過類似代理的中間件沒有?(因為當時面試的是中間件部門)
然后我說沒有,然后基本就涼了。
其實,中間件最主要的要求,尤其是代理這種,一方面接收請求,一方面還得作為客戶端去發起請求,發起請求這一步,很容易變成性能瓶頸,不少實現里,這一步都是直接使用http client這類同步請求的工具(也是支持異步的,只是同步更常見),所以我也一直想寫一個netty這種異步的客戶端,同時還能同步轉異步的,不能同步轉異步,應用場景就比較受限了。
實現思路
源碼給懶得看文字的同學:
https://gitee.com/ckl111/pooled-netty-http-client.git
扯了這么多,我說下我這個http client的思路,和上面那個redisson的差不多,我這邊的場景也是作為一個中間件,要訪問的后端服務就幾個,比如要訪問http://192.168.19.102:8080下的若干服務,我這邊是啟動時候,就會去建一個連接池(直接配置commons pool2的池化參數,我這里配置的是,2個連接),連接池好了后,netty 的channel已經是ok的了,如下所示:
這每一個長連接,是包在我們的一個核心的數據結構里的,叫NettyClient。
核心的屬性,其實主要下面兩個:
//要連接的host和端口
private HostAndPortConfig config;
/**
* 當前使用的channel
*/
Channel channel;
NettyClient的初始化
構造函數
構造函數如下:
public NettyClient(HostAndPortConfig config) {
this.config = config;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HostAndPortConfig {
private String host;
private Integer port;
}
夠簡單吧,先不考慮連接池,最開始測試的時候,我就是這樣,直接new對象的。
public static void main(String[] args) {
HostAndPortConfig config = new HostAndPortConfig("192.168.19.102", 8080);
NettyClient client = new NettyClient(config);
client.initConnection();
NettyHttpResponse response = client.doPost("http://192.168.19.102:8080/BOL_WebService/xxxxx.do",
JSONObject.toJSONString(new Object()));
if (response == null) {
return;
}
System.out.println(response.getBody());
}
初始化連接
上面的測試代碼,new完對象后,開始初始化連接。
public void initConnection() {
log.info("initConnection starts...");
Bootstrap bootstrap;
//1.創建netty所需的bootstrap配置
bootstrap = createBootstrap(config);
//2.發起連接
ChannelFuture future = bootstrap.connect(config.getHost(), config.getPort());
log.info("current thread:{}", Thread.currentThread().getName());
//3.等待連接成功
boolean ret = future.awaitUninterruptibly(2000, MILLISECONDS);
boolean bIsSuccess = ret && future.isSuccess();
if (!bIsSuccess) {
//4.不成功拋異常
bIsConnectionOk = false;
log.error("host config:{}",config);
throw new RuntimeException("連接失敗");
}
//5.走到這里,說明成功了,新的channle賦值給field
cleanOldChannelAndCancelReconnect(future, channel);
bIsConnectionOk = true;
}
這里初始化連接是直接同步等待的,如果失敗,直接拋異常。第5步里,主要是把新的channel賦值給當前對象的一個field,同時,關閉舊的channle之類的。
private void cleanOldChannelAndCancelReconnect(ChannelFuture future, Channel oldChannel) {
/**
* 連接成功,關閉舊的channel,再用新的channel賦值給field
*/
try {
if (oldChannel != null) {
try {
log.info("Close old netty channel " + oldChannel);
oldChannel.close();
} catch (Exception e) {
log.error("e:{}", e);
}
}
} finally {
/**
* 新channel覆蓋field
*/
NettyClient.this.channel = future.channel();
NettyClient.this.bIsConnectionOk = true;
log.info("connection is ok,new channel:{}", NettyClient.this.channel);
if (NettyClient.this.scheduledFuture != null) {
log.info("cancel scheduledFuture");
NettyClient.this.scheduledFuture.cancel(true);
}
}
}
netty client中,涉及的出站handler
這里說下前面的bootstrap的構造,如下:
private Bootstrap createBootstrap(HostAndPortConfig config) {
Bootstrap bootstrap = new Bootstrap()
.channel(NioSocketChannel.class)
.group(NIO_EVENT_LOOP_GROUP);
bootstrap.handler(new CustomChannelInitializer(bootstrap, config, this));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
return bootstrap;
}
handler 鏈,主要在CustomChannelInitializer類中。
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// http客戶端編解碼器,包括了客戶端http請求編碼,http響應的解碼
pipeline.addLast(new HttpClientCodec());
// 把多個HTTP請求中的數據組裝成一個
pipeline.addLast(new HttpObjectAggregator(65536));
// 用於處理大數據流
pipeline.addLast(new ChunkedWriteHandler());
/**
* 重連handler
*/
pipeline.addLast(new ReconnectHandler(nettyClient));
/**
* 發送業務數據前,進行json編碼
*/
pipeline.addLast(new HttpJsonRequestEncoder());
pipeline.addLast(new HttpResponseHandler());
}
其中,出站時(即客戶端向外部write時),涉及的handler如下:
- HttpJsonRequestEncoder,把業務對象,轉變為httpRequest
- HttpClientCodec,把第一步傳給我們的httpRequest,編碼為bytebuf,交給channel發送
簡單說下HttpJsonRequestEncoder,這個是我自定義的:
/**
* http請求發送前,使用該編碼器進行編碼
*
* 本來是打算在這里編碼body為json,感覺沒必要,直接上移到工具類了
*/
public class HttpJsonRequestEncoder extends
MessageToMessageEncoder<NettyHttpRequest> {
final static String CHARSET_NAME = "UTF-8";
final static Charset UTF_8 = Charset.forName(CHARSET_NAME);
@Override
protected void encode(ChannelHandlerContext ctx, NettyHttpRequest nettyHttpRequest,
List<Object> out) {
// 1. 這個就是要最終傳遞出去的對象
FullHttpRequest request = null;
if (nettyHttpRequest.getHttpMethod() == HttpMethod.POST) {
ByteBuf encodeBuf = Unpooled.copiedBuffer((CharSequence) nettyHttpRequest.getBody(), UTF_8);
request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.POST, nettyHttpRequest.getUri(), encodeBuf);
HttpUtil.setContentLength(request, encodeBuf.readableBytes());
} else if (nettyHttpRequest.getHttpMethod() == HttpMethod.GET) {
request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, nettyHttpRequest.getUri());
} else {
throw new RuntimeException();
}
//2. 填充header
populateHeaders(ctx, request);
out.add(request);
}
private void populateHeaders(ChannelHandlerContext ctx, FullHttpRequest request) {
/**
* headers 設置
*/
HttpHeaders headers = request.headers();
headers.set(HttpHeaderNames.HOST, ctx.channel().remoteAddress().toString().substring(1));
headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
headers.set(HttpHeaderNames.CONTENT_TYPE,
"application/json");
/**
* 設置我方可以接收的
*/
headers.set(HttpHeaderNames.ACCEPT_ENCODING,
HttpHeaderValues.GZIP.toString() + ','
+ HttpHeaderValues.DEFLATE.toString());
headers.set(HttpHeaderNames.ACCEPT_CHARSET,
"utf-8,ISO-8859-1;q=0.7,*;q=0.7");
headers.set(HttpHeaderNames.ACCEPT_LANGUAGE, "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7");
headers.set(HttpHeaderNames.ACCEPT, "*/*");
/**
* 設置agent
*/
headers.set(HttpHeaderNames.USER_AGENT,
"Netty xml Http Client side");
}
}
netty client涉及的入站handler
- HttpClientCodec和HttpObjectAggregator,主要是將bytebuf,轉變為io.netty.handler.codec.http.FullHttpResponse 類型的對象
- HttpResponseHandler,我們的業務handler
/**
* http請求響應的處理器
*/
@Slf4j
public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception {
String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8);
NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s);
// 1.
NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get();
log.info("req url:{},params:{},resp:{}",
nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(),
nettyHttpRequestContext.getNettyHttpRequest().getBody(),
nettyHttpResponse);
// 2.
Promise<NettyHttpResponse> promise = nettyHttpRequestContext.getDefaultPromise();
promise.setSuccess(nettyHttpResponse);
}
}
- 1處代碼,主要從channel中,根據key,獲取當前的請求相關信息
- 2處代碼,從當前請求中,拿到promise,設置結果,此時,會喚醒主線程。
netty client 發起http post調用
說完了netty client,我們再說說調用的過程:
public NettyHttpResponse doPost(String url, Object body) {
NettyHttpRequest request = new NettyHttpRequest(url, body);
return doHttpRequest(request);
}
private static final DefaultEventLoop NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP = new DefaultEventLoop(null, new NamedThreadFactory("NettyResponsePromiseNotify"));
private NettyHttpResponse doHttpRequest(NettyHttpRequest request) {
// 1
Promise<NettyHttpResponse> defaultPromise = NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP.newPromise();
// 2
NettyHttpRequestContext context = new NettyHttpRequestContext(request, defaultPromise);
channel.attr(CURRENT_REQ_BOUND_WITH_THE_CHANNEL).set(context);
// 3
ChannelFuture channelFuture = channel.writeAndFlush(request);
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
System.out.println(Thread.currentThread().getName() + " 請求發送完成");
}
});
// 4
return get(defaultPromise);
}
上面我已經標注了幾個數字,分別講一下:
- 新建一個promise,可以理解為一把可以我們手動完成的鎖(一般主線程在這個鎖上等待,在另一個線程去完成)
- 把鎖和其他請求信息,一起放到channle里
- 使用channle發送數據
- 同步等待
第四步等待的get方法如下:
public <V> V get(Promise<V> future) {
// 1.
if (!future.isDone()) {
CountDownLatch l = new CountDownLatch(1);
future.addListener(new GenericFutureListener<Future<? super V>>() {
@Override
public void operationComplete(Future<? super V> future) throws Exception {
log.info("received response,listener is invoked");
if (future.isDone()) {
// 2
// promise的線程池,會回調該listener
l.countDown();
}
}
});
boolean interrupted = false;
if (!future.isDone()) {
try {
// 3
l.await(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("e:{}", e);
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
//4
if (future.isSuccess()) {
return future.getNow();
}
log.error("wait result time out ");
return null;
}
- 如果promise的狀態還是沒有完成,則我們new了一個閉鎖
- 加了一個listner在promise上面,別人操作這個promise,這個listener會被回調,回調邏輯:將閉鎖打開
- 主線程,在閉鎖上等待
- 主線程,走到這里,說明已經等待超時,或者已經完成,可以獲取結果並返回
什么地方會修改promise
前面我們提到了,在response的handler中:
/**
* http請求響應的處理器
*/
@Slf4j
public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception {
String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8);
NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s);
// 1.
NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get();
log.info("req url:{},params:{},resp:{}",
nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(),
nettyHttpRequestContext.getNettyHttpRequest().getBody(),
nettyHttpResponse);
// 2.
Promise<NettyHttpResponse> promise = nettyHttpRequestContext.getDefaultPromise();
promise.setSuccess(nettyHttpResponse);
}
}
其中,2處,修改promise,此時就會回調前面說的那個listenr,打開閉鎖,主線程也因此得以繼續執行:
public <V> V get(Promise<V> future) {
if (!future.isDone()) {
CountDownLatch l = new CountDownLatch(1);
future.addListener(new GenericFutureListener<Future<? super V>>() {
@Override
public void operationComplete(Future<? super V> future) throws Exception {
log.info("received response,listener is invoked");
if (future.isDone()) {
// io線程會回調該listener
l.countDown();
}
}
});
.....
}
總結
本篇的大致思路差不多就是這樣了,主要邏輯在於同步轉異步那一塊。
還有些沒講到的,后面再講,大概還有2個部分。
- 斷線重連
- commons pool實現連接池。
代碼我放在: