3.Dubbo各種特性使用方法


准備

將producer模塊復制出來三份,分別修改以下內容

  • service.port

  • dubbo.protocol.port

  • HelloWordImpl.java中的輸出內容

    public class HelloWordImpl implements HelloWord {
        public String helloWord(String s) {
            return "HelloWordImpl20881:"+s;
    //        return "HelloWordImpl20882:"+s;
      //      return "HelloWordImpl20883:"+s;
        }
    }
    
  • 三個生產者全部啟動

version

消費者可通過指定版本來控制要RPC調用對應版本的生產者

  • 修改生產者實現類注解

    //指定對外暴漏的服務版本為20882
    @DubboReference(version = "20882")
    
  • 修改消費者@DubboReference

    //指定請求版本為20882的生產者
    @DubboReference(version = "20882")
    

protocol通訊協議

dubbo可對協議擴展,例如:Dubbo\Rest\Http\⾃定義

配置單個協議

  protocol:
    port: 20881 #配置當前這個服務在dubbo容器中的端⼝號,每個dubbo容器內部的服務的端⼝號唯一
    name: dubbo

配置多個協議

  protocols:
    protocol_1:
      id: protocol_1
      name: rest
      port: 8080
      host: 127.0.0.1
    protocol_2:
      id: protocol_2
      name: dubbo
      port: 21881
      host: 127.0.0.1
    protocol_3:
      id: protocol_3
      name: http
      port: 8181
      host: 127.0.0.1

協議指定方式

//protocol不指定,yml聲明了幾個協議,就會對每個協議分別創建一份對應請求協議的服務
@DubboService(version = "20883",protocol = "配置中指定的id")
public class HelloWordImpl implements HelloWord {
    public String helloWord(String s) {
        return "HelloWordImpl20883:"+s;
    }
}

選擇Rest協議

最新版本鼓搗了好久,最終通過翻官方代碼找到的正確使用姿勢

POM

添加GVA

        <!-- https://mvnrepository.com/artifact/org.apache.dubbo/dubbo-rpc-rest -->
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-rpc-rest</artifactId>
            <version>3.0.6</version>
        </dependency>

YML

server:
  port: 8080
dubbo:
  application:
    name: hello-word-service #服務名稱
  registry:
    address: zookeeper://192.168.0.104:2181,192.168.0.104:2182,192.168.0.104:2183,192.168.0.104:2184 #zk地址
  protocol:
    port: 6060 #rest暴漏端口
    name: rest #選擇rest

修改生產者實現類

下面最終請求地址為 127.0.0.1:6066/rest/hello/helloWord?param=value

//                               path相當於在@Path前又加了一個前綴,可省略
@DubboService(protocol = "rest",path = "rest")
@Path("hello") //@RequestMapping
public class HelloWordImpl implements HelloWord {
    @GET  //發送get請求,還有put post等
    @Path("helloWord") // 方法請求路徑 ,@GET和@Path = @GetMapping
    @Produces({ContentType.APPLICATION_JSON_UTF_8}) //設置ContentType,可設置多個
    public String helloWord(@QueryParam("param") String param) {
        return "HelloWordImpl20881:"+param;
    }
}

跳過注冊中心,直連請求

生產者配置

假設生產者通過下面配置,並且實現類只標注了@DubboService(version = "v1"),未加protocol=‘’屬性來指明到底用哪個通訊協議,則生產者啟動會開啟兩個協議通訊端口

  protocols:
    protocol_20881:
      id: protocol_20881
      name: dubbo
      port: 20881
      host: 0.0.0.0
    protocol_20882:
      id: protocol_20882
      name: dubbo
      port: 20882
      host: 0.0.0.0

消費者直連方式

	//添加版本和直連的url/端口/接口全包名
	//輸入20882和20881都可調用成功,說明開啟了兩個通訊協議端口
	@DubboReference(version = "v1",url ="dubbo://127.0.0.1:20882/myinterface.HelloWord")
    private HelloWord helloWord;

    @Test
    void contextLoads() {
        //執行代理方法
        System.out.println(helloWord.helloWord("RB"));
    }

超時配置

@DubboReference(version = "v1",timeout = 3000)
//消費者配置超過3秒就超時報錯
@DubboService(version = "v1",timeout = 2000)
//生產者配置超過2秒就報錯
  • 都不配置默認超時時間1秒

  • 如果消費者或生產者只配置一方,則超時時間消費者和生產者共享配置

  • 如果消費者配置3秒過期,生產者配置2秒過期

    1. 當方法執行超過3秒后,消費者報錯

    2. 當方法執行超過2秒后,生產者會打印WARN日志,但方法還會執行完,不會終止

      [04/04/22 14:47:39:086 CST] NettyServerWorker-3-1  WARN transport.AbstractServer:  [DUBBO] All clients has disconnected from /192.168.0.105:20882. You can graceful shutdown now., dubbo version: 3.0.6, current host: 192.168.0.105
      

集群容錯(高可用)機制

dubbo為集群調⽤提供了容錯⽅案:

  • failover(默認):

    當出現失敗時,會進⾏重試,默認重試2次,⼀共三次調⽤。但是會出現冪等性問題。 雖然會出現冪等性問題,但是依然推薦使⽤這種容錯機制,在業務層⾯解決冪等性問題:

    • ⽅案⼀:把數據的業務id作為數據庫的聯合主鍵,此時業務id不能重復。

    • ⽅案⼆(推薦):使⽤分布式鎖來解決重復消費問題。

  • failfast:當出現失敗時。⽴即報錯,不進⾏重試。

  • failsafe:失敗不報錯,記⼊⽇志。

  • failback:失敗就失敗,開啟定時任務 定時重發。

  • forking:並⾏訪問多個服務器,獲取某⼀個結果既視為成功。

結論:使⽤dubbo,不推薦把重試關掉,⽽是在⾮冪等性操作的場景下,服務提供者⽅ 要做冪等性的解決⽅案(保證)。

服務降級

可以通過服務降級功能臨時屏蔽某個出錯的非關鍵服務,並定義降級后的返回策略。

向注冊中心寫入動態配置覆蓋規則:

RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null"));

其中:

  • mock=force:return+null 表示消費方對該服務的方法調用都直接返回 null 值,不發起遠程調用。用來屏蔽不重要服務不可用時對調用方的影響。
  • 還可以改為 mock=fail:return+null 表示消費方對該服務的方法調用在失敗后,再返回 null 值,不拋異常。用來容忍不重要服務不穩定時對調用方的影響。
  • Demo
//消費者注解添加mock
@DubboReference(version = "v1",mock = "force:return timeout")

本地存根

類似熔斷機制

遠程服務后,客戶端通常只剩下接⼝,⽽實現全在服務器端,但提供⽅有些時候想在客戶端 也執⾏部分邏輯,⽐如:做 ThreadLocal 緩存,提前驗證參數,調⽤失敗后偽造容錯數據等 等,此時就需要在 API 中帶上 Stub,客戶端⽣成 Proxy 實例,會把 Proxy 通過構造函數傳給 Stub 1,然后把 Stub 暴露給⽤戶,Stub 可以決定要不要去調 Proxy。

消費端配置

@DubboReference(version = "v1",stub = "com.rb.customer.HelloWord1Stub")
private HelloWord helloWord;
//或
@DubboReference(version = "v1",stub = "true")  
private HelloWord helloWord;

Stub類

public class HelloWordStub implements HelloWord {
    private final HelloWord helloWord;
    public HelloWord1Stub(HelloWord helloWord) {
        this.helloWord = helloWord;
    }
    public String helloWord(String parameter) {
        try {
            return helloWord.helloWord(parameter);
        } catch (Exception e) {
            return "Stub-Error:"+parameter;
        }
    }
}

流程

stub配置為true

  1. dubbo會在接口HelloWord包路徑下尋找HelloWordStub.java

stub配置為自定義*Stub.java路徑

  1. dubbo會在使用定義路徑下的*Stub.java

    然后執行下面兩步

  2. 消費端實際調用的是HelloWordStub下helloWord()方法

  3. 如果HelloWordStub類中 Dubbo提供的HelloWord類報錯,則調用catch方法

參數回調

個人理解就是類似支付后回調的流程,用戶支付后通過Dubbo調用支付寶,支付寶入賬后再通過反向代理調用用戶端提供的方法。

通過參數回調從服務器端調用客戶端邏輯

參數回調方式與調用本地 callback 或 listener 相同,只需要在 Spring 的配置文件中聲明哪個參數是 callback 類型即可。Dubbo 將基於長連接生成反向代理,這樣就可以從服務器端調用客戶端邏輯。可以參考 dubbo 項目中的示例代碼

創建生產者回調接口

package myinterface;

/**
 * 創建生產者回調接⼝
 */
public interface CallbackListener {
    void callback(String msg);
}

創建哪些功能方法需要回調

package myinterface;

/**
 * 定義一個功能接口
 */
public interface HelloWord {
    String helloWord(String parameter);
    //創建帶回調的方法,key用於讓dubbo知道具體回調哪個方法,callbackListener為生產者調用回調的對象
    String helloWord(String parameter,String key,CallbackListener callbackListener);
}

消費者端添加實現回調接口類

package com.rb.customer;

import myinterface.CallbackListener;
import java.io.Serializable;

public class CallbackServiceImpl implements CallbackListener, Serializable {

    @Override
    public void callback(String s) {
        //如果是支付后回調,那這就是支付成功支付寶通知用戶該修改訂單狀態了
        System.out.println("生產者回調:"+s);
    }
}

消費者發起請求

helloWord.helloWord("RB","V1",new CallbackServiceImpl())

生產者接到請求處理

package com.rb.producer.impl;


import myinterface.CallbackListener;
import myinterface.HelloWord;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.config.annotation.Argument;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.config.annotation.Method;
import org.apache.dubbo.rpc.RpcContext;

import java.util.concurrent.TimeUnit;

/**
 * 對接口的實現
 */
//添加Dubbo注解
//name:哪個方法開啟回調,arguments.index:對應方法第幾個參數開啟回調,可配置多個
@DubboService(version = "v1",methods = {@Method(name="helloWord",arguments = {@Argument(index = 2, callback = true)})})
public class HelloWordImpl implements HelloWord {
    public String helloWord(String param) {
        return "HelloWordImpl20882:"+param;
    }

    @Override
    public String helloWord(String s, String s1, CallbackListener callbackListener) {
        callbackListener.callback("abcd");//業務處理完,要傳給消費端的參數,這個也是一個代理類,調用后消費端就可收到abcd
        return "HelloWordImpl20882:"+s;
    }
}

執行

當消費者發起調用后,消費者會打印CallbackServiceImpl.callback方法

異步調用

方法接口添加異步接口

/**
 * 定義一個功能接口
 */
public interface HelloWord {
    String helloWord(String parameter);
    String helloWord(String parameter,String key,CallbackListener callbackListener);

    //異步方法
    CompletableFuture<String> helloWordAsync(String name);
}

消費者調用

@Test
    void contextLoads() throws IOException {
        //執行代理方法
        CompletableFuture<String> rb = helloWord.helloWordAsync("RB");
        rb.whenComplete((v, e) -> {
            if (e != null) {
                e.printStackTrace();
            } else {
                System.out.println("異步結果:" + v);
            }
        });
        System.out.println("當前線程調⽤結束");
        System.in.read();//阻塞,防止線程關了,回調失敗

    }

生產者實現

package com.rb.producer.impl;


import lombok.SneakyThrows;
import myinterface.CallbackListener;
import myinterface.HelloWord;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.config.annotation.Argument;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.config.annotation.Method;
import org.apache.dubbo.rpc.RpcContext;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * 對接口的實現
 */
//添加Dubbo注解
@DubboService(version = "v1",methods = {@Method(name="helloWord",arguments = {@Argument(index = 2, callback = true)})})
public class HelloWordImpl implements HelloWord {
    public String helloWord(String param) {
        return "HelloWordImpl20882:"+param;
    }

    @Override
    public String helloWord(String s, String s1, CallbackListener callbackListener) {
        callbackListener.callback("abcd");
        return "HelloWordImpl20882:"+s;
    }

    @SneakyThrows
    @Override
    public CompletableFuture<String> helloWordAsync(String s) {
        System.out.println("異步調⽤:" + s);
        TimeUnit.SECONDS.sleep(2);//模擬耗時業務
        return CompletableFuture.supplyAsync(() -> {
            return "HelloWordImplAsync20882:"+s;
        });
    }
}

Dubbo更多高級功能


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM