注:sentinel集群官方有自己的demo,地址:https://github.com/alibaba/Sentinel/tree/master/sentinel-demo/sentinel-demo-cluster
1.初始化動態數據源的流控規則
private void initDynamicRuleProperty() {
// 從nacos讀取限流的規則,使用Lambda處理返回的數據,得到想要的數據類型
// remoteAddress 為nacos的地址
// groupId 為nacos配置對應的groupId
// flowDataId 為nacos配置對應的dataId
// 讀取到的數據source為字符串
ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(remoteAddress, groupId,
flowDataId, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
// 將讀取到的流控規則,register到對應的類
FlowRuleManager.register2Property(ruleSource.getProperty());
// 從nacos讀取參數限流的規則,使用Lambda處理返回的數據,得到想要的數據類型
ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new NacosDataSource<>(remoteAddress, groupId,
paramDataId, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
// 將讀取到的流控規則,register到對應的類
ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
}
流控規則配置的示例:
[
{
"resource" : "/test/{name}",
"grade" : 1,
"count" : 2,
"clusterMode" : true,
"clusterConfig" : {
"flowId" : 333,
"thresholdType" : 1,
"fallbackToLocalWhenFail" : true
}
}
]
resource : 資源名,限流規則作用對象,一般為請求URI
limitApp : 控流針對的調用來源,default則不區分調用來源
grade : 限流閾值類型; 0表明根據並發數量來限流,1表明根據QPS來進行流量控制
count : 限流閾值
strategy : 調用關系限流策略,(0:直接,1:關聯,2:鏈路)
controlBehavior : 限流控制行為(快速失敗 、warm up 、排隊等候)
clusterMode : 是否為集群模式
若集群開啟:
clusterConfig下為集群流控相關配置
thresholdType:集群閾值模式(0:單機均攤,1:總體閾值)
fallbackToLocalWhenFail: 失敗退化(選擇為true,不選擇為false)
2. 設置集群的客戶端的參數
private void initClientConfigProperty() {
// 從nacos讀取配置,設置集群的客戶端參數
ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(remoteAddress, groupId,
configDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {}));
//將客戶端的參數進行認證
ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
}
客戶端參數示例
{
"requestTimeout": 300 // 請求超時時間
}
3. 判斷當前服務是否是token server ,設置交互的端口
private void initServerTransportConfigProperty() {
// 從nacos讀取數據,獲取存在哪些服務器及端口,哪台機械是server端
ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new NacosDataSource<>(remoteAddress, groupId,
clusterMapDataId, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
return Optional.ofNullable(groupList)
.flatMap(this::extractServerTransportConfig) // 邏輯運算,判斷是否此服務是token server,如果是設置端口號,超時時間
.orElse(null);
});
// 認證token server 的端口,超時時間
ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
}
private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
return groupList.stream()
.filter(this:: private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
return getCurrentMachineId().equals(group.getMachineId());
}
private String getCurrentMachineId() {
// Note: this may not work well for container-based env.
// return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getRuntimePort();
return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort();
})
.findAny()
.map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
}
private static final String SEPARATOR = "@";
配置示例:
[
{
"clientSet":["192.168.164.1@8723"],
"ip":"192.168.164.1",
"machineId":"192.168.164.1@8722",
"port":11111
}
]
4. 將流控的規則告知整個集群
private void registerClusterRuleSupplier() {
// Register cluster flow rule property supplier which creates data source by namespace.
// Flow rule dataId format: ${namespace}-flow-rules
ClusterFlowRuleManager.setPropertySupplier(namespace -> {
ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
namespace + DemoConstants.FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
return ds.getProperty();
});
// Register cluster parameter flow rule property supplier which creates data source by namespace.
ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
ReadableDataSource<String, List<ParamFlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
namespace + DemoConstants.PARAM_FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
return ds.getProperty();
});
}
5. 告知集群的客戶端管理器,token server服務所在的地址及端口
private void initClientServerAssignProperty() {
// Cluster map format:
// [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}]
// machineId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(remoteAddress, groupId,
clusterMapDataId, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
return Optional.ofNullable(groupList)
.flatMap(this::extractClientAssignment) //具體的邏輯算法, 確認token server 的地址及端口
.orElse(null);
});
// 這里主要認證2個參數 ServerHost ServerPort
ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
}
6. 初始化,服務的狀態,當一個服務初始化啟動時,確認此服務是token server 還是 client
private void initStateProperty() {
// Cluster map format:
// [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}]
// machineId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
ReadableDataSource<String, Integer> clusterModeDs = new NacosDataSource<>(remoteAddress, groupId,
clusterMapDataId, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
return Optional.ofNullable(groupList)
.map(this::extractMode)
.orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
});
ClusterStateManager.registerProperty(clusterModeDs.getProperty());
}
本文僅代表個人的觀點,如有錯誤請指正
