2

限流器算法实现(JUC原子类使用实践) - Cuzzz

 1 year ago
source link: https://www.cnblogs.com/cuzzz/p/17128584.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

系列文章目录和关于我

一丶限流器存在的意义#

在高并发系统中,出于系统保护角度考虑,通常会对流量进行限流。

限流*的目的是在遇到流量高峰期或者流量突增(流量尖刺)时,通过对流量速率进行限制,当达到限制速率时,可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)

二丶限流器算法与实现#

0. 写在前面#

笔者碰到这样的一道题,有下面的接口,需要实现1s秒只可以申请max次资源,需要考虑到并发性能

image-20230216222846111

显然这是一个限流器,网上有许多博主使用来实现,笔者将使用juc中的原子类,使用自旋+cas实现。并不是说自旋+cas 一定优于锁(自旋是cpu操作,在并发很高的时候会浪费cpu资源),但是相对于锁可以减少用户态到内核态的切换(这是由于java线程和操作系统线程是1比1的模型,上锁是需要切换到内核态的,尽管synchronized 具备锁升级,但是如果并发高了,最终还是重量级锁,依旧具备开销)笔者的cas都没有自定义cas次数,实际使用应该是自旋若干次就放弃,或者切换为使用锁的方式。并且这里都是单机的限流,分布式的限流需要使用其他中间件,如redis实现。

而且在并发很高,很极限的情况下,这种cas的方式存在bug,所以成熟的实现如gauva的限流器,是使用synchronized实现的。

1.计数器算法#

1.2 概述#

当前接口限定1s只可以提供n次服务,对于此需求,我们很直观的可以想到使用属性记录当前是第几秒,并且记录这一秒内请求了多少次。

在指定周期内累加访问次数,当访问次数达到设定的阈值时,触发限流策略,当进入下一个时间周期时会将访问次数清零

  • 优点:简单直接

  • 缺点:临界问题

    如果在0.51s内请求max次,从1s1.5s请求max次,这样可以实现在0.5~1.5这一秒内请求来到2max次。

1.2 实现#

首先我们要记录 在x秒请求了y次,并且这两个值必须是原子的。

这里我们使用AtomicStampedReference(JUC源码学习笔记4——原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法中讲到过),此类本意是为了防止cas的ABA问题,其中会记录值和对应的版本,并且其compareAndSet方法可以同时更新值和对应的版本,这里我们使用版本表示当前是多少秒,请求多少次使用AtomicInteger记录,方便我们在同一秒的时候更新请求数量。

public class CountRateLimiter implements RateLimiter {

    /**
     * 一秒可以接受多少个请求
     */
    private final int numAcceptablePerSecond;

    /***
     * 版本号对应秒数
     * 里面的 AtomicInteger 记录这一秒范围内的请求数量
     */
    private final AtomicStampedReference<AtomicInteger> helper;


    public CountRateLimiter(int numAcceptablePerSecond) {
        this.numAcceptablePerSecond = numAcceptablePerSecond;
        this.helper =
                new AtomicStampedReference<>(new AtomicInteger(numAcceptablePerSecond), -1);
    }


    @Override
    public boolean acquire(int n) {
        //不要太过分了
        if (n > numAcceptablePerSecond) {
            return false;
        }
        //上一次请求是多少秒的请求
        int oldSeconds = helper.getStamp();
        //当前多少秒
        int currentSeconds = currentSeconds();

        //不是同一秒的请求
        //如果和当前不是一个版本(意味着不是同一秒) 那么cas 修改版本并重置许可数量
        if (oldSeconds != currentSeconds) {
            //原剩余的许可数量
            AtomicInteger oldPermits = helper.getReference();
            //cas 修改 同时修改版本,并且扣减数量
            if (helper.compareAndSet(oldPermits,
                    //新许可的数量为  numAcceptablePerSecond - n
                    new AtomicInteger(numAcceptablePerSecond - n), oldSeconds, currentSeconds)) {
                //cas 成功 那么说明成功 拿到令牌
                return true;
            }
        }
		
        //到这里说明 是同一秒(oldSeconds == currentSeconds)
        //或者上面的if存在多线程竞争当前线程竞争失败 其他线程重置了计数器 ==> 那么cas 减少许可数量
        
   		//这里判断了一下 当前秒还是相等的,避免由于gc在第一个if中停留太久,比如第一秒线程A和B进入到第一个if,线程B成功了,但是线程A失败了,并且暂停了2s,出来的时候时间已经是3s了,我们不能让1s的请求占用3s时候的令牌数
        return currentSeconds() == currentSeconds &&
            
                //最后这里存在问题 如果在0.99999s的请求来到这里,但是时间来到1s,这个cas才成功,那么0.99999s的请求将打过来。导致1s的qps大于max
                helper.getReference().addAndGet(-n) >= 0;
    }



    private static int currentSeconds() {
        return (int) ((System.currentTimeMillis() / 1000) % Integer.MAX_VALUE);
    }


}

2.滑动窗口#

2.1 概述#

为了避免计数器中的临界问题,让限制更加平滑,将固定窗口中分割出多个小时间窗口,分别在每个小的时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口。

计数器算法,可以看做只有两个窗口,因此在两个窗口边界的时候会出现临界问题。而滑动窗口统计当前时间处于的1s内产生了多少次请求,避免了临界问题

  • 优点:实现相对简单,且没有计数器算法的临界问题
  • 缺点:无法应对短时间高并发(突刺现象),比如我在间歇性高流量请求,每一批次的请求,处于不同的窗口(图中的虚线窗口)(如10.1s,20.1s分别产生max次请求,其实的系统qps还是大于max的)

image-20230216223141292

2.2 实现#

/**
 * 滑动窗口限流器
 * <p>
 * 假设指定窗口总时长 为 1s 可以接受 10个请求,窗口分为5格
 * 说明单格时间长度为200毫秒
 * |_____|_____|_____|_____|_____|
 * 0    200   400  600    800   1000
 * <p>
 * 当前时间为 500毫秒 那么落在 (500/200)%5 也就是第二格
 * 那么500 毫秒是否可以接受请求 需要统计所有格子中的数量
 * <p>
 * 当时间来到 1500 毫秒,落在 (1500/200)%5 也是第二格
 * |_____|_____|_____|_____|_____|_____|_____|_____|
 * 0    200   400  600    800  1000   1200  1400  1600
 * 从500到1500才是我们需要记录的,窗口数组大小是不变的
 *
 * 500的窗口版本是 500/1000 = 0
 * 1500的窗口版本是 1500/1000 = 1
 *
 * 根据窗口版本来统计 哪些格子我们是要统计的,如果旧窗口版本小于当前窗口版本 不要计数
 * (这里的版本 可以理解为没过 1000秒 版本加1,版本不同意味着是一秒前的数据)
 *
 * @author cuzz
 * @version 1.0
 **/
public class SlidingWindowRateLimiter implements RateLimiter {

    //滑动窗口中的一个元素
    private static class WindowElement {
        /***
         * 版本
         */
        private volatile long version;
        /**
         * 计数
         */
        private final AtomicInteger counter;

        private WindowElement(long version, AtomicInteger counter) {
            this.version = version;
            this.counter = counter;
        }

        private void changeVersion(long newVersion) {
            this.version = newVersion;
        }

        private void reset(int n) {
            counter.set(n);
        }

        void add(int n) {
            counter.addAndGet(n);
        }
    }

    /**
     * 整个窗口的大小,比如一秒 只能接受100个请求 那么此值设置为1000(毫秒)
     */
    private final long windowTimeMillions = 1000;
    /***
     * 窗口的长度,窗口的长度,窗口越长,越能防止临界问题
     */
    private final int windowLength;
    /***
     * 窗口数组
     */
    private final AtomicReferenceArray<WindowElement> slidWindow;
    /***
     * 一秒接受 100个请求 那么此值设置为 100
     */
    private final int canAcceptRequestTimes;
    /**
     * 记录 窗口每一个元素  对应的时间跨度
     * 1秒接受100个请求 那么此处为 1000(毫秒)/100 = 10毫秒
     */
    private final int millionsEachOne;

    /**
     * @param windowLength          指定窗口数量
     * @param canAcceptRequestTimes 在 1s 内可以接受多少个请求
     */
    public SlidingWindowRateLimiter(int windowLength,
                                    int canAcceptRequestTimes) {
        this.windowLength = windowLength;
        this.canAcceptRequestTimes = canAcceptRequestTimes;
        slidWindow = new AtomicReferenceArray<>(new WindowElement[windowLength]);
        millionsEachOne = (int) (windowTimeMillions / windowLength);
    }

    @Override
    public boolean acquire(int n) {
        //1s分为5格 那么 一格200ms
        //当前时间为 500毫秒 那么落在 (500/200)%5 也就是第二格
        long currentTimeMillis = System.currentTimeMillis();
        //这次请求 落在 哪个桶
        int index = (int) ((currentTimeMillis / millionsEachOne) % windowLength);
        //当前这次请求的 version 即当前是多少秒
        long version = (currentTimeMillis - currentTimeMillis % windowTimeMillions);
        //1. 拿到当前当前的计数
        //1.1 如果计数为空 说明从来没有其他请求设置元素,这时,我们需要cas初始化结束计数
        //1.2 如果计数不为空
        // 1.2.1 是相同的版本 那么自增计数
        // 1.2.3 如果不是相同的版本(之前版本小于当前版本),那么更新版本
        // 1.2.4 如果不是相同的版本(之前版本大于当前版本),基本上不可能,因为时间是不会倒流的

        //操作这次请求落下的桶
        WindowElement currentIndex = slidWindow.accumulateAndGet(index,
                new WindowElement(version, new AtomicInteger(n)), (old, now) -> {
                    //计数为空 说明从来没有其他请求设置元素,这时,我们需要cas初始化结束计数
                    if (old == null) {
                        return now;
                    }

                    //当前请求的次数
                    int currentRequest = now.counter.get();

                    //是同一秒 那么自增
                    if (old.version == now.version) {
                        old.add(now.counter.get());
                    } else {
                        //如果不是相同的版本(之前版本小于当前版本),那么更新版本 更新技术
                        old.reset(currentRequest);
                        old.changeVersion(now.version);
                    }
                    return old;
                });

        //大于最大数量返回false 这一瞬间对应的元素 就已经超出了我们的预期 那么返回false
        if (currentIndex.counter.get() > canAcceptRequestTimes) {
            return false;
        }
        
        //统计窗口内所有请求数
        long sum = 0;
        //下面这一段 不具备瞬时一致性
        for (int i = 0; i < windowLength; i++) {
            WindowElement e = slidWindow.get(i);
            if (e != null && e.version == version) {
                sum += e.counter.get();
                if (sum > canAcceptRequestTimes) {
                    return false;
                }
            }
        }
        //小于等于才可以
        return sum <= canAcceptRequestTimes;
    }


}

感觉滑动窗口,不使用锁,是比较难实现的,因为上面的增加次数 和下面的 统计总数,不具备原子性。

3. 漏桶算法#

漏桶限流算法的核心就是, 不管上面的水流速度有多块, 漏桶水滴的流出速度始终保持不变

这里的水流就是我们的请求,漏水的速度始终不变,是指我们业务线程处理的速度不变,每隔一段时间从任务队列中拿一个请求进行处理

image-20230216223159914

漏桶算法,我觉得可以用于网关层,每次请求来了放在阻塞队列,然后网关线程,每隔一段线程拿出一个请求去转发执行,但是转发请求,拿到返回进行响应的时间是不固定,所以网关线程需要使用其他的线程去异步处理,网关线程只负责定时拿请求分配给其他线程异步处理。(本题目中接口要求立马返回false 或者true,不太契合就没写了)

大体的思路就是请求来了塞到任务队列,定时线程每隔一段时间取一个请求,交给另外的线程异步处理。如果请求太多,那么阻塞队列塞不进去,直接返回false。

4.令牌桶算法#

4.1 概述#

令牌桶算法:请求执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝本次请求,由此达到限流目的。当桶中令牌数大于最大数量的时候,将不再添加。它可以适应流量突发,N 个请求到来只需要从桶中获取 N 个令牌就可以继续处理。

image-20230216223329049

4.2 实现#

Guava中的RateLimiter 使用此算法,且提供了预热模式,推荐使用

4.2.1 定时任务定时生成#

我们可以使用一个定时任务每隔一段时间想桶中生成令牌,记录令牌的数量使用原子类。

这样实现非常简单,但是每一个限流器需要一个线程去生成,如果我们存在100个接口单独限流,那么需要100个线程

4.2.2 延迟计算令牌数量#

我们需要记录上一次请求的时间,和桶中剩余的令牌数,并且桶中的数量最好为double类型,因为此次请求和上一次请求的间隔时间,生成的令牌数可以为小数。所以我实现一个Helper内部类,实现这两个值的原子更新

static class Helper {
    //同时记录上一次请求时间 和 剩余数量
    private static class Pair {
        
        public Pair(long time, double count) {
            this.time = time;
            this.count = count;
        }

        final long time;
        final double count;
    }

    private double count() {
        return pair.count;
    }

    private long time() {
        return pair.time;
    }
	
    //同时记录上一次请求时间 和 剩余数量
    private volatile Pair pair;
    //反射获取unsafe进行cas操作
    private static final Unsafe UNSAFE = getUnsafe();


    Helper(double count) {
        pair = new Pair(-1, count);
    }
	
    //pair 字段的偏移,cas需要当期更改对象的地址
    private final static long OFFSET_OF_PAIR;

    static {
        try {
            OFFSET_OF_PAIR = UNSAFE.objectFieldOffset(Helper.class.getDeclaredField("pair"));
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }
	
    //cas修改,一次是,上一次的时间 和上一次剩余数量,和当前时间,当前数量
    boolean cas(long oldTime, double oldCount, long newTime, double newCount) {
        final Pair current = pair;
        return oldTime == current.time &&
            			//小于0.00001视为 数量相同,这里需要根据并发度进行设定
                        Math.abs(oldCount - current.count) < 0.00001 &&
                        casPair(current, new Pair(newTime, newCount));
    }

    //cas修改 调用unsafe 实现
    boolean casPair(Pair old, Pair newPair) {
        return UNSAFE.compareAndSwapObject(this, OFFSET_OF_PAIR, old, newPair);
    }

    private static Unsafe getUnsafe() {
        try {
            Constructor<Unsafe> constructor = Unsafe.class.getDeclaredConstructor();
            constructor.setAccessible(true);
            return constructor.newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
public class TokenRateLimiter3 implements RateLimiter {

	//一秒可以接受多少请求
    private final int tokenPerSeconds;
	//记录token数量和上次请求时间
    private final Helper tokenCount;

    public TokenRateLimiter3(int tokenPerSeconds) {
        this.tokenPerSeconds = tokenPerSeconds;
        tokenCount = new Helper(0);
    }

    @Override
    public boolean acquire(int n) {
        if (n > tokenPerSeconds) {
            return false;
        }
        //当前请求时间
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {

            //当前token 有多少个 
            double token = tokenCount.count();
            //上一期请求时间
            long preRequestTime = tokenCount.time();
            //这段时间可以生成多少令牌
            double canGenerate = (((double) (currentTimeMillis - preRequestTime) / 1000.0)
                    * tokenPerSeconds);

            //桶中可以存储的令牌的数量,最大不过 tokenPerSeconds
            double canStore = Math.min(token + canGenerate, tokenPerSeconds);

            //小于请求的令牌数
            if (canStore < (double) n) {
                return false;
            }
            //剩余多少令牌
            double release = canStore - n;
			
            //cas修改
            if (tokenCount.cas(preRequestTime,
                    token, currentTimeMillis, release)) {
                return true;
            }
        }
    }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK