研究這個問題的起因
起因是一次面試,一次面試某電商網站,前面問到緩存,分布式,業務這些,還相談甚歡。然后面試官突然甩出一句:“了解dubbo嗎?dubbo是長連接還是短連接?”。當時我主要接觸了解學習的還是spring cloud,dubbo作為知名的分布式rpc框架,只是有一定了解,並且連接這一塊並沒有很深入去了解,但是基於對分布式系統的了解,我不假思索的回答了:“長連接啊!”。其實分布式系統接觸多了就知道了,分布式系統為了應對高並發,減少在高並發時的線程上下文切換損失以及重新建立連接的損失,往往都是采用長連接的。所以我當時我是這么想的:“dubbo作為處理小數據高並發分布式RPC框架,如果采用短連接,應該不可能達到那么高的吞吐吧。”。所以果斷回答了長連接。可是沒想到面試官微微一笑,帶着幾分不屑的說道:“短連接”。當時就給我整懵逼了,無法想象短連接如何處理高並發下重復建立連接以及線程上下文切換的問題。導致我回家的地鐵上一直都處在懷疑人生的狀態,回家后立馬各種百度Google(甚至還懷疑查到的結果)。
dubbo的連接機制
這里直接上結論了,dubbo默認是使用單一長連接,即消費者與每個服務提供者建立一個單一長連接,即如果有消費者soa-user1,soa-user2,提供者soa-account三台,則每台消費者user都會與3台account建立一個連接,結果是每台消費者user有3個長連接,每台提供者account維持6個長連接。
為什么這么做
dubbo這么設計的原因是,一般情況下因為消費者是在請求鏈路的上游,消費者承擔的連接數以及並發量都是最高的,他需要承擔更多其他的連接請求,而對提供者來說,承擔的連接只來於消費者,所以每台提供者只需要承接消費者數量的連接就可以了,dubbo面向的就是消費者數量遠大於服務提供者的情況。所以說,現在很多項目使用的都是消費者和提供者不分的情況,這種情況並沒有很好的利用這個機制。
dubbo同步轉異步
dubbo的底層是使用netty,netty之前介紹過是非阻塞的,但是dubbo調用我們大多數時候都是使用的同步調用,那么這里是怎么異步轉同步的呢?這里其實延伸下,不只是dubbo,大多數在web場景下,還是同步請求為主,那么netty中要如何將異步轉同步?我這邊描述一下關鍵步驟。
dubbo的實現
//DubboInvoker
protected Result doInvoke(final Invocation invocation) throws Throwable {
//...
if (isOneway) {//2.異步沒返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
//1.異步有返回值,異步的直接返回帶future的result就完事了
ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {//3.異步變同步,這里是同步的返回,主要阻塞的原因在於.get(),實際上就是HeaderExchangeChannel里返回的DefaultFuture的.get()方法
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout)//返回下面的future
.get();//進入get()方法,是當前線程阻塞。那么當有結果返回時,喚醒這個線程
}
}
//--HeaderExchangeChannel
public ResponseFuture request(Object request, int timeout) throws RemotingException {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
//這里在發送前,構建自定義的future,用來讓調用線程等待,注意這里的future和netty的channelFuture不同。
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);//使用實際的channel,里面封裝了netty的channel,最后是調用到了nettyChannel的send的。
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
//--DefaultFuture--實現阻塞調用線程的邏輯,接收到結果
//阻塞的邏輯
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = 1000;
}
//done是自身對象的一個可重入鎖成員變量的一個condition,這里的邏輯就是:
//如果獲取到了鎖,並且條件不滿足,則await線程等到下面receive方法喚醒。
//其實我想吐槽下,這個condition命名為done,又有一個方法叫isDone,但是isDone又是判斷response!=null的和done沒有任何關系,這個命名不是很科學。
if (!this.isDone()) {
long start = System.currentTimeMillis();
this.lock.lock();
try {
while(!this.isDone()) {
this.done.await((long)timeout, TimeUnit.MILLISECONDS);
if (this.isDone() || System.currentTimeMillis() - start > (long)timeout) {
break;
}
}
} catch (InterruptedException var8) {
throw new RuntimeException(var8);
} finally {
this.lock.unlock();
}
if (!this.isDone()) {
throw new TimeoutException(this.sent > 0L, this.channel, this.getTimeoutMessage(false));
}
}
return this.returnFromResponse();
}
//收到結果時喚醒的邏輯
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;//拿到了響應
if (done != null) {
done.signal();//喚醒線程
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
//那么我們知道DefaultFuture被調用received方法時會被喚醒,那么是什么時候被調用的呢?
//--HeaderExchangeHandler-- netty中處理的流就是handler流,之前有篇文章講到過,這里也是在handler中給處理的,其實上面還有ExchangeHandlerDispatcher這類dispatcher預處理,將返回
//分給具體的channelHandler處理,但是結果到了這里
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
HeaderExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
Request request = (Request)message;
if (request.isEvent()) {
this.handlerEvent(channel, request);
} else if (request.isTwoWay()) {
Response response = this.handleRequest(exchangeChannel, request);
channel.send(response);
} else {
this.handler.received(exchangeChannel, request.getData());
}
} else if (message instanceof Response) {
//主要是這里
handleResponse(channel, (Response)message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = this.handler.telnet(channel, (String)message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
this.handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
//handleResponse,到這里直接調用靜態方法,回到了上面接受結果那步。
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
}
純netty的簡單實現
純netty的簡單實現,其實也很簡單,在創建handler時,構造時將外部的FutureTask對象構造到hanlder中,外面使用FutureTask對象get方法阻塞,handler中在最后有結果時,將FutureTask的結果set一下,外部就取消了阻塞。
public SettableTask<String> sendAndSync(FullHttpRequest httpContent){
//創建一個futureStask
final SettableTask<String> responseFuture = new SettableTask<>();
ChannelFutureListener connectionListener = future -> {
if (future.isSuccess()) {
Channel channel = future.channel();
//創建一個listener,在連接后將新的futureTask構造到一個handler中
channel.pipeline().addLast(new SpidersResultHandler(responseFuture));
} else {
responseFuture.setExceptionResult(future.cause());
}
};
try {
Channel channel = channelPool.acquire().syncUninterruptibly().getNow();
log.info("channel status:{}",channel.isActive());
channel.writeAndFlush(httpContent).addListener(connectionListener);
} catch (Exception e) {
log.error("netty寫入異常!", e);
}
return responseFuture;
}
//重寫一個可以手動set的futureTask
public class SettableTask<T> extends FutureTask<T> {
public SettableTask() {
super(() -> {
throw new IllegalStateException("Should never be called");
});
}
public void setResultValue(T value) {
this.set(value);
}
public void setExceptionResult(Throwable exception) {
this.setException(exception);
}
@Override
protected void done() {
super.done();
}
}
//resultHandler
public class SpidersResultHandler extends SimpleChannelInboundHandler<String> {
private SettableTask<String> future;
public SpidersResultHandler(SettableTask<String> future){
this.future=future;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String httpContent) throws Exception {
log.info("result={}",httpContent);
future.setResultValue(httpContent);
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
log.error("{}異常", this.getClass().getName(), throwable);
}
}
總結
dubbo的高性能,也源於他對每個點不斷的優化,最早的時候我記得看到一篇文章寫到:dubbo的異步轉同步機制,是使用的CountDownLatch實現的。現在想來,可能是在亂說。一些框架的原理,還是要自己多思考多翻看,才能掌握。