使用Sentinel實現隔離、限流


在18年Hystrix停止更新,Sentinel和Resilience4j逐步成熟,在國內Sentinel的使用企業更加多一些,接下來通過一個實站例子把Sentinel的主要功能使用起來。

功能對比

  Sentinel Hystrix resilience4j
隔離策略 信號量隔離(並發線程數限流) 線程池隔離/信號量隔離 信號量隔離
熔斷降級策略 基於響應時間、異常比率、異常數 基於異常比率 基於異常比率、響應時間
實時統計實現 滑動窗口(LeapArray) 滑動窗口(基於 RxJava) Ring Bit Buffer
動態規則配置 支持多種數據源 支持多種數據源 有限支持
擴展性 多個擴展點 插件的形式 接口的形式
基於注解的支持 支持 支持 支持
限流 基於 QPS,支持基於調用關系的限流 有限的支持 Rate Limiter
流量整形 支持預熱模式、勻速器模式、預熱排隊模式 不支持 簡單的 Rate Limiter 模式
系統自適應保護 支持 不支持 不支持
控制台 提供開箱即用的控制台,可配置規則、查看秒級監控、機器發現等 簡單的監控查看 不提供控制台,可對接其它監控系統

 

Sentinel 基本概念

資源

資源是 Sentinel 的關鍵概念。它可以是 Java 應用程序中的任何內容,例如,由應用程序提供的服務,或由應用程序調用的其它應用提供的服務,甚至可以是一段代碼。在接下來的文檔中,我們都會用資源來描述代碼塊。

只要通過 Sentinel API 定義的代碼,就是資源,能夠被 Sentinel 保護起來。大部分情況下,可以使用方法簽名,URL,甚至服務名稱作為資源名來標示資源。

規則

圍繞資源的實時狀態設定的規則,可以包括流量控制規則、熔斷降級規則以及系統保護規則。所有規則可以動態實時調整。

流量控制

流量控制在網絡傳輸中是一個常用的概念,它用於調整網絡包的發送數據。然而,從系統穩定性角度考慮,在處理請求的速度上,也有非常多的講究。任意時間到來的請求往往是隨機不可控的,而系統的處理能力是有限的。我們需要根據系統的處理能力對流量進行控制。Sentinel 作為一個調配器,可以根據需要把隨機的請求調整成合適的形狀。

流量控制有以下幾個角度:

  • 資源的調用關系,例如資源的調用鏈路,資源和資源之間的關系;
  • 運行指標,例如 QPS、線程池、系統負載等;
  • 控制的效果,例如直接限流、冷啟動、排隊等。

Sentinel 的設計理念是讓您自由選擇控制的角度,並進行靈活組合,從而達到想要的效果。

熔斷降級

什么是熔斷降級

除了流量控制以外,降低調用鏈路中的不穩定資源也是 Sentinel 的使命之一。由於調用關系的復雜性,如果調用鏈路中的某個資源出現了不穩定,最終會導致請求發生堆積。這個問題和 Hystrix 里面描述的問題是一樣的。

image

Sentinel 和 Hystrix 的原則是一致的: 當調用鏈路中某個資源出現不穩定,例如,表現為 timeout,異常比例升高的時候,則對這個資源的調用進行限制,並讓請求快速失敗,避免影響到其它的資源,最終產生雪崩的效果。

熔斷降級設計理念

在限制的手段上,Sentinel 和 Hystrix 采取了完全不一樣的方法。

Hystrix 通過線程池的方式,來對依賴(在我們的概念中對應資源)進行了隔離。這樣做的好處是資源和資源之間做到了最徹底的隔離。缺點是除了增加了線程切換的成本,還需要預先給各個資源做線程池大小的分配。

Sentinel 對這個問題采取了兩種手段:

  • 通過並發線程數進行限制

和資源池隔離的方法不同,Sentinel 通過限制資源並發線程的數量,來減少不穩定資源對其它資源的影響。這樣不但沒有線程切換的損耗,也不需要您預先分配線程池的大小。當某個資源出現不穩定的情況下,例如響應時間變長,對資源的直接影響就是會造成線程數的逐步堆積。當線程數在特定資源上堆積到一定的數量之后,對該資源的新請求就會被拒絕。堆積的線程完成任務后才開始繼續接收請求。

  • 通過響應時間對資源進行降級

除了對並發線程數進行控制以外,Sentinel 還可以通過響應時間來快速降級不穩定的資源。當依賴的資源出現響應時間過長后,所有對該資源的訪問都會被直接拒絕,直到過了指定的時間窗口之后才重新恢復。

 

實戰

假如我們有一個高並發場景,需要做搶購秒殺,需要獲取用戶資料,服務類的接口需要限流保護。

實體類

package com.xin.sentinel.demo.entity;

public class User {
    int id;
    String name;
    int age;
    int level;
    String address;
    String password;
    String phone;

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public User(String name) {
        this.id = -1;
        this.name = name;
    }

    public User() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getLevel() {
        return level;
    }

    public void setLevel(int level) {
        this.level = level;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", level=" + level +
                ", address='" + address + '\'' +
                ", password='" + password + '\'' +
                ", phone='" + phone + '\'' +
                '}';
    }
}
View Code

服務類

package com.xin.sentinel.demo.service;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.xin.sentinel.demo.entity.User;
import com.xin.sentinel.demo.dao.DB;
import org.springframework.stereotype.Service;

import java.util.Collections;

@Service
public class UserService {

    public static final String USER_RES = "userResource";

    public UserService(){
        // 定義熱點限流的規則,對第一個參數設置 qps 限流模式,閾值為5
        FlowRule rule = new FlowRule();
        rule.setResource(USER_RES);
        // 限流類型,qps
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        // 設置閾值
        rule.setCount(4);
        // 限制哪個調用方
        rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
        // 基於調用關系的流量控制
        rule.setStrategy(RuleConstant.STRATEGY_DIRECT);
        // 流控策略
        rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
        FlowRuleManager.loadRules(Collections.singletonList(rule));
    }

    /**
     * SphU 包含了 try-catch 風格的 API。用這種方式,當資源發生了限流之后會拋出 BlockException。
     * 這個時候可以捕捉異常,進行限流之后的邏輯處理.
     * @param uid
     * @return
     */
    public User getUser(int uid){
        Entry entry = null;
        // 資源名可使用任意有業務語義的字符串,比如方法名、接口名或其它可唯一標識的字符串。
        try {
            // 流控代碼
            entry = SphU.entry(USER_RES);
            // 業務代碼
            User user = new User();
            user.setId(uid);
            user.setName("user-" + uid);
            DB.InsertUser(user); //長耗時的工作
            return user;
        }catch(BlockException e){
            // 被限流了
            System.out.println("[getUser] has been protected! Time="+System.currentTimeMillis());
        }finally {
            if(entry!=null){
                entry.exit();
            }
        }
        return null;
    }


    /**
     * 通過 @SentinelResource 注解定義資源並配置 blockHandler 和 fallback 函數來進行限流之后的處理
     * @param id
     * @return
     */
    @SentinelResource(blockHandler = "blockHandlerForGetUser")
    public User getUserById(String id) {
        throw new RuntimeException("getUserById command failed");
    }

    // blockHandler 函數,原方法調用被限流/降級/系統保護的時候調用
    public User blockHandlerForGetUser(String id, BlockException ex) {
        return new User("admin");
    }


}
View Code

控制器

package com.xin.sentinel.demo.controller;

import com.xin.sentinel.demo.entity.User;
import com.xin.sentinel.demo.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class Demo {

    @Autowired
    private UserService userService;
    /**
     * 獲取用戶信息
     */
    @GetMapping("/getUser")
    public @ResponseBody User getUser(@RequestParam("id") int id) {
        return userService.getUser(id);
    }


}
View Code

 

流量規則的定義

重要屬性:

Field 說明 默認值
resource 資源名,資源名是限流規則的作用對象  
count 限流閾值  
grade 限流閾值類型,QPS 或線程數模式 QPS 模式
limitApp 流控針對的調用來源 default,代表不區分調用來源
strategy 調用關系限流策略:直接、鏈路、關聯 根據資源本身(直接)
controlBehavior 流控效果(直接拒絕 / 排隊等待 / 慢啟動模式),不支持按調用關系限流 直接拒絕

同一個資源可以同時有多個限流規則。

通過代碼定義流量控制規則

理解上面規則的定義之后,我們可以通過調用 FlowRuleManager.loadRules() 方法來用硬編碼的方式定義流量控制規則,比如:

private static void initFlowQpsRule() {
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(resource);
    // Set max qps to 20
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setLimitApp("default");
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

流程圖

運行上面的demo,還有日志輸出,目錄類似:C:\Users\Administrator\logs\csp

1589891884000|2020-05-19 20:38:04|userResource|1|0|1|0|13|0|0|0

含義分別是:

流量控制主要有兩種統計類型,一種是統計線程數,另外一種則是統計 QPS。類型由 FlowRule.grade 字段來定義。其中,0 代表根據並發數量來限流,1 代表根據 QPS 來進行流量控制。

線程控制隔離

線程數限流用於保護業務線程數不被耗盡。例如,當應用所依賴的下游應用由於某種原因導致服務不穩定、響應延遲增加,對於調用者來說,意味着吞吐量下降和更多的線程數占用,極端情況下甚至導致線程池耗盡。為應對高線程占用的情況,業內有使用隔離的方案,比如通過不同業務邏輯使用不同線程池來隔離業務自身之間的資源爭搶(線程池隔離),或者使用信號量來控制同時請求的個數(信號量隔離)。這種隔離方案雖然能夠控制線程數量,但無法控制請求排隊時間。當請求過多時排隊也是無益的,直接拒絕能夠迅速降低系統壓力。Sentinel線程數限流不負責創建和管理線程池(對,說的就是hystrix),而是簡單統計當前請求上下文的線程個數,如果超出閾值,新的請求會被立即拒絕。

線程隔離的例子

public class FlowThreadDemo {

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();
    private static AtomicInteger activeThread = new AtomicInteger();

    private static volatile boolean stop = false;
    private static final int threadCount = 100;

    private static int seconds = 60 + 40;
    private static volatile int methodBRunningTime = 2000;

    public static void main(String[] args) throws Exception {
        System.out.println(
            "MethodA will call methodB. After running for a while, methodB becomes fast, "
                + "which make methodA also become fast ");
        tick();
        initFlowRule();

        for (int i = 0; i < threadCount; i++) {
            Thread entryThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        Entry methodA = null;
                        try {
                            TimeUnit.MILLISECONDS.sleep(5);
                            methodA = SphU.entry("methodA");
                            activeThread.incrementAndGet();
                            Entry methodB = SphU.entry("methodB");
                            TimeUnit.MILLISECONDS.sleep(methodBRunningTime);
                            methodB.exit();
                            pass.addAndGet(1);
                        } catch (BlockException e1) {
                            block.incrementAndGet();
                        } catch (Exception e2) {
                            // biz exception
                        } finally {
                            total.incrementAndGet();
                            if (methodA != null) {
                                methodA.exit();
                                activeThread.decrementAndGet();
                            }
                        }
                    }
                }
            });
            entryThread.setName("working thread");
            entryThread.start();
        }
    }

    private static void initFlowRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource("methodA");
        // set limit concurrent thread for 'methodA' to 20
        rule1.setCount(20);
        rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        rule1.setLimitApp("default");

        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;

            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + " total qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                    + ", pass:" + oneSecondPass
                    + ", block:" + oneSecondBlock
                    + " activeThread:" + activeThread.get());
                if (seconds-- <= 0) {
                    stop = true;
                }
                if (seconds == 40) {
                    System.out.println("method B is running much faster; more requests are allowed to pass");
                    methodBRunningTime = 20;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                + ", block:" + block.get());
            System.exit(0);
        }
    }
}
View Code

QPS隔離

當 QPS 超過某個閾值的時候,則采取措施進行流量控制。流量控制的手段包括下面 3 種,對應 FlowRule 中的 controlBehavior 字段:

  1. 直接拒絕(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式。該方式是默認的流量控制方式,當QPS超過任意規則的閾值后,新的請求就會被立即拒絕,拒絕方式為拋出FlowException。這種方式適用於對系統處理能力確切已知的情況下,比如通過壓測確定了系統的准確水位時。具體的例子參見 FlowqpsDemo

  2. 冷啟動(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式。該方式主要用於系統長期處於低水位的情況下,當流量突然增加時,直接把系統拉升到高水位可能瞬間把系統壓垮。通過"冷啟動",讓通過的流量緩慢增加,在一定時間內逐漸增加到閾值上限,給冷系統一個預熱的時間,避免冷系統被壓垮的情況。具體的例子參見 WarmUpFlowDemo

  1. 勻速器(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式。這種方式嚴格控制了請求通過的間隔時間,也即是讓請求以均勻的速度通過,對應的是漏桶算法

 

項目源碼

Sentinel.zip


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM