准備
將producer模塊復制出來三份,分別修改以下內容
-
service.port
-
dubbo.protocol.port
-
HelloWordImpl.java中的輸出內容
public class HelloWordImpl implements HelloWord { public String helloWord(String s) { return "HelloWordImpl20881:"+s; // return "HelloWordImpl20882:"+s; // return "HelloWordImpl20883:"+s; } }
-
三個生產者全部啟動
version
消費者可通過指定版本來控制要RPC調用對應版本的生產者
-
修改生產者實現類注解
//指定對外暴漏的服務版本為20882 @DubboReference(version = "20882")
-
修改消費者@DubboReference
//指定請求版本為20882的生產者 @DubboReference(version = "20882")
protocol通訊協議
dubbo可對協議擴展,例如:Dubbo\Rest\Http\⾃定義
配置單個協議
protocol:
port: 20881 #配置當前這個服務在dubbo容器中的端⼝號,每個dubbo容器內部的服務的端⼝號唯一
name: dubbo
配置多個協議
protocols:
protocol_1:
id: protocol_1
name: rest
port: 8080
host: 127.0.0.1
protocol_2:
id: protocol_2
name: dubbo
port: 21881
host: 127.0.0.1
protocol_3:
id: protocol_3
name: http
port: 8181
host: 127.0.0.1
協議指定方式
//protocol不指定,yml聲明了幾個協議,就會對每個協議分別創建一份對應請求協議的服務
@DubboService(version = "20883",protocol = "配置中指定的id")
public class HelloWordImpl implements HelloWord {
public String helloWord(String s) {
return "HelloWordImpl20883:"+s;
}
}
選擇Rest協議
POM
添加GVA
<!-- https://mvnrepository.com/artifact/org.apache.dubbo/dubbo-rpc-rest -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-rpc-rest</artifactId>
<version>3.0.6</version>
</dependency>
YML
server:
port: 8080
dubbo:
application:
name: hello-word-service #服務名稱
registry:
address: zookeeper://192.168.0.104:2181,192.168.0.104:2182,192.168.0.104:2183,192.168.0.104:2184 #zk地址
protocol:
port: 6060 #rest暴漏端口
name: rest #選擇rest
修改生產者實現類
下面最終請求地址為 127.0.0.1:6066/rest/hello/helloWord?param=value
// path相當於在@Path前又加了一個前綴,可省略
@DubboService(protocol = "rest",path = "rest")
@Path("hello") //@RequestMapping
public class HelloWordImpl implements HelloWord {
@GET //發送get請求,還有put post等
@Path("helloWord") // 方法請求路徑 ,@GET和@Path = @GetMapping
@Produces({ContentType.APPLICATION_JSON_UTF_8}) //設置ContentType,可設置多個
public String helloWord(@QueryParam("param") String param) {
return "HelloWordImpl20881:"+param;
}
}
跳過注冊中心,直連請求
生產者配置
假設生產者通過下面配置,並且實現類只標注了@DubboService(version = "v1"),未加protocol=‘’屬性來指明到底用哪個通訊協議,則生產者啟動會開啟兩個協議通訊端口
protocols:
protocol_20881:
id: protocol_20881
name: dubbo
port: 20881
host: 0.0.0.0
protocol_20882:
id: protocol_20882
name: dubbo
port: 20882
host: 0.0.0.0
消費者直連方式
//添加版本和直連的url/端口/接口全包名
//輸入20882和20881都可調用成功,說明開啟了兩個通訊協議端口
@DubboReference(version = "v1",url ="dubbo://127.0.0.1:20882/myinterface.HelloWord")
private HelloWord helloWord;
@Test
void contextLoads() {
//執行代理方法
System.out.println(helloWord.helloWord("RB"));
}
超時配置
@DubboReference(version = "v1",timeout = 3000)
//消費者配置超過3秒就超時報錯
@DubboService(version = "v1",timeout = 2000)
//生產者配置超過2秒就報錯
-
都不配置默認超時時間1秒
-
如果消費者或生產者只配置一方,則超時時間消費者和生產者共享配置
-
如果消費者配置3秒過期,生產者配置2秒過期
-
當方法執行超過3秒后,消費者報錯
-
當方法執行超過2秒后,生產者會打印WARN日志,但方法還會執行完,不會終止
[04/04/22 14:47:39:086 CST] NettyServerWorker-3-1 WARN transport.AbstractServer: [DUBBO] All clients has disconnected from /192.168.0.105:20882. You can graceful shutdown now., dubbo version: 3.0.6, current host: 192.168.0.105
-
集群容錯(高可用)機制
dubbo為集群調⽤提供了容錯⽅案:
-
failover(默認):
當出現失敗時,會進⾏重試,默認重試2次,⼀共三次調⽤。但是會出現冪等性問題。 雖然會出現冪等性問題,但是依然推薦使⽤這種容錯機制,在業務層⾯解決冪等性問題:
-
⽅案⼀:把數據的業務id作為數據庫的聯合主鍵,此時業務id不能重復。
-
⽅案⼆(推薦):使⽤分布式鎖來解決重復消費問題。
-
-
failfast:當出現失敗時。⽴即報錯,不進⾏重試。
-
failsafe:失敗不報錯,記⼊⽇志。
-
failback:失敗就失敗,開啟定時任務 定時重發。
-
forking:並⾏訪問多個服務器,獲取某⼀個結果既視為成功。
結論:使⽤dubbo,不推薦把重試關掉,⽽是在⾮冪等性操作的場景下,服務提供者⽅ 要做冪等性的解決⽅案(保證)。
服務降級
可以通過服務降級功能臨時屏蔽某個出錯的非關鍵服務,並定義降級后的返回策略。
向注冊中心寫入動態配置覆蓋規則:
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null"));
其中:
mock=force:return+null
表示消費方對該服務的方法調用都直接返回 null 值,不發起遠程調用。用來屏蔽不重要服務不可用時對調用方的影響。- 還可以改為
mock=fail:return+null
表示消費方對該服務的方法調用在失敗后,再返回 null 值,不拋異常。用來容忍不重要服務不穩定時對調用方的影響。 - Demo
//消費者注解添加mock
@DubboReference(version = "v1",mock = "force:return timeout")
本地存根
類似熔斷機制
遠程服務后,客戶端通常只剩下接⼝,⽽實現全在服務器端,但提供⽅有些時候想在客戶端 也執⾏部分邏輯,⽐如:做 ThreadLocal 緩存,提前驗證參數,調⽤失敗后偽造容錯數據等 等,此時就需要在 API 中帶上 Stub,客戶端⽣成 Proxy 實例,會把 Proxy 通過構造函數傳給 Stub 1,然后把 Stub 暴露給⽤戶,Stub 可以決定要不要去調 Proxy。
消費端配置
@DubboReference(version = "v1",stub = "com.rb.customer.HelloWord1Stub")
private HelloWord helloWord;
//或
@DubboReference(version = "v1",stub = "true")
private HelloWord helloWord;
Stub類
public class HelloWordStub implements HelloWord {
private final HelloWord helloWord;
public HelloWord1Stub(HelloWord helloWord) {
this.helloWord = helloWord;
}
public String helloWord(String parameter) {
try {
return helloWord.helloWord(parameter);
} catch (Exception e) {
return "Stub-Error:"+parameter;
}
}
}
流程
stub配置為true
- dubbo會在接口HelloWord包路徑下尋找HelloWordStub.java
stub配置為自定義*Stub.java路徑
-
dubbo會在使用定義路徑下的*Stub.java
然后執行下面兩步
-
消費端實際調用的是HelloWordStub下helloWord()方法
-
如果HelloWordStub類中 Dubbo提供的HelloWord類報錯,則調用catch方法
參數回調
個人理解就是類似支付后回調的流程,用戶支付后通過Dubbo調用支付寶,支付寶入賬后再通過反向代理調用用戶端提供的方法。
通過參數回調從服務器端調用客戶端邏輯
參數回調方式與調用本地 callback 或 listener 相同,只需要在 Spring 的配置文件中聲明哪個參數是 callback 類型即可。Dubbo 將基於長連接生成反向代理,這樣就可以從服務器端調用客戶端邏輯。可以參考 dubbo 項目中的示例代碼。
創建生產者回調接口
package myinterface;
/**
* 創建生產者回調接⼝
*/
public interface CallbackListener {
void callback(String msg);
}
創建哪些功能方法需要回調
package myinterface;
/**
* 定義一個功能接口
*/
public interface HelloWord {
String helloWord(String parameter);
//創建帶回調的方法,key用於讓dubbo知道具體回調哪個方法,callbackListener為生產者調用回調的對象
String helloWord(String parameter,String key,CallbackListener callbackListener);
}
消費者端添加實現回調接口類
package com.rb.customer;
import myinterface.CallbackListener;
import java.io.Serializable;
public class CallbackServiceImpl implements CallbackListener, Serializable {
@Override
public void callback(String s) {
//如果是支付后回調,那這就是支付成功支付寶通知用戶該修改訂單狀態了
System.out.println("生產者回調:"+s);
}
}
消費者發起請求
helloWord.helloWord("RB","V1",new CallbackServiceImpl())
生產者接到請求處理
package com.rb.producer.impl;
import myinterface.CallbackListener;
import myinterface.HelloWord;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.config.annotation.Argument;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.config.annotation.Method;
import org.apache.dubbo.rpc.RpcContext;
import java.util.concurrent.TimeUnit;
/**
* 對接口的實現
*/
//添加Dubbo注解
//name:哪個方法開啟回調,arguments.index:對應方法第幾個參數開啟回調,可配置多個
@DubboService(version = "v1",methods = {@Method(name="helloWord",arguments = {@Argument(index = 2, callback = true)})})
public class HelloWordImpl implements HelloWord {
public String helloWord(String param) {
return "HelloWordImpl20882:"+param;
}
@Override
public String helloWord(String s, String s1, CallbackListener callbackListener) {
callbackListener.callback("abcd");//業務處理完,要傳給消費端的參數,這個也是一個代理類,調用后消費端就可收到abcd
return "HelloWordImpl20882:"+s;
}
}
執行
當消費者發起調用后,消費者會打印CallbackServiceImpl.callback方法
異步調用
方法接口添加異步接口
/**
* 定義一個功能接口
*/
public interface HelloWord {
String helloWord(String parameter);
String helloWord(String parameter,String key,CallbackListener callbackListener);
//異步方法
CompletableFuture<String> helloWordAsync(String name);
}
消費者調用
@Test
void contextLoads() throws IOException {
//執行代理方法
CompletableFuture<String> rb = helloWord.helloWordAsync("RB");
rb.whenComplete((v, e) -> {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("異步結果:" + v);
}
});
System.out.println("當前線程調⽤結束");
System.in.read();//阻塞,防止線程關了,回調失敗
}
生產者實現
package com.rb.producer.impl;
import lombok.SneakyThrows;
import myinterface.CallbackListener;
import myinterface.HelloWord;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.config.annotation.Argument;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.config.annotation.Method;
import org.apache.dubbo.rpc.RpcContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* 對接口的實現
*/
//添加Dubbo注解
@DubboService(version = "v1",methods = {@Method(name="helloWord",arguments = {@Argument(index = 2, callback = true)})})
public class HelloWordImpl implements HelloWord {
public String helloWord(String param) {
return "HelloWordImpl20882:"+param;
}
@Override
public String helloWord(String s, String s1, CallbackListener callbackListener) {
callbackListener.callback("abcd");
return "HelloWordImpl20882:"+s;
}
@SneakyThrows
@Override
public CompletableFuture<String> helloWordAsync(String s) {
System.out.println("異步調⽤:" + s);
TimeUnit.SECONDS.sleep(2);//模擬耗時業務
return CompletableFuture.supplyAsync(() -> {
return "HelloWordImplAsync20882:"+s;
});
}
}