之前我們了解了 Sentinel 集成 SpringBoot實現限流,也探討了Sentinel的限流基本原理,那么接下去我們來學習一下Sentinel整合Dubbo及 Nacos 實現動態數據源的限流以及分布式限流。
先來看一下我的工程目錄:
單服務的限流:
Provider :
首先從 api 模塊開始:
其中只是定義了一個接口:
public interface SentinelService { String sayHello(String txt); }
接下去來編寫服務端的代碼。
1.首先需要添加我們需要的依賴:
<dependency> <groupId>com.wuzz.demo</groupId> <artifactId>sentinel-dubbo-api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-dubbo-adapter</artifactId> <version>1.6.3</version> </dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.6.3</version>
</dependency>
2.我們需要編寫接口的實現類並且發布成Dubbo服務:
@Service//把當前服務發布成dubbo服務 public class SentinelServiceImpl implements SentinelService { @Override public String sayHello(String txt) { return "hello world :" + LocalDateTime.now(); } }
3.添加 Dubbo相關配置,這里采用注解的方式:
@Configuration @DubboComponentScan("com.wuzz.demo") public class DubboConfig { @Bean public ApplicationConfig applicationConfig(){ ApplicationConfig applicationConfig=new ApplicationConfig(); applicationConfig.setName("sentinel-dubbo"); applicationConfig.setOwner("wuzz"); return applicationConfig; } @Bean public RegistryConfig registryConfig(){ RegistryConfig registryConfig=new RegistryConfig(); registryConfig.setAddress("zookeeper://192.168.1.101:2181"); return registryConfig; } @Bean public ProtocolConfig protocolConfig(){ ProtocolConfig protocolConfig=new ProtocolConfig(); protocolConfig.setName("dubbo"); protocolConfig.setPort(20880); return protocolConfig; } }
4.配置文件 application.properties:
server.port = 8881
5.編寫主啟動類:
@SpringBootApplication public class SentinelProviderApplication { public static void main(String[] args) throws IOException { initFlowRules(); SpringApplication.run(SentinelProviderApplication.class, args); System.in.read(); } //初始化規則 private static void initFlowRules() { List<FlowRule> rules = new ArrayList<>(); //限流規則的集合 FlowRule flowRule = new FlowRule(); flowRule.setResource("com.wuzz.demo.SentinelService:sayHello(java.lang.String)");//資源(方法名稱、接口) flowRule.setCount(10);//限流閾值 qps=10 flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);//限流閾值類型(QPS 或並發線程數) //流量控制手段(直接拒絕、Warm Up、勻速排隊) flowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT); flowRule.setLimitApp("sentinel-web");//流控針對的調用來源,若為 default 則不區分調用來源 rules.add(flowRule); FlowRuleManager.loadRules(rules); } }
設置限流的基准:
Service Provider 用於向外界提供服務,處理各個消費者的調用請求。為了保護 Provider 不被激增的流量拖垮影響穩定性,可以給 Provider 配置 QPS 模式的限流,這樣當每秒的請求量超過設定的閾值時會自動拒絕多的請求。限流粒度可以是服務接口和服務方法兩種粒度。若希望整個服務接口的 QPS 不超過一定數值,則可以為對應服務接口資源(resourceName 為接口全限定名)配置 QPS 閾值;若希望服務的某個方法的 QPS 不超過一定數值,則可以為對應服務方法資源(resourceName 為接口全限定名:方法簽名)配置 QPS 閾值.
LimitApp:
很多場景下,根據調用方來限流也是非常重要的。比如有兩個服務 A 和 B 都向 Service Provider 發起調用請求,我們希望只對來自服務 B 的請求進行限流,則可以設置限流規則的 limitApp 為服務 B 的名稱。Sentinel Dubbo Adapter 會自動解析 Dubbo 消費者(調用方)的 application name 作為調用方名稱(origin),在進行資源保護的時候都會帶上調用方名稱。若限流規則未配置調用方(default),則該限流規則對所有調用方生效。若限流規則配置了調用方則限流規則將僅對指定調用方生效。
注:Dubbo 默認通信不攜帶對端 application name 信息,因此需要開發者在調用端手動將 applicationname 置入 attachment 中,provider 端進行相應的解析。Sentinel Dubbo Adapter 實現了一個 Filter 用於自動從 consumer 端向 provider 端透傳 application name。若調用端未引入 Sentinel DubboAdapter,又希望根據調用端限流,可以在調用端手動將 application name 置入 attachment 中,key 為dubboApplication.
ControlBehavior:
當 QPS 超過某個閾值的時候,則采取措施進行流量控制。流量控制的手段包括以下幾種:直接拒絕、Warm Up、勻速排隊。對應 FlowRule 中的 controlBehavior 字段
- 直接拒絕(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式是默認的流量控制方式,當QPS超過任意規則的閾值后,新的請求就會被立即拒絕,拒絕方式為拋出FlowException。這種方式適用於對系統處理能力確切已知的情況下,比如通過壓測確定了系統的准確水位時
- Warm Up(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式,即預熱/冷啟動方式,當系統長期處於低並發的情況下,流量突然增加到qps的最高峰值,可能會造成系統的瞬間流量過大把系統壓垮。所以warmup,相當於處理請求的數量是緩慢增加,經過一段時間以后,到達系統處理請求個數的最大值
- 勻速排隊(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式會嚴格控制請求通過的間隔時間,也即是讓請求以均勻的速度通過,對應的是漏桶算法它的原理是,以固定的間隔時間讓請求通過。當請求過來的時候,如果當前請求距離上個通過的請求通過的時間間隔不小於預設值,則讓當前請求通過;否則,計算當前請求的預期通過時間,如果該請求的預期通過時間小於規則預設的 timeout 時間,則該請求會等待直到預設時間到來通過;反之,則馬上拋出阻塞異常。可以設置一個最長排隊等待時間: flowRule.setMaxQueueingTimeMs(5 * 1000); // 最長排隊等待時間:5s這種方式主要用於處理間隔性突發的流量,例如消息隊列。想象一下這樣的場景,在某一秒有大量的請求到來,而接下來的幾秒則處於空閑狀態,我們希望系統能夠在接下來的空閑期間逐漸處理這些請求,而不是在第一秒直接拒絕多余的請求。
Consumer :
1.添加依賴:
<dependency> <groupId>com.wuzz.demo</groupId> <artifactId>sentinel-dubbo-api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> <version>2.7.1</version> </dependency>
2.consumer主要是對外提供服務的,我們需要一個controller:
@RestController public class SentinelController { @Reference(timeout = 3000,check = false) SentinelService sentinelService;//proxy$0 @GetMapping("/say") public String sayHello(){ RpcContext.getContext().setAttachment("dubboApplication","sentinel-web"); return sentinelService.sayHello("test"); } @GetMapping("/say2") public String sayHello2(){ return sentinelService.sayHello("test2"); } }
3.主啟動類:
@SpringBootApplication public class SentinelConsumerApplication { public static void main(String[] args) { SpringApplication.run(SentinelConsumerApplication.class, args); } }
4.配置文件 application.properties :
server.port = 8882 dubbo.registry.address=zookeeper://192.168.1.101:2181 dubbo.scan.base-packages=com.wuzz.demo dubbo.application.name=sentinel-web
服務端及客戶端代碼編寫完畢,這個時候我們需要先啟動Sentinel-Dashboard 以便直觀的去看到限流的效果。然后啟動服務端,並且在啟動時加入 JVM 參數 -Dcsp.sentinel.dashboard.server=localhost:8080 指定控制台地址和端口。其他具體的參數如下:
-server -XX:MaxHeapSize=128m -Xms256m -Xmx256m -XX:PermSize=128M -XX:MaxPermSize=256m -Dproject.name=app-demo -Dcsp.sentinel.dashboard.server=localhost:8080 -Dcsp.sentinel.log.use.pid=true
然后啟動客戶端,通過 JMeter 進行壓測,結果如下:
另一方面我們啟動了 Sentinel-Dashboard 所以我們可以通過控制台查看:
如何實現分布式限流:
為什么要使用集群流控呢?假設我們希望給某個用戶限制調用某個 API 的總 QPS 為 50,但機器數可能很多(比如有 100 台)。這時候我們很自然地就想到,找一個 server 來專門來統計總的調用量,其它的實例都與這台 server 通信來判斷是否可以調用。這就是最基礎的集群流控的方式。
另外集群流控還可以解決流量不均勻導致總體限流效果不佳的問題。假設集群中有 10 台機器,我們給每台機器設置單機限流閾值為 10 QPS,理想情況下整個集群的限流閾值就為 100 QPS。不過實際情況下流量到每台機器可能會不均勻,會導致總量沒有到的情況下某些機器就開始限流。因此僅靠單機維度去限制的話會無法精確地限制總體流量。而集群流控可以精確地控制整個集群的調用總量,結合單機限流兜底,可以更好地發揮流量控制的效果。
集群流控中共有兩種身份:
- Token Client:集群流控客戶端,用於向所屬 Token Server 通信請求 token。集群限流服務端會返回給客戶端結果,決定是否限流。
- Token Server:即集群流控服務端,處理來自 Token Client 的請求,根據配置的集群規則判斷是否應該發放 token(是否允許通過)。
要想使用集群流控功能,我們需要在應用端配置動態規則源,並通過 Sentinel 控制台實時進行推送。如下圖所示:
搭建 Token-Server:
1.添加pom依賴:
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-cluster-server-default</artifactId> <version>1.6.3</version> </dependency> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-nacos</artifactId> <version>1.6.3</version> </dependency> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-transport-simple-http</artifactId> <version>1.6.3</version> </dependency>
2.dubbo中利用Nacos實現動態數據源要求實現 InitFunc 接口:
public class NacosDataSourceInitFunc implements InitFunc { private final String remoteAddress="localhost"; //nacos 配置中心的服務host private final String groupId="SENTINEL_GROUP"; private final String FLOW_POSTFIX="-flow-rules"; //dataid(names+postfix) //意味着當前的token-server會從nacos上獲得限流的規則 @Override public void init() throws Exception { ClusterFlowRuleManager.setPropertySupplier(namespace ->{ ReadableDataSource<String, List<FlowRule>> rds= new NacosDataSource<List<FlowRule>>(remoteAddress,groupId,namespace+FLOW_POSTFIX, source -> JSON.parseObject(source,new TypeReference<List<FlowRule>>(){})); return rds.getProperty(); }); } }
然后需要在META-INF/services 下添加拓展點文件 com.alibaba.csp.sentinel.init.InitFunc 。其內容就是我們的實現:
com.wuzz.demo.NacosDataSourceInitFunc
3.主啟動類:
public class ClusterServer { public static void main(String[] args) throws Exception { ClusterTokenServer tokenServer=new SentinelDefaultTokenServer(); ClusterServerConfigManager.loadGlobalTransportConfig( new ServerTransportConfig().setIdleSeconds(600).setPort(9999)); ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton("app-wuzz")); //設置成動態 tokenServer.start(); } }
啟動Sentinel-dashboard:
java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -XX:MaxHeapSize=128m -Xms256m -Xmx256m -XX:PermSize=128M -XX:MaxPermSize=256m -jar sentinel-dashboard-1.6.3.jar
啟動nacos以及增加配置:Data Id=app-wuzz-flow-rules,Group=SENTINEL_GROUP
[ { "resource":"com.wuzz.demo.SentinelService:sayHello(java.lang.String)", "grade":1, //限流模式 qps "count":10, // 限流總閾值 "clusterMode":true, //集群模式 true "clusterConfig":{ "flowId":100001,//全局唯一ID "thresholdType":1,//閾值模式,全局閾值 "fallbackToLocalWhenFail":true //client連接失敗使用本地限流模式 } } ]
啟動 Token-Server 並且添加以下JVM參數,將其加入到 Sentinel-Dashboard中進行管理:
-server -Dproject.name=app-wuzz -Dcsp.sentinel.dashboard.server=localhost:8080 -Dcsp.sentinel.log.use.pid=true
電腦內存不足加入 -XX:MaxHeapSize=128m -Xms256m -Xmx256m -XX:PermSize=128M -XX:MaxPermSize=256m。服務啟動之后,在$user.home$/logs/csp/ 可以找到sentinel-record.log.pid*.date文件,如果看到日志文件中獲取到了遠程服務的信息,說明token-server啟動成功了,也可以通過Sentinel-Dashboard看到注冊的列表:
Provider :
1.添加pom依賴:
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-cluster-client-default</artifactId> <version>1.6.3</version> </dependency> <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-datasource-nacos</artifactId> <version>1.6.3</version> </dependency>
2.在META-INF/services 下添加拓展點文件 com.alibaba.csp.sentinel.init.InitFunc 。其內容就是我們的實現,剛剛哎token-server中配置的是直接從Nacos中獲取。在服務端配置如下:
public class NacosDataSourceInitFunc implements InitFunc { private final String CLUSTER_SERVER_HOST="localhost"; //token-server的地址 private final int CLUSTER_SERVER_PORT=9999; private final int REQUEST_TIME_OUT=200000; //請求超時時間 private final String APP_NAME="app-wuzz"; //namespace //nacos的配置() private final String remoteAddress="localhost"; //nacos 配置中心的服務host private final String groupId="SENTINEL_GROUP"; private final String FLOW_POSTFIX="-flow-rules"; //dataid(names+postfix) //意味着當前的token-server會從nacos上獲得限流的規則 @Override public void init() throws Exception { //加載集群-信息 loadClusterClientConfig(); registryClusterFlowRuleProperty(); } private void loadClusterClientConfig(){ ClusterClientAssignConfig assignConfig=new ClusterClientAssignConfig(); assignConfig.setServerHost(CLUSTER_SERVER_HOST); assignConfig.setServerPort(CLUSTER_SERVER_PORT); ClusterClientConfigManager.applyNewAssignConfig(assignConfig); ClusterClientConfig clientConfig=new ClusterClientConfig(); clientConfig.setRequestTimeout(REQUEST_TIME_OUT); ClusterClientConfigManager.applyNewConfig(clientConfig); } //注冊動態數據源 private void registryClusterFlowRuleProperty(){ ReadableDataSource<String, List<FlowRule>> rds= new NacosDataSource<List<FlowRule>>(remoteAddress,groupId,APP_NAME+FLOW_POSTFIX, source -> JSON.parseObject(source,new TypeReference<List<FlowRule>>(){})); FlowRuleManager.register2Property(rds.getProperty()); } }
3.修改主啟動類:
@SpringBootApplication public class SentinelProviderApplication { public static void main(String[] args) throws IOException {//表示當前的節點是集群客戶端 ClusterStateManager.applyState(ClusterStateManager.CLUSTER_CLIENT); SpringApplication.run(SentinelProviderApplication.class, args); System.in.read(); } }
啟動服務端並且將其加入到Sentinel-Dashboard中,另外這里再添加JVM參數的時候需要注意,這里的project-name要包含在token-server中配置的namespace中,token server 會根據客戶端對應的 namespace(默認為 project.name 定義的應用名)下的連接數來計算總的閾值,我這里設置成app-wuzz.:
-server -XX:MaxHeapSize=128m -Xms256m -Xmx256m -XX:PermSize=128M -XX:MaxPermSize=256m -Dproject.name=app-wuzz -Dcsp.sentinel.dashboard.server=localhost:8080 -Dcsp.sentinel.log.use.pid=true
服務啟動之后,在$user.home$/logs/csp/ 可以找到sentinel-record.log.pid*.date文件,如果看到日志文件中獲取到了token-server的信息,說明連接成功了。
由於我們要實現分布式限流,也就是需要部署我們的局群服務,我們可以利用IDEA來實現:添加一個 SentinelProviderApplication。同時運行兩個程序:
這里 JVM 參數需要多增加一個 -Ddubbo.protocol.port=20881 才可以。然后我們啟動兩個服務及客戶端。通過JMeter 壓測就可以看到結果(多發幾次請求),我們也可以直接看Sentinel-Dashboard:
就這樣實現了分布式限流。