sentinel集群,內嵌模式


注:sentinel集群官方有自己的demo,地址:https://github.com/alibaba/Sentinel/tree/master/sentinel-demo/sentinel-demo-cluster

1.初始化動態數據源的流控規則

private void initDynamicRuleProperty() {
        // 從nacos讀取限流的規則,使用Lambda處理返回的數據,得到想要的數據類型
        // remoteAddress 為nacos的地址
        // groupId 為nacos配置對應的groupId
        // flowDataId 為nacos配置對應的dataId
        // 讀取到的數據source為字符串
        ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(remoteAddress, groupId,
            flowDataId, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
        // 將讀取到的流控規則,register到對應的類
        FlowRuleManager.register2Property(ruleSource.getProperty());

        // 從nacos讀取參數限流的規則,使用Lambda處理返回的數據,得到想要的數據類型
        ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new NacosDataSource<>(remoteAddress, groupId,
            paramDataId, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
        // 將讀取到的流控規則,register到對應的類
        ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
    }

流控規則配置的示例:

[
    {
        "resource" : "/test/{name}",
        "grade" : 1,
        "count" : 2,                   
        "clusterMode" :  true,
        "clusterConfig" : {
            "flowId" : 333,
            "thresholdType" : 1,      
            "fallbackToLocalWhenFail" : true
        }
    }
]

resource : 資源名,限流規則作用對象,一般為請求URI
limitApp : 控流針對的調用來源,default則不區分調用來源
grade :  限流閾值類型; 0表明根據並發數量來限流,1表明根據QPS來進行流量控制
count :  限流閾值
strategy : 調用關系限流策略,(0:直接,1:關聯,2:鏈路)
controlBehavior : 限流控制行為(快速失敗 、warm up 、排隊等候)
clusterMode : 是否為集群模式 
若集群開啟:
clusterConfig下為集群流控相關配置
thresholdType:集群閾值模式(0:單機均攤,1:總體閾值)
fallbackToLocalWhenFail: 失敗退化(選擇為true,不選擇為false)

2. 設置集群的客戶端的參數

private void initClientConfigProperty() {
    // 從nacos讀取配置,設置集群的客戶端參數
    ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(remoteAddress, groupId,
        configDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {}));
    //將客戶端的參數進行認證
    ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
}

 客戶端參數示例

{
    "requestTimeout": 300 // 請求超時時間
}

3. 判斷當前服務是否是token server ,設置交互的端口

private void initServerTransportConfigProperty() {
        // 從nacos讀取數據,獲取存在哪些服務器及端口,哪台機械是server端
        ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
            return Optional.ofNullable(groupList)
                .flatMap(this::extractServerTransportConfig) // 邏輯運算,判斷是否此服務是token server,如果是設置端口號,超時時間
                .orElse(null);
        });
        // 認證token server 的端口,超時時間
        ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
    }
    
    private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
        return groupList.stream()
            .filter(this::    private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
        return getCurrentMachineId().equals(group.getMachineId());
    }

    private String getCurrentMachineId() {
        // Note: this may not work well for container-based env.
//        return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getRuntimePort();
        return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort();
    })
            .findAny()
            .map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
    }
    
	private static final String SEPARATOR = "@";

 配置示例:

[
    {
        "clientSet":["192.168.164.1@8723"],
        "ip":"192.168.164.1",
        "machineId":"192.168.164.1@8722",
        "port":11111
    }
]

 4. 將流控的規則告知整個集群

private void registerClusterRuleSupplier() {
    // Register cluster flow rule property supplier which creates data source by namespace.
    // Flow rule dataId format: ${namespace}-flow-rules
    ClusterFlowRuleManager.setPropertySupplier(namespace -> {
        ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
            namespace + DemoConstants.FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
        return ds.getProperty();
    });
    // Register cluster parameter flow rule property supplier which creates data source by namespace.
    ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
        ReadableDataSource<String, List<ParamFlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
            namespace + DemoConstants.PARAM_FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
        return ds.getProperty();
    });
}

 5. 告知集群的客戶端管理器,token server服務所在的地址及端口

private void initClientServerAssignProperty() {
    // Cluster map format:
    // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}]
    // machineId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
    ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(remoteAddress, groupId,
        clusterMapDataId, source -> {
        List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
        return Optional.ofNullable(groupList)
            .flatMap(this::extractClientAssignment) //具體的邏輯算法, 確認token server 的地址及端口
            .orElse(null);
    });
    // 這里主要認證2個參數 ServerHost ServerPort
    ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
}

 6. 初始化,服務的狀態,當一個服務初始化啟動時,確認此服務是token server 還是 client

private void initStateProperty() {
    // Cluster map format:
    // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}]
    // machineId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
    ReadableDataSource<String, Integer> clusterModeDs = new NacosDataSource<>(remoteAddress, groupId,
        clusterMapDataId, source -> {
        List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
        return Optional.ofNullable(groupList)
            .map(this::extractMode)
            .orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
    });
    ClusterStateManager.registerProperty(clusterModeDs.getProperty());
}

本文僅代表個人的觀點,如有錯誤請指正


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM