分布式接口限流实现

本文涉及的产品
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 分布式接口限流实现

@[toc]
## 为什么要接口限流

  • 在我们项目开发过程中,有些接口是暴露在用户的常用中,包括一些高危接口,如 (支付,开发票,订单),这些接口 都是高危接口,且被用户经常使用,在高并发的情况下,io阻塞,不可避免的出现重复提交,或者点击频繁的操作,所以我们就要加入限流,避免用户多次点击,减少我们接口的压力,把整数据不会重复,接口压力减小

为什么要做分布式

  • 在我们做项目负载均衡的时候, 分布式,微服务架构的时候,不可避免的多个节点,这个时候我们就要考虑会被随机分配到各个节点,如果 我们使用 令牌桶 或者 漏斗桶 算法到话,存到 本地,各个节点不会共享,所以
    我们要考虑模块,节点间的共享

实现方式

1. 算法实现(无分布式,单体架构,单节点)

  1. 自定义注解
package com.yxl.annotation;

import org.springframework.core.annotation.AliasFor;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * 限流注解,
 * </p>
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {

    int NOT_LIMITED = 0;

    /**
     * qps
     */
    @AliasFor("qps") double value() default NOT_LIMITED;

    /**
     * qps
     */
    @AliasFor("value") double qps() default NOT_LIMITED;

    /**
     * 超时时长
     */
    int timeout() default 0;

    /**
     * 超时时间单位
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

}
  1. AOP实现切面 + 令牌桶算法实现
package com.yxl.aspect;

import com.yxl.annotation.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


/**
 * <p>
 * 限流切面
 * </p>
 *
 * @author yxl
 * @date Created in 2019/9/12 14:27
 */
@Slf4j
@Aspect
@Component
public class RateLimiterAspect {
    private static final ConcurrentMap<String, com.google.common.util.concurrent.RateLimiter> RATE_LIMITER_CACHE = new ConcurrentHashMap<>();

    @Pointcut("@annotation(com.yxl.annotation.RateLimiter)")
    public void rateLimit() {

    }

    @Around("rateLimit()")
    public Object pointcut(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        // 通过 AnnotationUtils.findAnnotation 获取 RateLimiter 注解
        RateLimiter rateLimiter = AnnotationUtils.findAnnotation(method, RateLimiter.class);
        if (rateLimiter != null && rateLimiter.qps() > RateLimiter.NOT_LIMITED) {
            double qps = rateLimiter.qps();
            if (RATE_LIMITER_CACHE.get(method.getName()) == null) {
                // 初始化 QPS
                RATE_LIMITER_CACHE.put(method.getName(), com.google.common.util.concurrent.RateLimiter.create(qps));
            }

            log.debug("【{}】的QPS设置为: {}", method.getName(), RATE_LIMITER_CACHE.get(method.getName()).getRate());
            // 尝试获取令牌
            if (RATE_LIMITER_CACHE.get(method.getName()) != null && !RATE_LIMITER_CACHE.get(method.getName()).tryAcquire(rateLimiter.timeout(), rateLimiter.timeUnit())) {
                throw new RuntimeException("手速太快了,慢点儿吧~");
            }
        }
        return point.proceed();
    }
}

使用方式

在这里插入图片描述

查看结果(这里使用了自定义异常)
在这里插入图片描述

2. 分布式实现

package com.yxzapp.annotation;

import org.springframework.core.annotation.AliasFor;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * 限流注解,
 * </p>
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {

    int NOT_LIMITED = 0;

    /**
     * 类名
     * @return
     */
    String className() default "";

    /**
     * qps
     */
    @AliasFor("qps") double value() default NOT_LIMITED;

    /**
     * qps
     */
    @AliasFor("value") double qps() default NOT_LIMITED;

    /**
     * 限流时间
     */
    int timeout() default 0;

    /**
     * 超时时间单位
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

}

使用 AOP + redis 实现

package com.yxzapp.aspect;


import com.yxzapp.annotation.RateLimiter;
import com.yxzapp.commons.constant.MessageConstant;
import com.yxzapp.exception.BizException;
import com.yxzapp.modules.sys.entity.SysUser;
import com.yxzapp.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.SecurityUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


/**
 * <p>
 * 限流切面
 * </p>
 *
 * @author yxl
 * @date  2020/6/19
 */
@Slf4j
@Aspect
@Component
public class RateLimiterAspect {

    @Autowired
    private RedisUtils redisUtils;

    @Pointcut("@annotation(com.yxzapp.annotation.RateLimiter)")
    public void rateLimit() {

    }

    @Around("rateLimit()")
    public Object pointcut(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        Class aClass = signature.getClass();

        // 获取方法上的@RateLimiter注解
        RateLimiter rateLimiter = AnnotationUtils.findAnnotation(method, RateLimiter.class);
    
        if (rateLimiter != null && rateLimiter.qps() > RateLimiter.NOT_LIMITED) {
            //获取qps
            double qps = rateLimiter.qps();
            
            String key = "RateLimiter:" rateLimiter.className() + +':'+ method.getName();
            if(!redisUtils.hasKey(key)){
                redisUtils.setMillisecond(key,rateLimiter.qps(),rateLimiter.timeout());
            }else if(redisUtils.get(key) != null) {
                throw new BizException(MessageConstant.MSG_STATUS,"手速太快了,慢点儿吧~");
            }

            log.debug("【{}】的QPS设置为: {}", key, redisUtils.get(key));

        }
        return point.proceed();
    }
}

使用方式

在这里插入图片描述
查看结果 (这里使用了自定义异常)
在这里插入图片描述

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4月前
|
应用服务中间件 nginx
分布式限流
分布式限流
44 1
|
4月前
|
NoSQL Cloud Native 算法
🤔为什么分布式限流会出现不均衡的情况?
🤔为什么分布式限流会出现不均衡的情况?
|
25天前
|
存储 NoSQL 算法
Go 分布式令牌桶限流 + 兜底保障
Go 分布式令牌桶限流 + 兜底保障
|
1月前
|
SQL 索引
分布式之接口幂等性
分布式之接口幂等性
32 2
|
2月前
|
存储 缓存 NoSQL
高并发架构设计三大利器:缓存、限流和降级问题之Redis用于搭建分布式缓存集群问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之Redis用于搭建分布式缓存集群问题如何解决
|
4月前
|
存储 缓存 算法
【专栏】探讨分布式限流所面临的挑战以及目前业界常用的解决方案
【4月更文挑战第27天】在互联网时代,分布式限流是应对高并发、保护系统稳定的关键。它面临数据一致性、算法准确性和系统可扩展性的挑战。常见限流算法有令牌桶、漏桶和滑动窗口。解决方案包括使用分布式存储同步状态、结合多种算法及动态调整阈值。定期压力测试确保策略有效性。随着系统规模增长,限流技术将持续发展,理解并应用限流原理对保障服务质量至关重要。
121 3
|
4月前
|
XML 安全 Java
【分布式技术专题】「单点登录技术架构」一文带领你好好对接对应的Okta单点登录实现接口服务的实现落地
【分布式技术专题】「单点登录技术架构」一文带领你好好对接对应的Okta单点登录实现接口服务的实现落地
227 0
|
4月前
|
负载均衡 算法
分布式限流:避免流控失控的关键问题
在当今高并发互联网环境下,分布式系统中的限流机制显得尤为重要。然而,分布式限流也面临着一系列挑战和问题。本文将探讨分布式限流中需要注意的关键问题,并提供相应解决方案,以确保流控策略的有效实施。
|
4月前
|
消息中间件 数据采集 缓存
探索分布式限流:挑战与解决方案
分布式限流是现代系统设计中的重要挑战之一。本文将探讨分布式限流的背景和意义,以及在实施分布式限流时需要注意的关键问题,并提供一些解决方案。
|
11月前
|
算法 NoSQL Java
分布式接口幂等性、分布式限流(Guava 、nginx和lua限流)
接口幂等性就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额返发现多扣钱了,流水记录也变成了两条,这就没有保证接口的幂等性。