注:sentinel集群官方有自己的demo,地址:https://github.com/alibaba/Sentinel/tree/master/sentinel-demo/sentinel-demo-cluster
1.初始化动态数据源的流控规则
private void initDynamicRuleProperty() {
// 从nacos读取限流的规则,使用Lambda处理返回的数据,得到想要的数据类型
// remoteAddress 为nacos的地址
// groupId 为nacos配置对应的groupId
// flowDataId 为nacos配置对应的dataId
// 读取到的数据source为字符串
ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(remoteAddress, groupId,
flowDataId, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
// 将读取到的流控规则,register到对应的类
FlowRuleManager.register2Property(ruleSource.getProperty());
// 从nacos读取参数限流的规则,使用Lambda处理返回的数据,得到想要的数据类型
ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new NacosDataSource<>(remoteAddress, groupId,
paramDataId, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
// 将读取到的流控规则,register到对应的类
ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
}
流控规则配置的示例:
[
{
"resource" : "/test/{name}",
"grade" : 1,
"count" : 2,
"clusterMode" : true,
"clusterConfig" : {
"flowId" : 333,
"thresholdType" : 1,
"fallbackToLocalWhenFail" : true
}
}
]
resource : 资源名,限流规则作用对象,一般为请求URI
limitApp : 控流针对的调用来源,default则不区分调用来源
grade : 限流阈值类型; 0表明根据并发数量来限流,1表明根据QPS来进行流量控制
count : 限流阈值
strategy : 调用关系限流策略,(0:直接,1:关联,2:链路)
controlBehavior : 限流控制行为(快速失败 、warm up 、排队等候)
clusterMode : 是否为集群模式
若集群开启:
clusterConfig下为集群流控相关配置
thresholdType:集群阈值模式(0:单机均摊,1:总体阈值)
fallbackToLocalWhenFail: 失败退化(选择为true,不选择为false)
2. 设置集群的客户端的参数
private void initClientConfigProperty() {
// 从nacos读取配置,设置集群的客户端参数
ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(remoteAddress, groupId,
configDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {}));
//将客户端的参数进行认证
ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
}
客户端参数示例
{
"requestTimeout": 300 // 请求超时时间
}
3. 判断当前服务是否是token server ,设置交互的端口
private void initServerTransportConfigProperty() {
// 从nacos读取数据,获取存在哪些服务器及端口,哪台机械是server端
ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new NacosDataSource<>(remoteAddress, groupId,
clusterMapDataId, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
return Optional.ofNullable(groupList)
.flatMap(this::extractServerTransportConfig) // 逻辑运算,判断是否此服务是token server,如果是设置端口号,超时时间
.orElse(null);
});
// 认证token server 的端口,超时时间
ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
}
private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
return groupList.stream()
.filter(this:: private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
return getCurrentMachineId().equals(group.getMachineId());
}
private String getCurrentMachineId() {
// Note: this may not work well for container-based env.
// return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getRuntimePort();
return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort();
})
.findAny()
.map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
}
private static final String SEPARATOR = "@";
配置示例:
[
{
"clientSet":["192.168.164.1@8723"],
"ip":"192.168.164.1",
"machineId":"192.168.164.1@8722",
"port":11111
}
]
4. 将流控的规则告知整个集群
private void registerClusterRuleSupplier() {
// Register cluster flow rule property supplier which creates data source by namespace.
// Flow rule dataId format: ${namespace}-flow-rules
ClusterFlowRuleManager.setPropertySupplier(namespace -> {
ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
namespace + DemoConstants.FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
return ds.getProperty();
});
// Register cluster parameter flow rule property supplier which creates data source by namespace.
ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
ReadableDataSource<String, List<ParamFlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
namespace + DemoConstants.PARAM_FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
return ds.getProperty();
});
}
5. 告知集群的客户端管理器,token server服务所在的地址及端口
private void initClientServerAssignProperty() {
// Cluster map format:
// [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}]
// machineId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(remoteAddress, groupId,
clusterMapDataId, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
return Optional.ofNullable(groupList)
.flatMap(this::extractClientAssignment) //具体的逻辑算法, 确认token server 的地址及端口
.orElse(null);
});
// 这里主要认证2个参数 ServerHost ServerPort
ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
}
6. 初始化,服务的状态,当一个服务初始化启动时,确认此服务是token server 还是 client
private void initStateProperty() {
// Cluster map format:
// [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}]
// machineId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
ReadableDataSource<String, Integer> clusterModeDs = new NacosDataSource<>(remoteAddress, groupId,
clusterMapDataId, source -> {
List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
return Optional.ofNullable(groupList)
.map(this::extractMode)
.orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
});
ClusterStateManager.registerProperty(clusterModeDs.getProperty());
}
本文仅代表个人的观点,如有错误请指正