為什么需要集群流控呢?假設需要將某個API的總qps限制在100,機器數可能為50,這時很自然的想到使用一個專門的server來統計總的調用量,其他實例與該server通信來判斷是否可以調用,這就是基本的集群流控方式,sentinel的實現就是這樣的。
如果服務調用使用輪訓或者隨機路由方式,理論上可以通過在各個單機上設置流控規則即可(單機qps上限=總qps上限 / 機器數)。集群流控可以解決流量分配不均的問題導致總體流控效果不佳的問題,其可以精確地控制整個集群的調用總量,結合單機限流兜底,可以更好地發揮流量控制的效果,不過由於會與server進行通信,所以性能上會有一定損耗。
集群流控中共有兩種身份:
- Token Client:集群流控客戶端,用於向所屬 Token Server 通信請求 token。集群限流服務端會返回給客戶端結果,決定是否限流。
- Token Server:即集群流控服務端,處理來自 Token Client 的請求,根據配置的集群規則判斷是否應該發放 token(是否允許通過)。
Sentinel 1.4.0 開始引入了集群流控模塊,主要包含以下幾部分:
sentinel-cluster-common-default: 公共模塊,包含公共接口和實體sentinel-cluster-client-default: 默認集群流控 client 模塊,使用 Netty 進行通信,提供接口方便序列化協議擴展sentinel-cluster-server-default: 默認集群流控 server 模塊,使用 Netty 進行通信,提供接口方便序列化協議擴展;同時提供擴展接口對接規則判斷的具體實現(TokenService),默認實現是復用 sentinel-core 的相關邏輯
大致了解集群流控概念之后,下面一起分析下集群流控規則、client端和server端各自處理機制~
集群流控規則
FlowRule 添加了兩個字段用於集群限流相關配置,如下所示。clusterMode在方法FlowRuleChecker.canPassCheck中會用到進行判斷是否是集群流控,false表示單機流控;true表示集群流控,會調用方法passClusterCheck與集群流控server端通信判斷是否觸發了流控,此時異常降級策略為本地流控(fallbackToLocalOrPass方法,fallbackToLocalWhenFail屬性為true時執行本地流控,否則直接返回ture不走流控檢查)。
1 private boolean clusterMode; // 標識是否為集群限流配置 2 private ClusterFlowConfig clusterConfig; // 集群限流相關配置項 3 4 // ClusterFlowConfig屬性 5 private Long flowId; // 全局唯一的規則 ID,由集群限流管控端分配. 6 private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL; // 閾值模式,默認(0)為單機均攤,1 為全局閾值. 7 private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL; 8 private boolean fallbackToLocalWhenFail = true; // 在 client 連接失敗或通信失敗時,是否退化到本地的限流模式 9 10 public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { 11 String limitApp = rule.getLimitApp(); 12 if (limitApp == null) { 13 return true; 14 } 15 if (rule.isClusterMode()) {// 集群模式 16 return passClusterCheck(rule, context, node, acquireCount, prioritized); 17 } 18 // 單機模式流控 19 return passLocalCheck(rule, context, node, acquireCount, prioritized); 20 }
- flowId 代表全局唯一的規則 ID,Sentinel 集群限流服務端通過此 ID 來區分各個規則,因此務必保持全局唯一。一般 flowId 由統一的管控端進行分配,或寫入至 DB 時生成。
- thresholdType 代表集群限流閾值模式。單機均攤模式表示總qps閾值等於機器數*單機qps閾值;全局閾值等於整個集群配置的閾值。
- strategy 集群策略,默認
FLOW_CLUSTER_STRATEGY_NORMAL,針對ClusterFlowConfig配置該屬性為FLOW_CLUSTER_STRATEGY_NORMAL才合法,除此之外,暫無太多業務意義。
client端處理機制
client端的處理機制和單機是一樣的,只不過clusterMode和clusterConfig屬性配置上了而已,具體的client使用可以參考官方文檔 集群流控,這里不再贅述。如果是集群流控,在FlowRuleChecker.canPassCheck方法中會調用方法passClusterCheck,如下:
1 private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, 2 boolean prioritized) { 3 try { 4 TokenService clusterService = pickClusterService(); 5 if (clusterService == null) { 6 // 為null降級處理 7 return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); 8 } 9 long flowId = rule.getClusterConfig().getFlowId(); 10 TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized); 11 return applyTokenResult(result, rule, context, node, acquireCount, prioritized); 12 } catch (Throwable ex) { 13 RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); 14 } 15 // 降級處理 本地限流 16 return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); 17 }
requestToken負責與token server端通信,入參包括flowId, acquireCount, prioritized,這里是沒有Resource信息的,server端通過flowid來獲取對應規則進行流控判斷。注意,調用writeAndFlush發送請求之后等待響應結果,最大等待時間ClusterClientConfigManager.getRequestTimeout();請求發送過程中,出現任何異常或者返回錯誤(這里不包括BLOCKED情況),都會默認走降級本地流控邏輯:fallbackToLocalOrPass。
了解了client端處理流程,接下來看下server端處理流程,client和server端都是用netty作為底層網絡通信服務,關於netty的原理不是本文討論的重點因此會簡單帶過。如果小伙伴們還不太熟悉netty,請參閱對應資料即可。對於netty,每個Java開發者都需要了解甚至是熟悉的,這樣不僅僅幫助我們理解NIO及Reactor模型,還能再閱讀基於netty的框架源碼(比如dubbo/rocketmq等)時,將重點關注在框架本身實現上,而不是網絡通信流程及細節上。
server端處理機制
Sentinel 集群限流服務端有兩種啟動方式:
- 獨立模式(Alone),即作為獨立的 token server 進程啟動,獨立部署,隔離性好,但是需要額外的部署操作。獨立模式適合作為 Global Rate Limiter 給集群提供流控服務。

- 嵌入模式(Embedded),即作為內置的 token server 與服務在同一進程中啟動。在此模式下,集群中各個實例都是對等的,token server 和 client 可以隨時進行轉變,因此無需單獨部署,靈活性比較好。但是隔離性不佳,需要限制 token server 的總 QPS,防止影響應用本身。嵌入模式適合某個應用集群內部的流控。

目前針對token server高可用,sentinel並沒有對應的解決方案,不過沒有並不意味着沒考慮,因為默認可以降級走本地流控。sentinel作為一個限流組件,在大部分應用場景中,如果token server掛了降級為本地流控就可以滿足了。
如果必須考慮token server高可用,可考慮token server集群部署,每個token server都能訪問(或存儲)全量規則數據,多個client通過特定路由規則分配到不同的token server(相同類型服務路由到同一個token server,不同類型服務可路由到不同token server),token server故障時提供failover機制即可。如果此時考慮到相同類型服務出現網絡分區,也就是一部分服務可以正常與token server通信,另一個部分服務無法正常與token server通信,如果無法正常通信的這部分服務直接進行failover,會導致集群限流不准的問題,可通過zookeeper來保存在線的token server,如果zookeeper中token server列表有變化,再進行failover;此情況下再出現任何形式的網絡分區,再執行降級邏輯,執行本地限流。
server端不管是獨立模式還是嵌入模式,都是通過NettyTransportServer來啟動的:
public void start() { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); p.addLast(new NettyRequestDecoder()); p.addLast(new LengthFieldPrepender(2)); p.addLast(new NettyResponseEncoder()); p.addLast(new TokenServerHandler(connectionPool)); } }); b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() { // }); }
以上邏輯主要是netty啟動邏輯,重點關注initChannel方法,這些是往pipeline添加自定義channelHandler,主要是處理粘包、編解碼器和業務處理Handler,這里最重要的是TokenServerHandler,因為是請求處理邏輯,所以重點關注其channelRead方法:
1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 2 // 全局保存channel 3 globalConnectionPool.refreshLastReadTime(ctx.channel()); 4 if (msg instanceof ClusterRequest) { 5 ClusterRequest request = (ClusterRequest)msg; 6 if (request.getType() == ClusterConstants.MSG_TYPE_PING) { 7 // ping請求處理,會記錄namespace信息 8 handlePingRequest(ctx, request); 9 return; 10 } 11 // 根據request type獲取對應處理器 12 // 針對集群流控,type為MSG_TYPE_FLOW 13 RequestProcessor<?, ?> processor = RequestProcessorProvider.getProcessor(request.getType()); 14 ClusterResponse<?> response = processor.processRequest(request); 15 writeResponse(ctx, response); 16 } 17 }
針對集群流控,type為MSG_TYPE_FLOW,對應處理器為FlowRequestProcessor。首先會提取請求入參 flowId, acquireCount, prioritized,主要步驟如下:
- 根據flowId獲取規則,為空返回結果NO_RULE_EXISTS;
- 獲取請求namespace對應的RequestLimiter,非空時進行tryPass限流檢查,該檢查是針對namespace維度;
- 針對flowId對應規則進行限流檢查,acquireCount表示該請求需要獲取的token數,數據檢查基於滑動時間窗口統計來判斷的。
根據限流規則檢查之后,會統計相關的PASS/BLOCK/PASS_REQUEST/BLOCK_REQUEST等信息,該流程和單機流控流程是類似的,具體代碼不再贅述。處理完成之后,會返回client端處理結果,至此整個集群流控流程就分析完了。
往期精選
- sentinel 核心概念
- 你的Redis有類轉換異常么
- Redis 基礎數據結構
- 別再問我ConcurrentHashMap了
- 分布式鎖設計與實現
- ConcurrentHashMap竟然也有死循環問題?
- 你的ThreadLocal線程安全么
- MySQL基礎概念知多少
覺得文章不錯,對你有所啟發和幫助,希望能轉發給更多的小伙伴。如果有問題,請關注下面公眾號,發送問題給我,多謝。
歡迎小伙伴關注【TopCoder】閱讀更多精彩好文。

