Java接口限流小结


1. 引子

在高并发系统开发时有时候需要进行接口保护,防止高并发的情况把系统搞崩,因此需要对一个查询接口进行限流,主要的目的就是限制单位时间内请求此查询的次数,例如 1000 次,来保护接口。

2. 计数器 AtomicLong

可以使用Java中的AtomicLong进行限流:

try { if(atomic.incrementAndGet() > 限流数) { //拒绝请求 } //处理请求 } finally { atomic.decrementAndGet(); } 

适合对业务无损的服务或者需要过载保护的服务进行限流,如抢购业务,超出了大小要么让用户排队,要么告诉用户没货了,对用户来说是可以接受的。而一些开放平台也会限制用户调用某个接口的试用请求量,也可以用这种计数器方式实现。这种方式也是简单粗暴的限流,没有平滑处理,需要根据实际情况选择使用.

3. Semaphore

private static Semaphore apiSemaphore = new Semaphore(100); // 并发访问控制 boolean concurrentPermission = apiSemaphore.tryAcquire(50, TimeUnit.MILLISECONDS); if (concurrentPermission) { // 允许访问 } else { // 并发访问超过系统允许上限,请稍后再试! } 

4. RateLimiter限制资源的并发访问线程数

RateLimiter类似于JDK的信号量Semphore,他用来限制对资源并发访问的线程数。

RateLimiter limiter = RateLimiter.create(4.0); //每秒不超过4个任务被提交 limiter.acquire(); //请求RateLimiter, 超过permits会被阻塞 executor.submit(runnable); //提交任务 

也可以以非阻塞的形式来使用:

if (limiter.tryAcquire()) { //未请求到limiter则立即返回false doSomething(); } else { doSomethingElse(); } 

5. 利用缓存,存储一个计数器,然后用这个计数器来实现限流

限流某个接口的时间窗请求数:即一个时间窗口内的请求数,如想限制某个接口/服务每秒/每分钟/每天的请求数/调用量。如一些基础服务会被很多其他系统调用,比如商品详情页服务会调用基础商品服务调用,但是怕因为更新量比较大将基础服务打挂,这时我们要对每秒/每分钟的调用量进行限速;一种实现方式如下所示:

static LoadingCache<Long, AtomicLong> count = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(new CacheLoader<Long, AtomicLong>() { @Override public AtomicLong load(Long o) throws Exception { //System.out.println("Load call!"); return new AtomicLong(0L); } }); 

我们通过 CacheBuilder 来新建一个 LoadingCache 缓存对象 count,然后设置其有效时间为 1 秒,即每 1 秒钟刷新一次;缓存中,key 为一个 long 型的时间戳类型,value 是一个计数器,使用原子性的 AtomicLong 保证自增和自减操作的原子性, 每次查询缓存时如果不能命中,即查询的时间戳不在缓存中,则重新加载缓存,执行 load 将当前的时间戳的计数值初始化为 0。这样对于每一秒的时间戳,能计算这一秒内执行的次数,从而达到限流的目的;
测试代码如下:

public class Counter { static int counter = 0; public static int getCounter() throws Exception{ return counter++; } } 

现在我们创建多个线程来执行这个方法:

public class Test { public static void main(String args[]) throws Exception { for (int i = 0; i < 100; i++) { new Thread() { @Override public void run() { try { System.out.println(Counter.getCounter()); } catch (Exception e) { e.printStackTrace(); } } }.start(); } } } 

这里的 for 循环执行 100 个进程时间是很快的,那么现在我们要限制每秒只能有 10 个线程来执行 getCounter() 方法,该怎么办呢,上面讲的限流方法就派上用场了:

public class Counter { static LoadingCache<Long, AtomicLong> count = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(new CacheLoader<Long, AtomicLong>() { @Override public AtomicLong load(Long o) throws Exception { System.out.println("Load call!"); return new AtomicLong(0L); } }); static long limits = 10; static int counter = 0; public static synchronized int getCounter() throws Exception { while (true) { //获取当前的时间戳作为key Long currentSeconds = System.currentTimeMillis() / 1000; if (count.get(currentSeconds).getAndIncrement() > limits) { continue; } return counter++; } } } 

这样一来,就可以限制每秒的执行数了。对于每个线程,获取当前时间戳,如果当前时间 (当前这 1 秒) 内有超过 10 个线程正在执行,那么这个进程一直在这里循环,直到下一秒,或者更靠后的时间,重新加载,执行 load,将新的时间戳的计数值重新为 0。
执行结果可以看出每秒执行 11 个(因为从 0 开始),每一秒之后,load 方法会执行一次;为了更加直观,我们可以让每个for循环sleep一段时间:

public class Test { public static void main(String args[]) throws Exception { for (int i = 0; i < 100; i++) { new Thread() { @Override public void run() { try { System.out.println(Counter.getCounter()); } catch (Exception e) { e.printStackTrace(); } } }.start(); Thread.sleep(100); } } } 

在上述这样的情况下,一个线程如果遇到当前时间正在执行的线程超过 limit 值就会一直在 while 循环,这样会浪费大量的资源,我们在做限流的时候,如果出现这种情况,可以不进行 while 循环,而是直接抛出异常或者返回,来拒绝这次执行(查询),这样便可以节省资源。

参考:
http://www.54tianzhisheng.cn/2017/09/23/Guava-limit/


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM