RateLimit--使用guava來做接口限流


轉:https://blog.csdn.net/jiesa/article/details/50412027

一、問題描述  
  某天A君突然發現自己的接口請求量突然漲到之前的10倍,沒多久該接口幾乎不可使用,並引發連鎖反應導致整個系統崩潰。如何應對這種情況呢?生活給了我們答案:比如老式電閘都安裝了保險絲,一旦有人使用超大功率的設備,保險絲就會燒斷以保護各個電器不被強電流給燒壞。同理我們的接口也需要安裝上“保險絲”,以防止非預期的請求對系統壓力過大而引起的系統癱瘓,當流量過大時,可以采取拒絕或者引流等機制。 

二、常用的限流算法
      常用的限流算法有兩種:漏桶算法和令牌桶算法,這篇博文介紹得比較清晰(過載保護算法淺析)。

      漏桶算法思路很簡單,請求先進入到漏桶里,漏桶以一定的速度出水,當水請求過大會直接溢出,可以看出漏桶算法能強行限制數據的傳輸速率。


圖1 漏桶算法示意圖

      對於很多應用場景來說,除了要求能夠限制數據的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。如圖2所示,令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。


圖2 令牌桶算法示意圖

三、限流工具類RateLimiter
   google開源工具包guava提供了限流工具類RateLimiter,該類基於“令牌桶算法”,非常方便使用。該類的接口描述請參考:RateLimiter接口描述,具體的使用請參考:RateLimiter使用實踐。


RateLimiter 使用Demo

package ratelimite;

import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterDemo {
public static void main(String[] args) {
testNoRateLimiter();
testWithRateLimiter();
}

public static void testNoRateLimiter() {
Long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
System.out.println("call execute.." + i);

}
Long end = System.currentTimeMillis();

System.out.println(end - start);

}

public static void testWithRateLimiter() {
Long start = System.currentTimeMillis();
RateLimiter limiter = RateLimiter.create(10.0); // 每秒不超過10個任務被提交
for (int i = 0; i < 10; i++) {
limiter.acquire(); // 請求RateLimiter, 超過permits會被阻塞
System.out.println("call execute.." + i);

}
Long end = System.currentTimeMillis();

System.out.println(end - start);

}

}

 


四 Guava並發:ListenableFuture與RateLimiter示例
概念
        ListenableFuture顧名思義就是可以監聽的Future,它是對java原生Future的擴展增強。我們知道Future表示一個異步計算任務,當任務完成時可以得到計算結果。如果我們希望一旦計算完成就拿到結果展示給用戶或者做另外的計算,就必須使用另一個線程不斷的查詢計算狀態。這樣做,代碼復雜,而且效率低下。使用ListenableFuture Guava幫我們檢測Future是否完成了,如果完成就自動調用回調函數,這樣可以減少並發程序的復雜度。      

        推薦使用第二種方法,因為第二種方法可以直接得到Future的返回值,或者處理錯誤情況。本質上第二種方法是通過調動第一種方法實現的,做了進一步的封裝。

另外ListenableFuture還有其他幾種內置實現:

SettableFuture:不需要實現一個方法來計算返回值,而只需要返回一個固定值來做為返回值,可以通過程序設置此Future的返回值或者異常信息

CheckedFuture: 這是一個繼承自ListenableFuture接口,他提供了checkedGet()方法,此方法在Future執行發生異常時,可以拋出指定類型的異常。


    RateLimiter類似於JDK的信號量Semphore,他用來限制對資源並發訪問的線程數,本文介紹RateLimiter使用

代碼示例

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

import com.google.common.util.concurrent.FutureCallback;

import com.google.common.util.concurrent.Futures;

import com.google.common.util.concurrent.ListenableFuture;

import com.google.common.util.concurrent.ListeningExecutorService;

import com.google.common.util.concurrent.MoreExecutors;

import com.google.common.util.concurrent.RateLimiter;

 

public class ListenableFutureDemo {

    public static void main(String[] args) {

        testRateLimiter();

        testListenableFuture();

    }

 

    /**

     * RateLimiter類似於JDK的信號量Semphore,他用來限制對資源並發訪問的線程數

     */

    public static void testRateLimiter() {

        ListeningExecutorService executorService = MoreExecutors

                .listeningDecorator(Executors.newCachedThreadPool());

 

        RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超過4個任務被提交

 

        for (int i = 0; i < 10; i++) {

            limiter.acquire(); // 請求RateLimiter, 超過permits會被阻塞

 

            final ListenableFuture<Integer> listenableFuture = executorService

                    .submit(new Task("is "+ i));

        }

    }

 

    public static void testListenableFuture() {

        ListeningExecutorService executorService = MoreExecutors

                .listeningDecorator(Executors.newCachedThreadPool());

 

        final ListenableFuture<Integer> listenableFuture = executorService

                .submit(new Task("testListenableFuture"));

 

         

        //同步獲取調用結果

        try {

            System.out.println(listenableFuture.get());

        } catch (InterruptedException e1) {

            e1.printStackTrace();

        } catch (ExecutionException e1) {

            e1.printStackTrace();

        }

         

        //第一種方式

        listenableFuture.addListener(new Runnable() {

            @Override

            public void run() {

                try {

                    System.out.println("get listenable future's result "

                            + listenableFuture.get());

                } catch (InterruptedException e) {

                    e.printStackTrace();

                } catch (ExecutionException e) {

                    e.printStackTrace();

                }

            }

        }, executorService);

 

        //第二種方式

        Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {

            @Override

            public void onSuccess(Integer result) {

                System.out

                        .println("get listenable future's result with callback "

                                + result);

            }

 

            @Override

            public void onFailure(Throwable t) {

                t.printStackTrace();

            }

        });

    }

}

 

class Task implements Callable<Integer> {

    String str;

    public Task(String str){

        this.str = str;

    }

    @Override

    public Integer call() throws Exception {

        System.out.println("call execute.." + str);

        TimeUnit.SECONDS.sleep(1);

        return 7;

    }

}

Guava版本

<dependency>

            <groupId>com.google.guava</groupId>

            <artifactId>guava</artifactId>

            <version>14.0.1</version>

        </dependency>

 

本文是對以下文章的加工整合。

http://my.oschina.net/cloudcoder/blog/359598

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM