sentinel集群,内嵌模式


注:sentinel集群官方有自己的demo,地址:https://github.com/alibaba/Sentinel/tree/master/sentinel-demo/sentinel-demo-cluster

1.初始化动态数据源的流控规则

private void initDynamicRuleProperty() {
        // 从nacos读取限流的规则,使用Lambda处理返回的数据,得到想要的数据类型
        // remoteAddress 为nacos的地址
        // groupId 为nacos配置对应的groupId
        // flowDataId 为nacos配置对应的dataId
        // 读取到的数据source为字符串
        ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(remoteAddress, groupId,
            flowDataId, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
        // 将读取到的流控规则,register到对应的类
        FlowRuleManager.register2Property(ruleSource.getProperty());

        // 从nacos读取参数限流的规则,使用Lambda处理返回的数据,得到想要的数据类型
        ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new NacosDataSource<>(remoteAddress, groupId,
            paramDataId, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
        // 将读取到的流控规则,register到对应的类
        ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
    }

流控规则配置的示例:

[
    {
        "resource" : "/test/{name}",
        "grade" : 1,
        "count" : 2,                   
        "clusterMode" :  true,
        "clusterConfig" : {
            "flowId" : 333,
            "thresholdType" : 1,      
            "fallbackToLocalWhenFail" : true
        }
    }
]

resource : 资源名,限流规则作用对象,一般为请求URI
limitApp : 控流针对的调用来源,default则不区分调用来源
grade :  限流阈值类型; 0表明根据并发数量来限流,1表明根据QPS来进行流量控制
count :  限流阈值
strategy : 调用关系限流策略,(0:直接,1:关联,2:链路)
controlBehavior : 限流控制行为(快速失败 、warm up 、排队等候)
clusterMode : 是否为集群模式 
若集群开启:
clusterConfig下为集群流控相关配置
thresholdType:集群阈值模式(0:单机均摊,1:总体阈值)
fallbackToLocalWhenFail: 失败退化(选择为true,不选择为false)

2. 设置集群的客户端的参数

private void initClientConfigProperty() {
    // 从nacos读取配置,设置集群的客户端参数
    ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(remoteAddress, groupId,
        configDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {}));
    //将客户端的参数进行认证
    ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
}

 客户端参数示例

{
    "requestTimeout": 300 // 请求超时时间
}

3. 判断当前服务是否是token server ,设置交互的端口

private void initServerTransportConfigProperty() {
        // 从nacos读取数据,获取存在哪些服务器及端口,哪台机械是server端
        ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
            return Optional.ofNullable(groupList)
                .flatMap(this::extractServerTransportConfig) // 逻辑运算,判断是否此服务是token server,如果是设置端口号,超时时间
                .orElse(null);
        });
        // 认证token server 的端口,超时时间
        ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
    }
    
    private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
        return groupList.stream()
            .filter(this::    private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
        return getCurrentMachineId().equals(group.getMachineId());
    }

    private String getCurrentMachineId() {
        // Note: this may not work well for container-based env.
//        return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getRuntimePort();
        return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort();
    })
            .findAny()
            .map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
    }
    
	private static final String SEPARATOR = "@";

 配置示例:

[
    {
        "clientSet":["192.168.164.1@8723"],
        "ip":"192.168.164.1",
        "machineId":"192.168.164.1@8722",
        "port":11111
    }
]

 4. 将流控的规则告知整个集群

private void registerClusterRuleSupplier() {
    // Register cluster flow rule property supplier which creates data source by namespace.
    // Flow rule dataId format: ${namespace}-flow-rules
    ClusterFlowRuleManager.setPropertySupplier(namespace -> {
        ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
            namespace + DemoConstants.FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
        return ds.getProperty();
    });
    // Register cluster parameter flow rule property supplier which creates data source by namespace.
    ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
        ReadableDataSource<String, List<ParamFlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
            namespace + DemoConstants.PARAM_FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
        return ds.getProperty();
    });
}

 5. 告知集群的客户端管理器,token server服务所在的地址及端口

private void initClientServerAssignProperty() {
    // Cluster map format:
    // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}]
    // machineId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
    ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(remoteAddress, groupId,
        clusterMapDataId, source -> {
        List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
        return Optional.ofNullable(groupList)
            .flatMap(this::extractClientAssignment) //具体的逻辑算法, 确认token server 的地址及端口
            .orElse(null);
    });
    // 这里主要认证2个参数 ServerHost ServerPort
    ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
}

 6. 初始化,服务的状态,当一个服务初始化启动时,确认此服务是token server 还是 client

private void initStateProperty() {
    // Cluster map format:
    // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}]
    // machineId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
    ReadableDataSource<String, Integer> clusterModeDs = new NacosDataSource<>(remoteAddress, groupId,
        clusterMapDataId, source -> {
        List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
        return Optional.ofNullable(groupList)
            .map(this::extractMode)
            .orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
    });
    ClusterStateManager.registerProperty(clusterModeDs.getProperty());
}

本文仅代表个人的观点,如有错误请指正


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM