brpc學習筆記之client基礎功能


https://github.com/apache/incubator-brpc/blob/master/docs/cn/client.md

1. Client端示例程序

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// A client sending requests to server every 1 second.

#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <brpc/channel.h>
#include "echo.pb.h"

DEFINE_string(attachment, "", "Carry this along with requests");
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8000", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); 
DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");

int main(int argc, char* argv[]) {
    // Parse gflags. We recommend you to use gflags as well.
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    
    // A Channel represents a communication line to a Server. Notice that 
    // Channel is thread-safe and can be shared by all threads in your program.
    brpc::Channel channel;
    
    // Initialize the channel, NULL means using default options.
    brpc::ChannelOptions options;
    options.protocol = FLAGS_protocol;
    options.connection_type = FLAGS_connection_type;
    options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
    options.max_retry = FLAGS_max_retry;
    if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
        LOG(ERROR) << "Fail to initialize channel";
        return -1;
    }

    // Normally, you should not call a Channel directly, but instead construct
    // a stub Service wrapping it. stub can be shared by all threads as well.
    example::EchoService_Stub stub(&channel);

    // Send a request and wait for the response every 1 second.
    int log_id = 0;
    while (!brpc::IsAskedToQuit()) {
        // We will receive response synchronously, safe to put variables
        // on stack.
        example::EchoRequest request;
        example::EchoResponse response;
        brpc::Controller cntl;

        request.set_message("hello world");

        cntl.set_log_id(log_id ++);  // set by user
        // Set attachment which is wired to network directly instead of 
        // being serialized into protobuf messages.
        cntl.request_attachment().append(FLAGS_attachment);

        // Because `done'(last parameter) is NULL, this function waits until
        // the response comes back or error occurs(including timedout).
        stub.Echo(&cntl, &request, &response, NULL);
        if (!cntl.Failed()) {
            LOG(INFO) << "Received response from " << cntl.remote_side()
                << " to " << cntl.local_side()
                << ": " << response.message() << " (attached="
                << cntl.response_attachment() << ")"
                << " latency=" << cntl.latency_us() << "us";
        } else {
            LOG(WARNING) << cntl.ErrorText();
        }
        usleep(FLAGS_interval_ms * 1000L);
    }

    LOG(INFO) << "EchoClient is going to quit";
    return 0;
}
View Code

2. 事實速查

  • Channel.Init()是線程不安全的。
  • Channel.CallMethod()是線程安全的,一個Channel可以被所有線程同時使用。
  • Channel可以分配在棧上。
  • Channel在發送異步請求后可以析構。
  • 沒有brpc::Client這個類。

3. Channel

Client指發起請求的一端,在brpc中沒有對應的實體,取而代之的是brpc::Channel,它代表和一台或一組服務器的交互通道,Client和Channel在角色上的差異在實踐中並不重要,你可以把Channel視作Client。

吐槽一下:那為什么不使用Client的概念,以減少大家的學習成本呢?

Channel可以被所有線程共用,你不需要為每個線程創建獨立的Channel,也不需要用鎖互斥。不過Channel的創建和Init並不是線程安全的,請確保在Init成功后再被多線程訪問,在沒有線程訪問后再析構。

一些RPC實現中有ClientManager的概念,包含了Client端的配置信息和資源管理。brpc不需要這些,以往在ClientManager中配置的線程數、長短連接等等要么被加入了brpc::ChannelOptions,要么可以通過gflags全局配置,這樣做的好處:

  • 方便。不需要在創建Channel時傳入ClientManager,也不需要存儲ClientManager。否則不少代碼需要一層層地傳遞ClientManager,很麻煩。gflags使一些全局行為的配置更加簡單。
  • 共用資源。比如server和channel可以共用后台線程(bthread的工作線程)。 ???
  • 生命周期。析構ClientManager的過程很容易出錯,現在由框架負責,則不會有問題。

就像大部分類那樣,Channel必須在Init之后才能使用,options為NULL時所有參數取默認值,如果你要使用非默認值,這么做就行了:

brpc::ChannelOptions options;  // 包含了默認值
options.xxx = yyy;
...
channel.Init(..., &options);

注意Channel不會修改options,Init結束后不會再訪問options。所以options一般就像上面代碼中那樣放棧上。Channel.options()可以獲得channel在使用的所有選項。

Init函數分為連接一台服務器和連接服務集群。

4. 連接一台服務器

// options為NULL時取默認值
int Init(EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options);

這類Init連接的服務器往往有固定的ip地址,不需要命名服務和負載均衡,創建起來相對輕量。但是請勿頻繁創建使用域名的Channel。這需要查詢dns,可能最多耗時10秒(查詢DNS的默認超時)。重用它們。

合法的“server_addr_and_port”:

  • 127.0.0.1:80
  • www.foo.com:8765
  • localhost:9000

5. 連接服務集群

int Init(const char* naming_service_url,
         const char* load_balancer_name,
         const ChannelOptions* options);

這類Channel需要定期從naming_service_url指定的命名服務中獲得服務器列表,並通過load_balancer_name指定的負載均衡算法選擇出一台機器發送請求。

你不應該在每次請求前動態地創建此類(連接服務集群的)Channel。因為創建和析構此類Channel牽涉到較多的資源,比如在創建時得訪問一次命名服務,否則便不知道有哪些服務器可選。由於Channel可被多個線程共用,一般也沒有必要動態構建。

當load_balanver_name為NULL或空時,此Init等同於連接單台server的Init,naming_service_url應該是“ip:port”或“域名:port”。你可以通過這個Init函數統一Channel的初始化方式。比如你可以把naming_service_url和load_balancer_name放在配置文件中,要連接單台server時把load_balancer_name置空,要連接服務集群時則設置一個有效的算法名稱。

5.1 命名服務

命名服務把一個名字映射為可修改的機器列表,在client端的位置如下:

<這缺個圖>

有了命名服務后client記錄的是一個名字,而不是每一台下游機器。而當下游機器變化時,就只需要修改命名服務中的列表,而不需要逐台修改每個上游。這個過程也常被稱為“解耦上下游”。當然在具體實現上,上游會記錄每一台下游機器,並定期向命名服務請求或被推送最新的列表,以避免在RPC請求時采取訪問命名服務。使用命名服務一般不會對性能造成影響,對命名服務的壓力也很小。

naming_service_url的一般形式是“protocol://service_name”。

作者給了幾種例子,這里僅列舉出來,不展開了:

  • bns://<bns-name>
  • file://<path>
  • list://<addr1>,<addr2>...
  • http://<url>
  • https://<url>
  • consul://<service-name>
更多命名服務

用戶可以通過實現brpc::NamingService來對接更多命名服務,可以參考 《brpc學習筆記之負載均衡》

命名服務中的tag

每個地址可以附帶一個tag,在常見的命名服務中,如果地址后有空格,則空格之后的內容均為tag。相同地址配合不同的tag,被認為是不同的實例,brpc會建立不同的連接。用戶可利用這個特性更靈活地控制與單個地址的連接方式。如果你需要“帶權重的輪詢”,你應當優先考慮使用wrr(weighted round roubin)算法,而不是用tag來模擬。

VIP相關的問題

TODO

命名服務過濾器

當命名服務獲得機器列表后,可以自定義一個過濾器進行篩選,最后把結果傳遞給負載均衡:

<這缺個圖>

過濾器接口如下:

// naming_service_filter.h
class NamingServiceFilter {
public:
    // Return true to take this `server' as a candidate to issue RPC
    // Return false to filter it out
    virtual bool Accept(const ServerNode& server) const = 0;
};
 
// naming_service.h
struct ServerNode {
    butil::EndPoint addr;
    std::string tag;
};

常見的業務策略如根據server的tag進行過濾。

自定義的過濾器配置在ChannelOptions中,默認為NULL(不過濾)。

class MyNamingServiceFilter : public brpc::NamingServiceFilter {
public:
    bool Accept(const brpc::ServerNode& server) const {
        return server.tag == "main";
    }
};
 
int main() {
    ...
    MyNamingServiceFilter my_filter;
    ...
    brpc::ChannelOptions options;
    options.ns_filter = &my_filter;
    ...
}

5.2 負載均衡

當下游機器超過一台時,我們需要分割流量,此過程一般稱為負載均衡,在client端的位置如下圖所示:

<這缺個圖>

理想的算法是每個請求都得到及時的處理,且任意機器crash對全局影響較小。但由於client端無法及時獲得server端的延遲或擁塞,而且負載均衡算法不能耗費太多的cpu,一般來說用戶得根據具體的場景選擇合適的算法,目前rpc提供的算法有(通過load_balancer_name制定):

  • rr
    • 即round robin,總是選擇列表中的下一台服務器,結尾的下一台是開頭,無需其他設置。比如有3台機器a,b,c,那么brpc會一次想a,b,c,a,b,c,...發送請求。注意這個算法的前提是服務器的配置、網絡條件、負載都是類似的。
  • wrr
    • 即weighted round robin,根據服務器列表配置的權重值來選擇服務器。服務器被選到的機會正比於其權重值,並且該算法能保證同一服務器被選到的結果較均衡的散開。
  • random
    • 隨機從列表中選擇一台服務器,無需其他設置。和round robin類似,這個算法的前提也是服務器都是類似的。
  • la
  • c_murmurhash or c_md5
    • 一致性哈希,與簡單hash的不同之處在於增加和刪除機器時不會使分桶結果劇烈變化,特別適合cache類服務。 ???

發起RPC前需要設置Controller.set_request_code(),否則RPC會失敗。request_code一般是請求中主鍵部分的32位哈希值,不需要和負載均衡使用的哈希算法一致。比如用c_murmurhash算法也可以用md5計算哈希值。

src/brpc/policy/hasher.h中包含了常用的hash函數。如果用std::string key代表請求的主鍵,controller.set_request_code(brpc::policy::MurmurHash32(key.data(), key.size()))就正確設置了request_code。

注意甄別請求中的“主鍵”部分和“屬性”部分,不要為了偷懶或通用,就把請求的所有內容一股腦兒計算出哈希值,屬性的變化會使請求的目的地 發生劇烈的變化。另外也要注意padding的問題,比如struct Foo { int32_t a; int64_t b; }在64位機器上a和b之間有4個字節的空隙,內容未定義,如果像hash(&foo, sizeof(foo))這樣計算哈希值,結果就是未定義的,得把內容緊密排列或序列化后再算。

實現原理請查看 《brpc學習筆記之一致性哈希》

從集群宕機后恢復時的客戶端限流

集群宕機指的是集群中所有server都處於不可用的狀態。由於健康檢查機制,當集群恢復正常后,server會間隔性地上線。當某一個server上線后,所有的流量都會發送過去,可能導致服務再次過載。若熔斷開啟,則可能導致其它server上線前該server再次熔斷,集群永遠無法恢復。作為解決方案,brpc提供了在集群宕機后恢復時的限流機制:當集群中沒有可用server時,集群進入恢復狀態,假設正好能服務所有請求的server數量為min_working_instances,當前集群可用的server數量為q,則在恢復狀態時,client接受請求的概率為q/min_working_instances,否則丟棄;若一段時間hold_seconds內q保持不變,則把流量重新發送到全部可用的server上,並離開恢復狀態。

在恢復階段時,可以通過判斷controller.ErrorCode()是否等於brpc::ERJECT來判斷該次請求是否被拒絕,被拒絕的請求不會被框架重試。

此恢復機制要求下游server的能力是類似的,所以目前只針對rr和random有效,開啟方式是在load_balancer_name后面加上min_working_instances和hold_seconds參數的值,例如

channel.Init("http://...", "random:min_working_instances=6 hold_seconds=10", &options);

5.3 健康檢查

連接斷開的server會被暫時隔離而不會被負載均衡算法選中,brpc會定期連接被隔離的server,以檢查他們是否恢復正常,間隔由參數-health_check_interval控制。

在默認的配置下,一旦server被連接上,它會恢復為可用狀態;brpc還提供了應用層健康檢查的機制,框架會發送一個HTTP GET請求到該server,請求路徑通過-health_check_path設置(默認為空),只有當server返回200時,它才會恢復。在兩種健康檢查機制下,都可通過-health_check_timeout_ms設置超時(默認500ms)。如果在隔離過程中,server從命名服務中刪除了,brpc也會停止連接嘗試。

6. 發起訪問

一般來說,我們不直接調用Channel.CallMethod,而是通過protobuf生成的樁XXX_Stub,過程更像是“調用函數”。stub內沒什么成員變量,建議在棧上創建和使用,而不必new,當然你也可以把stub存下來復用。Channel::CallMethod和stub訪問都是線程安全的,可以被所有線程同時訪問。比如:

XXX_Stub stub(&channel);
stub.some_method(controller, request, response, done);

甚至

XXX_Stub(&channel).some_method(controller, request, response, done);

一個例外是http/h2 client。訪問http服務和protobuf沒什么關系,直接調用CallMethod即可,除了Controller和done均為NULL,詳見 《brpc學習筆記之http client》

6.1 同步訪問

指的是:CallMethod會阻塞到收到server端返回response或發生錯誤(包括超時)。

同步訪問中的response/controller不會在CallMethod后被框架使用,它們都可以分配在棧上。注意,如果request/response字段特別多、字節數特別大的話,還是更適合分配在堆上。

MyRequest request;
MyResponse response;
brpc::Controller cntl;
XXX_Stub stub(&channel);
 
request.set_foo(...);
cntl.set_timeout_ms(...);
stub.some_method(&cntl, &request, &response, NULL);
if (cntl->Failed()) {
    // RPC失敗了. response里的值是未定義的,勿用。
} else {
    // RPC成功了,response里有我們想要的回復數據。
}

6.2 異步訪問

指的是:給CallMethod傳遞一個額外的回調對象done,CallMethod在發出request就結束了,而不是在RPC結束后。當server端返回response或發生錯誤(包括超時)時,done->Run()會被調用。對RPC的后續處理應該寫在done->Run()里,而不是CallMethod后。

由於CallMethod結束並不意味着RPC結束,response/controller仍可能被框架及done->Run()使用,它們一般得創建在堆上,並在done->Run()中刪除。如果提前刪除了它們,那當done->Run()被調用時,將訪問到無效內存。

你可以獨立地創建這些對象,並使用NewCallback生成done,也可以把Response和Controller作為done的成員變量,一起new出來,一般使用前一種方法。

發起異步請求后Request和Channel也可以立刻析構。這兩樣和response/controller是不同的。注意:這是說Channel的析構可以立刻發生在CallMethod之后,並不是說析構可以和CallMethod同時發生,刪除正被另一個線程使用的Channel是未定義行為(很可能crash)。

使用NewCallback
static void OnRPCDone(MyResponse* response, brpc::Controller* cntl) {
    // unique_ptr會幫助我們在return時自動刪掉response/cntl,防止忘記。gcc 3.4下的unique_ptr是模擬版本。
    std::unique_ptr<MyResponse> response_guard(response);
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    if (cntl->Failed()) {
        // RPC失敗了. response里的值是未定義的,勿用。
    } else {
        // RPC成功了,response里有我們想要的數據。開始RPC的后續處理。    
    }
    // NewCallback產生的Closure會在Run結束后刪除自己,不用我們做。
}
 
MyResponse* response = new MyResponse;
brpc::Controller* cntl = new brpc::Controller;
MyService_Stub stub(&channel);
 
MyRequest request;  // 你不用new request,即使在異步訪問中.
request.set_foo(...);
cntl->set_timeout_ms(...);
stub.some_method(cntl, &request, response, google::protobuf::NewCallback(OnRPCDone, response, cntl));

由於protobuf 3把NewCallback設置為私有,r32035后brpc把NewCallback獨立於src/brpc/callback.h(並增加了一些重載)。如果你的程序出現NewCallback相關的編譯錯誤,把google::protobuf::NewCallback替換為brpc::NewCallback就行了。

繼承google::protobuf::Closure

 使用NewCallback的缺點是要分配三次內存:response,controller,done。如果profiler證明這兒的內存分配有瓶頸,可以考慮自己繼承Closure,把response/controller作為成員變量,這樣可以把三次new合並為一次。但缺點就是代碼不夠美觀,如果內存分配不是瓶頸,別用這種方法。

class OnRPCDone: public google::protobuf::Closure {
public:
    void Run() {
        // unique_ptr會幫助我們在return時自動delete this,防止忘記。gcc 3.4下的unique_ptr是模擬版本。
        std::unique_ptr<OnRPCDone> self_guard(this);
          
        if (cntl->Failed()) {
            // RPC失敗了. response里的值是未定義的,勿用。
        } else {
            // RPC成功了,response里有我們想要的數據。開始RPC的后續處理。
        }
    }
 
    MyResponse response;
    brpc::Controller cntl;
}
 
OnRPCDone* done = new OnRPCDone;
MyService_Stub stub(&channel);
 
MyRequest request;  // 你不用new request,即使在異步訪問中.
request.set_foo(...);
done->cntl.set_timeout_ms(...);
stub.some_method(&done->cntl, &request, &done->response, done);
如果異步訪問中的回調函數特別復雜會有什么影響嗎?

沒有特別的影響,回調會運行在獨立的bthread中,不會阻塞其他的邏輯。你可以在回調中做各種阻塞操作。

rpc發送處的代碼和回調函數是在同一個線程里執行嗎?

一定不在同一個線程里運行,即使該次rpc調用剛進去就失敗了,回調也會在另一個bthread中運行。這可以在加鎖進行rpc(不推薦)的代碼中避免死鎖。???

6.3 等待RPC完成

注意:當你需要發起多個並發操作時,可能ParallelChannel更方便。

如下代碼發起兩個異步RPC后等待他們。

const brpc::CallId cid1 = controller1->call_id();
const brpc::CallId cid2 = controller2->call_id();
...
stub.method1(controller1, request1, response1, done1);
stub.method2(controller2, request2, response2, done2);
...
brpc::Join(cid1);
brpc::Join(cid2);

在發起RPC前調用Controller.call_id()獲得一個id,發起RPC調用后Join那個id。

Join()的行為是等到RPC結束且done->Run()運行后,一些Join的性質如下:

  • 如果對應的RPC已經結束,Join將立刻返回。
  • 多個線程可以Join同一個id,它們都會醒來。
  • 同步RPC也可以在另一個線程中被Join,但一般不會這么做。

Join()在之前的版本叫做JoinResponse(),如果你在編譯時被提示deprecated之類的,修改為Join()。

在RPC調用后Join(controller->call_id())是錯誤的行為,一定要先把call_id保存下來。因為RPC調用后controller可能被隨時開始運行的done刪除。下面代碼的Join方式是錯誤的。

static void on_rpc_done(Controller* controller, MyResponse* response) {
    ... Handle response ...
    delete controller;
    delete response;
}
 
Controller* controller1 = new Controller;
Controller* controller2 = new Controller;
MyResponse* response1 = new MyResponse;
MyResponse* response2 = new MyResponse;
...
stub.method1(controller1, &request1, response1, google::protobuf::NewCallback(on_rpc_done, controller1, response1));
stub.method2(controller2, &request2, response2, google::protobuf::NewCallback(on_rpc_done, controller2, response2));
...
brpc::Join(controller1->call_id());   // 錯誤,controller1可能被on_rpc_done刪除了
brpc::Join(controller2->call_id());   // 錯誤,controller2可能被on_rpc_done刪除了

6.4 半同步

Join可用來實現“半同步”訪問:即等待多個異步訪問完成。由於調用處的代碼會等到所有RPC都結束后再醒來,所以controller和response都可以放棧上。

brpc::Controller cntl1;
brpc::Controller cntl2;
MyResponse response1;
MyResponse response2;
...
stub1.method1(&cntl1, &request1, &response1, brpc::DoNothing());
stub2.method2(&cntl2, &request2, &response2, brpc::DoNothing());
...
brpc::Join(cntl1.call_id());
brpc::Join(cntl2.call_id());

brpc::DoNothing()可獲得一個什么都不干的done,專門用於半同步訪問。它的聲明周期由框架管理,用戶不用關心。

注意在上面的代碼中,我們在RPC結束后又訪問了controller.call_id(),這是沒有問題的,因為DoNothing中並不會像上節中的on_rpc_done中那樣刪除Controller。

6.5 取消RPC

brpc::StartCancel(call_id)可取消對應的RPC,call_id必須在發起RPC前通過Controller.call_id()獲得,其他時刻都可能有race condition。

注意:是brpc::StartCancel(call_id),不是controller->StartCancel(),后者被禁用,沒有效果。后者是protobuf默認提供的接口,但是在controller對象的生命周期上有嚴重的競爭問題。

顧名思義,StartCancel調用完成后RPC並未立刻結束,你不應該碰觸Controller的任何字段或刪除任何資源,它們自然會在RPC結束時被done中對應邏輯處理。如果你一定要在原地等到RPC結束(一般不需要),則可通過Join(call_id)。

關於StartCancel的一些事實:

  • call_id在發起RPC前就可以被取消,RPC會直接結束(done仍會被調用)。
  • call_id可以在另一個線程中被取消。
  • 取消一個已經取消的call_id不會有任何效果。推論:同一個call_id可以被多個線程同時取消,但最多一次有效果。
  • 這里的取消是純client端的功能,server端未必會取消對應的操作,server cancelation是另一個功能。

6.6 獲取Server的地址和端口

remote_side()方法可知道request被送向了哪個server,返回值類型是butil::EndPoint,包含一個ip4地址和端口。在RPC結束前調用這個方法都是沒有意義的。

打印方式:

LOG(INFO) << "remote_side=" << cntl->remote_side();
printf("remote_side=%s\n", butil::endpoint2str(cntl->remote_side()).c_str());

6.7 獲取Client的地址和端口

r31384后通過local_side()方法可在RPC結束后獲得發起RPC的地址和端口。

打印方式:

LOG(INFO) << "local_side=" << cntl->local_side(); 
printf("local_side=%s\n", butil::endpoint2str(cntl->local_side()).c_str());

6.8 應該重用brpc::Controller嗎?

不用刻意地重用,但Controller是個大雜燴,可能會包含一些緩存,Reset()可以避免反復地創建這些緩存。

在大部分場景下,構造Controller(snippet1)和重置Controller(snippet2)的性能差異不大。

// snippet1
for (int i = 0; i < n; ++i) {
    brpc::Controller controller;
    ...
    stub.CallSomething(..., &controller);
}
 
// snippet2
brpc::Controller controller;
for (int i = 0; i < n; ++i) {
    controller.Reset();
    ...
    stub.CallSomething(..., &controller);
}

但如果snippet1中的Controller是new出來的,那么snippet1就會多出“內存分配”的開銷,在一些情況下可能會慢一些。

7. 設置

Client端的設置主要由三部分組成:

  • brpc::ChannelOptions:定義在src/brpc/channel.h中,用於初始化Channel,一旦初始化成功無法修改。
  • brpc::Controller:定義在src/brpc/controller.h中,用於某次RPC中覆蓋ChannelOptions中的選項,可根據上下文每次均不同。
  • 全局gflags:常用於調用一些底層代碼的行為,一般不用修改。請自行閱讀服務flags頁面中的說明。

Controller包含了request中沒有的數據和選項。server端和client端的Controller結構體是一樣的,但使用的字段可能是不同的,你需要仔細閱讀Controller中的注釋,明確哪些字段可以在server端使用,哪些可以在client端使用。

一個Controller對應一次RPC。一個Controller可以在Reset()后被另一個RPC復用,但一個Controller不能被多個RPC同時使用(不論是否在同一個線程發起)。

Controller的特點:

  1. 一個Controller只能有一個使用者,沒有特殊說明的話,Controller中的方法默認線程不安全。
  2. 因為不能共享,所以一般不會用共享指針管理Controller,如果你用共享指針了,很可能意味着出錯了。
  3. Controller創建於開始RPC前,析構於RPC結束后,常見幾種模式:
    • 同步RPC前Controller放棧上,出作用域后自行析構。注意異步RPC的Controller絕對不能放棧上,否則其析構時異步調用很可能還在進行中,從而引發未定義行為。
    • 異步RPC前new Controller,done中刪除。

7.1 線程數

和大部分的RPC框架不同,brpc中並沒有獨立的Client線程池。所有Channel和Server通過bthread共享相同的線程池。如果你的程序同樣適用了brpc的server,僅僅需要設置Server的線程數。或者可以通過gflags設置-bthread_concurrency來設置全局的線程數。

7.2 超時

ChannelOptions.timeout_ms是對應Channel上所有RPC的總超時,Controller.set_timeout_ms()可修改某次RPC的值。單位毫秒,默認值1秒,最大值2^31(約24天),-1表示一直等到回復或錯誤。

ChannelOptions.connect_timeout_ms是對應Channel上所有RPC的連接超時(單位毫秒)。-1表示等到連接建立或出錯,此值被限制為不能超過timeout_ms。注意此超時獨立於TCP的連接超時,一般來說前者小於后者,反之則可能在connect_timeout_ms未達到前由於TCP連接超時而出錯。

注意1:brpc中的超時是deadline,超過就意味着RPC結束,超時后沒有重試。其他實現可能既有單次訪問的超時,也有代表deadline的超時。遷移到brpc時請仔細區分。

注意2:RPC超時的錯誤碼為ERPCTIMEOUT(1008),ETIMEOUT的意思是連接超時,且可重試。

7.3 重試

ChannelOptions.max_retry是該Channel上所有RPC的默認最大重試次數,Controller.set_max_retry()可修改某次RPC的值,默認值3,0表示不重試。

r32111后Controller.retried_count()返回重試次數。

r34717后Controller.has_backup_request()獲知是否發送過backup_request。

重試時框架會盡量避開之前嘗試過的server。

重試的觸發條件有(條件之間是AND關系):

  • 連接出錯
  • 沒到超時
  • 有剩余重試次數
  • 錯誤值得重試
連接出錯

如果server一直沒有返回,但連接沒有問題,這種情況下不會重試。如果你需要在一定時間后發送另一個請求,使用backup request。

工作機制如下:如果response沒有在backup_request_ms內返回,則發送另外一個請求,哪個先回來就取哪個。新請求會被盡量送到不同的server。注意如果backup_request_ms大於超時,則backup request總不會被發送。backup request會消耗一次重試次數。backup request不意味着server端cancel。

ChannelOptions.backup_request_ms影響該Channel上所有RPC,單位毫秒,默認值-1(表示不開啟),Controller.set_backup_request_ms()可修改某次RPC的值。

沒到超時

超時后RPC會盡快結束。

有剩余重試次數

Controller.set_max_retry(0)或ChannelOptions.max_retry=0關閉重試。

錯誤值得重試

一些錯誤重試是沒有意義的,就不會重試,比如請求有錯時(EREQUEST)不會重試,因為server總不會接受,沒有意義。

用戶可以通過繼承brpc::RetryPolicy自定義重試條件。比如brpc默認不重試http/h2相關的錯誤,而你的程序中希望在碰到HTTP_STATUS_FORBIDDEN(403)時重試,可以這么做:

#include <brpc/retry_policy.h>
 
class MyRetryPolicy : public brpc::RetryPolicy {
public:
    bool DoRetry(const brpc::Controller* cntl) const {
        if (cntl->ErrorCode() == brpc::EHTTP && // http/h2錯誤
            cntl->http_response().status_code() == brpc::HTTP_STATUS_FORBIDDEN) {
            return true;
        }
        // 把其他情況丟給框架。
        return brpc::DefaultRetryPolicy()->DoRetry(cntl);
    }
};
...
 
// 給ChannelOptions.retry_policy賦值就行了。
// 注意:retry_policy必須在Channel使用期間保持有效,Channel也不會刪除retry_policy,所以大部分情況下RetryPolicy都應以單例模式創建。
brpc::ChannelOptions options;
static MyRetryPolicy g_my_retry_policy;
options.retry_policy = &g_my_retry_policy;
...

一些提示:

  • 通過cntl->response()可獲得對應RPC的response。
  • 對ERPCTIMEOUT代表的RPC超時總是不重試,即使你繼承的RetryPolicy中允許。
重試應當保守

由於成本的限制,大部分線上server的冗余度是有限的,主要是滿足多機房互備的需求。而激進的重試邏輯很容易導致眾多client對server集群造成2-3倍的壓力,最終使集群雪崩:由於server來不及處理導致隊列越積越長,使所有的請求得經過很長的排隊才被處理而最終超時,相當於服務停擺。默認的重試是比較安全的:只要連接不斷RPC就不會重試,一般不會產生大量的重試請求。用戶可以通過RetryPolicy定制重試策略,但也可能使重試變成一場“風暴”。當你定制RetryPolicy時,你需要仔細考慮client和server的協作關系,並設計對應的異常測試,以確保行為符合預期。

7.4 熔斷

《brpc學習筆記之熔斷》

7.5 協議

Channel的默認協議是baidu_std,可通過設置ChannelOptions.protocol換為其他協議,這個字段既接受enum也接受字符串。

目前支持的有:

  • baidu_std
  • http
  • h2
  • h2:grpc
  • thrift
  • memcache
  • redis
  • hulu_pbrpc
  • nova_pbrpc
  • sofa_pbrpc
  • public_pbrpc
  • ubrpc_compack
  • nshead_client
  • nshead
  • nshead_mcpack
  • esp

7.6 連接方式

brpc支持以下連接方式:

  • 短連接:每次RPC前建立連接,結束后關閉連接。由於每次調用得有建立連接的開銷,這種方式一般用於偶爾發起的操作,而不是持續發起請求的場景。沒有協議默認使用這種連接方式,http/1.0對連接的處理效果類似短連接。
  • 連接池:每次RPC前取用空閑連接,結束后歸還,一個連接上最多只有一個請求,一個client對一台server可能有多條連接。http/1.1和各類使用nshead的協議都是這個方式。
  • 單連接:進程內所有client與一台server最多只有一個連接,一個連接上可能同時有多個請求,回復返回順序和請求順序不需要一致,這是baidu_std,hulu_pbrpc,sofa_pbrpc協議的默認選項。

框架會為協議選擇默認的連接方式,用戶一般不用修改。若需要,把ChannelOptions.connection_type設為:

  • “single”為單連接
  • “pooled”為連接池
  • “short”為短連接
  • 空字符串則讓框架選擇協議對應的默認連接方式。

brpc支持Streaming RPC,這是一種應用層的連接,用於傳遞流式數據。

7.7 關閉連接池中的閑置連接

當連接池中的某個連接在-ide_timeout_second時間內沒有讀寫,則被視作“閑置”,會被自動關閉。默認值為10秒。此功能只對連接池(pooled)有效。打開-log_idle_connection_close在關閉前會打印一條日志。

7.8 延遲關閉連接

多個channel可能通過引用計數引用同一個連接,當引用某個連接的最后一個channel析構時,該連接將被關閉。但在一些場景中,channel在使用前才被創建,用完立刻析構,這時其中一些連接就會被無謂地關閉再被打開,效果類似短連接。

一個解決辦法是用戶把所有或常用的channel緩存下來,這樣自然能避免channel頻繁產生和析構,但目前brpc沒有提供這樣一個utility,用戶自己(正確)實現有一些工作量。

另一個解決辦法是設置全局選項-defer_close_second,設置后引用計數清0時連接並不會立刻被關閉,而是會等待這么多秒再關閉,如果在這段時間內又有channel引用了這個連接,它會恢復正常被使用的狀態。不管channel創建析構有多頻繁,這個選項使得關閉連接的頻率有上限。這個選項的副作用是一些fd不會被及時關閉,如果延時被誤設為一個大數值,程序占據的fd個數可能會很大。

7.9 連接的緩沖區大小

-socket_recv_buffer_size設置所有連接的接收緩沖區大小,默認-1(不修改)

-socket_send_buffer_size設置所有連接的發送緩沖區大小,默認-1(不修改)

7.10 log_id

通過set_log_id()可設置64位整型log_id。這個id會和請求一起被送到服務器端,一般會被打在日志里,從而把一次檢索經過的所有服務串聯起來。字符串格式的需要轉化為64位整形才能設入log_id。

7.11 附件

baidu_std和hulu_pbrpc協議支持附件,這段數據由用戶自定義,不經過protobuf的序列化。站在client的角度,設置在Controller::request_attachment()的附件會被server端收到,response_attachment()則包含了server端送回的附件。附件不受壓縮選項影響。

在http/h2協議中,附件對應message body,比如要POST的數據就設置在request_attachment()中。

7.12 開啟SSL

要開啟SSL,首先確保代碼依賴了最新的openssl庫。如果openssl版本很舊,會有嚴重的安全漏洞,支持的加密算法也少,違背了開啟SSL的初衷。 然后設置ChannelOptions.mutable_ssl_options(),具體選項見ssl_options.h。

ChannelOptions.has_ssl_options()可查詢是否設置過ssl_options, ChannelOptions.ssl_options()可訪問到設置過的只讀ssl_options。

// 開啟客戶端SSL並使用默認值。
options.mutable_ssl_options();

// 開啟客戶端SSL並定制選項。
options.mutable_ssl_options()->ciphers_name = "...";
options.mutable_ssl_options()->sni_name = "...";
  • 連接單點和集群的Channel均可以開啟SSL訪問(初始實現曾不支持集群)。
  • 開啟后,該Channel上任何協議的請求,都會被SSL加密后發送。如果希望某些請求不加密,需要額外再創建一個Channel。
  • 針對HTTPS做了些易用性優化:Channel.Init能自動識別 https:// 前綴並自動開啟SSL;開啟-http_verbose也會輸出證書信息。

7.13 認證

client端的認證一般分為2種:

  1. 基於請求的認證:每次請求都會帶上認證信息。這種方式比較靈活,認證信息中可以含有本次請求中的字段,但是缺點是每次請求都會需要認證,性能上有所損失。
  2. 基於連接的認證:當TCP連接建立后,client發送認證包,認證成功后,后續該連接上的請求不再需要認證。相比前者,這種方式靈活度不高(一般認證包里只能攜帶本機一些靜態信息),但性能較好,一般用於單連接/連接池場景。

針對第一種認證場景,在實現上非常簡單,將認證的格式定義加到請求結構體中,每次當做正常RPC發送出去即可;針對第二種場景,brpc提供了一種機制,只要用戶繼承實現:

class Authenticator {
public:
    virtual ~Authenticator() {}

    // Implement this method to generate credential information
    // into `auth_str' which will be sent to `VerifyCredential'
    // at server side. This method will be called on client side.
    // Returns 0 on success, error code otherwise
    virtual int GenerateCredential(std::string* auth_str) const = 0;
};

那么當用戶並發調用RPC接口用單連接往同一個server發射請求時,框架會自動保證:建立TCP連接后,連接上的第一個請求中會帶有上述GenerateCredential產生的認證包,其余剩下的並發請求不會帶有認證信息,依次排在第一個請求之后。整個發送過程依舊是並發的,並不會等第一個請求先返回。若server端認證成功,那么所有請求都能成功返回;若認證失敗,一般server端則會關閉連接,這些請求則會收到相應錯誤。

目前自帶協議中支持客戶端認證的有:baidu_std,HTTP,hulu_pbrpc,ESP。對於自定義協議,一般可以在組裝請求階段,調用Authenticator接口生成認證串,來支持客戶端認證。

7.14 重置

調用Reset方法可讓Controller回到剛創建時的狀態。

別在RPC結束前重置Controller,行為是未定義的。

7.15 壓縮

set_request_compress_type()設置request的壓縮方式,默認不壓縮。

注意:附件不會被壓縮。

http/h2 body的壓縮方法見 ...<失效鏈接>

支持的壓縮方法有:

  • snappy
  • gzip
  • zlib

8. FAQ

Q:brpc能用unix domain socket嗎

不能。統計TCP socket並不走網絡,相比unix domain socket性能只會略微下降。一些不能用TCP socket的特殊場景可能會需要,以后可能會擴展支持。

Q:Fail to connect to xx.xx.xx.xx:xxxx, Connection refused

一般是對端server沒有打開端口(很可能掛了)。

Q:經常遇到至另一個機房的Connection timedout

這個就是連接超時了,調大連接和RPC超時:

struct ChannelOptions {
    ...
    // Issue error when a connection is not established after so many
    // milliseconds. -1 means wait indefinitely.
    // Default: 200 (milliseconds)
    // Maximum: 0x7fffffff (roughly 30 days)
    int32_t connect_timeout_ms;
    
    // Max duration of RPC over this Channel. -1 means wait indefinitely.
    // Overridable by Controller.set_timeout_ms().
    // Default: 500 (milliseconds)
    // Maximum: 0x7fffffff (roughly 30 days)
    int32_t timeout_ms;
    ...
};

注意:連接超時不是RPC超時,RPC超時打印的日志是“Reached timeout=...”。

Q:為什么同步方式是好的,異步就crash了

重點檢查Controller,Response和done的生命周期。在異步訪問中,RPC調用結束並不意味着RPC整個過程結束,而是在進入done->Run()時才會結束。所以這些對象不應在調用RPC后就釋放,而是要在done->Run()里釋放。你一般不能把這些對象分配在棧上,而是應該分配在堆上。

Q:怎么確保請求只被處理一次

這不是RPC層面的事情。當response返回且成功時,我們確認這個過程一定成功了。當response返回且失敗時,我們確認這個過程一定失敗了。但當response沒有返回時,它可能失敗,也可能成功。如果我們選擇重試,那一個成功的過程也可能會被再執行一次。一般來說帶副作用的RPC服務都應當考慮冪等問題,否則重試可能會導致多次疊加副作用而產生意想不到的結果。只有讀的檢索服務大都沒有副作用而天然冪等,無需特殊處理。而帶寫的存儲服務則要在設計時就加入版本號或序列號之類的機制以拒絕已經發生的過程,保證冪等。

Q:Invalid address=`bns://group.user-persona.dumi.nj03'

FATAL 04-07 20:00:03 7778 src/brpc/channel.cpp:123] Invalid address=`bns://group.user-persona.dumi.nj03'. You should use Init(naming_service_name, load_balancer_name, options) to access multiple servers.

訪問命名服務要使用三個參數的Init,其中第二個參數是load_balancer_name,而這里用的是兩個參數的Init,框架認為是訪問單點,就會報這個錯。

Q:兩端都用protobuf,為什么不能互相訪問

協議 != protobuf。protobuf負責一個包的序列化,協議中的一個消息可能會包含多個protobuf包,以及額外的長度、校驗碼、magic number等等。打包格式相同不意味着協議可以互通。在brpc中寫一份代碼就能服務多協議的能力是通過把不同協議的數據轉化為統一的編程接口完成的,而不是在protobuf層面。 

Q:為什么C++ client/server能夠互相通信,和其他語言的client/server通信會報序列化失敗的錯誤

檢查一下C++版本是否開啟了壓縮(Controller::set_compress_type),目前其他語言的rpc框架還沒有實現壓縮,互相返回會出現問題。

9. Client端基本流程

主要步驟:

  1. 創建一個bthread_id作為本次RPC的correlation_id。
  2. 根據Channel的創建方式,從進程級的SocketMap中或從LoadBalancer中選擇一台下游server作為本次RPC發送的目的地。
  3. 根據連接方式(單連接、連接池、短連接),選擇一個Socket。
  4. 如果開啟驗證且當前Socket沒有被驗證過,第一個請求進入驗證分支,其余請求會阻塞直到第一個包含認證信息的請求寫入Socket。server端只對第一個請求進行驗證。
  5. 根據Channel的協議,選擇對應的序列化函數把request序列化至IOBuf。
  6. 如果配置了超時,設置定時器。從這個點開始要避免使用Controller對象,因為在設定定時器后隨時可能觸發超時 -> 調用到用戶的超時回調 -> 用戶在回調中析構Controller。
  7. 發送准備階段結束,若上述任何步驟出錯,會調用Channel::HandleSendFailed。
  8. 將之前序列化好的IOBuf寫出到Socket上,同時傳入回調Channel::HandleSocketFailed,當連接斷開、寫失敗等錯誤發生時會調用此回調。
  9. 如果是同步發送,Join correlation_id;否則至此CallMethod結束。
  10. 網絡上發消息+收消息。
  11. 收到response后,提取出其中的correlation_id,在O(1)時間內找到對應的Controller。這個過程中不需要查找全局哈希表,有良好的多核擴展性。
  12. 根據協議格式反序列化response。
  13. 調用Controller::OnRPCReturned,可能會根據錯誤碼判斷是否需要重試,或讓RPC結束。如果是異步發送,調用用戶回調。最后摧毀correlation_id喚醒Join着的線程。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM