微服务应用(十):分布式并发重复提交问题

star2017 1年前 ⋅ 572 阅读

Web 系统的表单重复提交问题必然会出现,如注册、秒杀下单、支付等场景都可能出现重复提交。必须对重复提交进行处理,否则会出现脏数据,若建了唯一索引则会抛 JDBC 异常。单体应用的表单重复提交问题可参考 拦截器 与Spring AOP 实现防止表单重复提交

分布式集群部署的 Web 做防止表单重复提交的处理上,其根本也是基于 TOKEN 的方式来实现,因是集群布署,所以此 TOKEN 不能存放在 SESSION 中(未使用共享 SESSION 方案) ,而是存放在外部存储系统中,如 Reids,这里其实引入了类似分布式锁的概念。

如果集群的 session 实现了集群共享的,如基于 Spring Session + Redis 方案,也可以将 token 放入在 Session。此篇文章是基于分布式锁的方式来实现分布式环境下处理并发重复提交问题。

重复提交最好是在控制层或者更早提前处理,阻止重复提交流入到业务层甚至是数据库,数据库唯一约束最好是作为最后的兜底策略,确保相同的提交请求只被处理一次。

防重复提交

前提:定义防止重复提交的注解,作用在 ControllerMapping 注解的类或方法上,注解的方式可以更灵活地标记那些接口需要做防重复提交处理。

防止重复提交的 TOKEN 在分布式环境相当于分布式锁的 KEY,如果 KEY 存在,则表示有效执行提交处理;如果不存在,则表示该请求已处理,不能重复提交。

分布式锁概念及实现方案,参考 分布式微服务应用系列(十一):分布式锁理解及实现方案

Token 方案

思路

  1. 创建防重复提交注解 NoRepeatCommit,作用在防重复提交的接口。
  2. 创建防重复提交拦截器,通过 preHandle() 方法的 handler 参数获取处理方法,再通过方法获取注解(NoRepeatCommit)。
  3. 在初始化表单之前先获取防重复提交的 Token,后端生成防重复提交的 Token 并存放到 Redis 中。
  4. 提交表单同时带上防重复提交 Token 。
  5. 拦截器解析注解并获取请求中防重复提交的 Token 作为 Redis 中 的 Key,判断 Key 是否存在,存在则执行提交;不存在,则说明请求已被处理。

实现示例

  1. 项目引入 spring-boot-starter-data-redis 包 和其它依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    
  2. Spring Boot 配置文件添加连接 Redis 配置。

    spring.redis.host=127.0.0.1
    spring.redis.port=6379
    spring.redis.password=123456
    spring.redis.database=0
    spring.redis.lettuce.pool.max-active=8
    spring.redis.lettuce.pool.max-wait=-1ms
    spring.redis.lettuce.pool.max-idle=8
    spring.redis.lettuce.pool.min-idle=0
    spring.redis.lettuce.shutdown-timeout=100ms
    
  3. 自定义 RedisTemplate

    @Configuration
    public class RedisConfig {
    
        /**
         * redis默认使用jdk的二进制数据来序列化
         * 以下自定义使用FastJson来序列化
         *
         * @param redisConnectionFactory
         * @return
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            template.setConnectionFactory(redisConnectionFactory);
    
            FastJsonRedisSerializer serializer = new FastJsonRedisSerializer(Object.class);
            template.setKeySerializer(new StringRedisSerializer()); 
            template.setValueSerializer(serializer); 
            template.setHashKeySerializer(new StringRedisSerializer());
            template.setHashValueSerializer(serializer);
    
            //开启事务支持
            template.setEnableTransactionSupport(true);
            template.afterPropertiesSet();
            return template;
        }
    }
    
  4. 定义防重复提交注解

    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    public @interface NoRepeatCommit {
    }
    
  5. 创建生成防重复提交Token接口

    @RestController
    @RequestMapping("/token")
    public class TokenController {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @Autowired
        private HttpServletResponse response;
    
        @RequestMapping("/noRepeat")
        public String noRepeat() {
            UUID uuid = UUID.randomUUID();
            String token = uuid.toString().replace("-", "").toLowerCase();
            redisTemplate.opsForValue().set(ConstantParm.TOKEN_PRE + ":" + token, "");
    
            Cookie cookie = new Cookie(ConstantParm.TOKEN_PRE, token);
            response.addCookie(cookie);
            return token;
        }
    }
    
  6. 创建防重复提交接口,添加防重复注解

    @RestController
    @RequestMapping("/user")
    public class UserController {
    
        @NoRepeatCommit
        @RequestMapping("/add")
        public ResultBean<User> add() {
            User user = new User();
            user.setId(System.currentTimeMillis()).setUsername("admin").setPassword("123456");
            return new ResultBean<>(user);
        }
    }
    
  7. 其它实体类

    响应实体

    @Data
    @Accessors(chain = true)
    public class ResultBean<T> {
    
        private int code;
        private String msg;
        private T data;
    
        public ResultBean(T data) {
            this.code = 200;
            this.msg = "SUCCESS";
            this.data = data;
        }
    }
    

    实体类,使用 Lombok 插件

    @Data
    @Accessors(chain = true)
    public class User implements Serializable {
        private static final long serialVersionUID = 1458351568707901332L;
    
        private Long id;
        private String username;
        private String password;
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    '}';
        }
    }
    

拦截器实现

  1. 创建防重复提交拦截器

    public class NoRepeatInterceptor implements HandlerInterceptor {
        private static final Logger logger = LogManager.getLogger(NoRepeatInterceptor.class);
    
        private RedisTemplate<String, Object> redisTemplate;
    
        public NoRepeatInterceptor(RedisTemplate<String, Object> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    
            logger.info("进入防止重复提交拦截器.............");
    
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Class<?> beanType = handlerMethod.getBeanType();
            //注解支持作用在类上,则所有方法都需要做防重复提交
            NoRepeatCommit classNoRepeat = beanType.getAnnotation(NoRepeatCommit.class);
            NoRepeatCommit MethodNoRepeat = handlerMethod.getMethodAnnotation(NoRepeatCommit.class);
            //方法注解优先
            NoRepeatCommit noRepeatCommit = (null != MethodNoRepeat ? MethodNoRepeat : classNoRepeat);
            if (null != noRepeatCommit) {
                /*Cookie[] cookies = request.getCookies();
                for (Cookie cookie : cookies) {
                    if (cookie.getName().equals(ConstantParm.TOKEN_PRE)) {
                        String noRepeatToken = cookie.getValue();
                    }
                }*/
                String noRepeatToken = request.getHeader(ConstantParm.TOKEN_PRE);
                String key = ConstantParm.TOKEN_PRE + ":" + noRepeatToken;
                //如果有防重复提交的注解,则进入业务处理
                if (null != noRepeatToken) {
                    //此处使用同步代码块来防止并发问题,同一时刻执行查询和删除操作只能有一个线程
                    synchronized (this) {
                        //此处还可以使用 lua 脚本来保证查询和删除在同一个原子操作
                        boolean exist = redisTemplate.hasKey(key).booleanValue();
                        if (exist) {
                            redisTemplate.delete(key);
                        }
                    }
                    return true;
                } else {
                    //如果防重复提交 Token 不存在,或不相等,则认为已处理并拒绝提交
                    ServletOutputStream output = response.getOutputStream();
                    output.write("{\"msg\":\"请不要重复提交\"}".getBytes("UTF-8"));
                    output.flush();
                    output.close();
                    return false;
                }
            }
            return true;
        }
    }
    
  2. 创建 WebMvcConfigurer,添加拦截器使其生效

    @Configuration
    public class WebConfig implements WebMvcConfigurer {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(noRepeatInterceptor())
                    .addPathPatterns("/**");
        }
    
        @Bean
        public NoRepeatInterceptor noRepeatInterceptor() {
            return new NoRepeatInterceptor(redisTemplate);
        }
    }
    

AOP 实现

  1. 创建防重复提交 AOP

    需要注释掉 WebMvcConfigurer 中添加生效的拦截器

    @Aspect
    @Component
    public class NoRepeatCommitAop {
        private static final Logger logger = LogManager.getLogger(NoRepeatCommitAop.class);
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @Pointcut("execution(* com.springboot.norepeat.commit.controller..*(..)) && @annotation(noRepeatCommit)")
        public void pointcut(NoRepeatCommit noRepeatCommit) {
        }
    
        @Before("pointcut(noRepeatCommit)")
        public void before(JoinPoint joinPoint, NoRepeatCommit noRepeatCommit) throws IOException {
    
            //获取RequestAttributes
            RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
            //获取HttpServletRequest
            HttpServletRequest request = ((ServletRequestAttributes) requestAttributes).getRequest();
            //获取HttpServletResponse
            HttpServletResponse response = ((ServletRequestAttributes) requestAttributes).getResponse();
    
            String noRepeatToken = request.getHeader(ConstantParm.TOKEN_PRE);
            if (null != noRepeatToken) {
                String key = ConstantParm.TOKEN_PRE + ":" + noRepeatToken;
    
                //使用 Redis Lua 脚本实现判断和删除属于同一个原子操作
                long result = redisToken(key);
                if(result != 1L){
                    response.setCharacterEncoding("UTF-8");
                    response.setContentType("application/json;charset=UTF-8");
                    ServletOutputStream output = response.getOutputStream();
                    output.write("{\"msg\":\"请不要重复提交\"}".getBytes("UTF-8"));
                    output.flush();
                    output.close();
                }
    
                //使用同步锁防止并发,判断和删除同时只能有一个线程操作
                /*synchronized (this) {
                    boolean exist = redisTemplate.hasKey(key);
                    if (exist) {
                        redisTemplate.delete(key);
                    } else {
                        response.setCharacterEncoding("UTF-8");
                        response.setContentType("application/json;charset=UTF-8");
                        ServletOutputStream output = response.getOutputStream();
                        output.write("{\"msg\":\"请不要重复提交\"}".getBytes("UTF-8"));
                        output.flush();
                        output.close();
                    }
                }*/
    
                //不使用同步锁,存在并发问题,多线程可能都判断得到 key 存在
                /*boolean exist = redisTemplate.hasKey(key);
                if (exist) {
                    redisTemplate.delete(key);
                } else {
                    response.setCharacterEncoding("UTF-8");
                    response.setContentType("application/json;charset=UTF-8");
                    ServletOutputStream output = response.getOutputStream();
                    output.write("{\"msg\":\"请不要重复提交\"}".getBytes("UTF-8"));
                    output.flush();
                    output.close();
                }*/
    
            } else {
                response.setCharacterEncoding("UTF-8");
                response.setContentType("application/json;charset=UTF-8");
                ServletOutputStream output = response.getOutputStream();
                output.write("{\"msg\":\"请重新初始化表单\"}".getBytes("UTF-8"));
                output.flush();
                output.close();
            }
        }
    
        private long redisToken(String key) {
            String luaScript = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end";
            RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
            List<String> keyList = new ArrayList<>();
            keyList.add(key);
            long result = redisTemplate.execute(redisScript, keyList, "");
            return result;
        }
    }
    

多线程测试

  1. 另创建了一个测试工程,每次测试分别请求获取并替换 Token

    @Component
    public class RunTest implements ApplicationRunner {
    
        private static final Logger logger = LogManager.getLogger(RunTest.class);
    
        @Autowired
        private RestTemplate restTemplate;
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            System.out.println("执行多线程测试");
            String url="http://localhost:8080/user/add";
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ExecutorService executorService = Executors.newFixedThreadPool(10);
    
            for(int i=0; i<10; i++){
                String userId = "userId" + i;
                HttpEntity request = buildRequest(userId);
                executorService.submit(() -> {
                    try {
                        countDownLatch.await();
                        System.out.println("Thread:"+Thread.currentThread().getName()+", time:"+System.currentTimeMillis());
                        ResponseEntity<String> response = restTemplate.postForEntity(url, request, String.class);
                        System.out.println("Thread:"+Thread.currentThread().getName() + "," + response.getBody());
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
    
            countDownLatch.countDown();
        }
    
        private HttpEntity buildRequest(String userId) {
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            headers.set("NO_REPEAT_TOKEN", "7314dc3e99f54f408f0d63fc09c9335d");
            Map<String, Object> body = new HashMap<>();
            body.put("userId", userId);
            return new HttpEntity<>(body, headers);
        }
    }
    
  2. 测试结果,分别是无同步锁,有同步锁,有 Lua 脚本三种情况

    判断 Token 是否存在和删除分两步操作,即不使用同步锁和 Lua 脚本,存在并发问题,就存在重复提交问题。

    执行多线程测试
    Thread:pool-1-thread-10,{"code":200,"msg":"SUCCESS","data":{"id":1578554714229,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-3,{"code":200,"msg":"SUCCESS","data":{"id":1578554714286,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-7,{"code":200,"msg":"SUCCESS","data":{"id":1578554714286,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-9,{"code":200,"msg":"SUCCESS","data":{"id":1578554714286,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-2,{"code":200,"msg":"SUCCESS","data":{"id":1578554714287,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-8,{"code":200,"msg":"SUCCESS","data":{"id":1578554714286,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-4,{"code":200,"msg":"SUCCESS","data":{"id":1578554714286,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-5,{"code":200,"msg":"SUCCESS","data":{"id":1578554714286,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-1,{"code":200,"msg":"SUCCESS","data":{"id":1578554714286,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-6,{"code":200,"msg":"SUCCESS","data":{"id":1578554714286,"username":"admin","password":"123456"}}
    

    使用同步锁防止并发,判断和删除同时只能有一个线程操作,即只能有一个线程完成提交,其它线程属重复提交。

    执行多线程测试
    Thread:pool-1-thread-6,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-5,{"code":200,"msg":"SUCCESS","data":{"id":1578554831026,"username":"admin","password":"123456"}}
    Thread:pool-1-thread-1,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-2,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-9,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-4,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-8,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-7,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-3,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-10,{"msg":"请不要重复提交"}
    

    使用 Redis Lua 脚本实现判断和删除属于同一个原子操作,即同时只能有一个线程操作,解决并发重复提交的问题。

    执行多线程测试
    Thread:pool-1-thread-2,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-4,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-5,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-10,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-9,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-6,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-1,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-3,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-7,{"msg":"请不要重复提交"}
    Thread:pool-1-thread-8,{"code":200,"msg":"SUCCESS","data":{"id":1578555025801,"username":"admin","password":"123456"}}
    

无 Token 方案

在每次请求拦截时生成 Key 存放到 Redis,设置过期时间。不需要将此 Key 做为 Token 传给前端,前端再回传。

重点是如何识别为重复提交,即 Key 的生成规则必须能表示同一个用户的请求存在重复,可以将请求路径+IP+用户名再 Hash 作为 Key。

实现

例如,要求 3 秒之内不能出现重复提交。

  1. 定义防重复提交注解

    /**
     * 避免重提交
     */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface AvoidRepeatableCommit {
        long timeout() default 5;
    }
    
  2. 定义 AOP 切面

    @Aspect
    @Component
    @EnableAspectJAutoProxy(exposeProxy = true)
    public class AvoidRepeatableCommitAspect {
    
        @Autowired
        HttpServletRequest request; //这里可以获取到request
    
        /**
         * @param point
         */
        @Around("@annotation(com.springboot.repeatcommit.common.config.AvoidRepeatableCommit)")
        public Object around(ProceedingJoinPoint point) throws Throwable {
    
            String ip = IPUtil.getIPAddress(request);
            //获取注解
            MethodSignature signature = (MethodSignature) point.getSignature();
            Method method = signature.getMethod();
            //目标类、方法
            String className = method.getDeclaringClass().getName();
            String methodName = method.getName();
            String ipKey = String.format("%s#%s", className, methodName);
            int hashCode = Math.abs(ipKey.hashCode());
            //重点是此 Key 生成规则
            String key = String.format("%s_%d", ip, hashCode);
    //        log.info("ipKey={},hashCode={},key={}",ipKey,hashCode,key);
            AvoidRepeatableCommit avoidRepeatableCommit = method.getAnnotation(AvoidRepeatableCommit.class);
            long timeout = avoidRepeatableCommit.timeout();
            if (timeout < 0L) {
                timeout = 5L;
            }
            //用多参数set方法保证对redis操作原子性
            boolean result = RedisUtil.setnxAndExpire(key, UUID.randomUUID().toString(), timeout * 1000);
            if (!result) {
                HashMap<String, Object> resultMap = new HashMap<>(5);
                resultMap.put("errCode", 10001);
                resultMap.put("errMsg", "请勿重复提交");
                return JSON.toJSONString(resultMap);
            }
            //执行方法
            Object object = point.proceed();
            return object;
        }
    }
    
  3. RedisUtil.java

    @Component
    public class RedisUtil {
    
        private static RedisTemplate redisTemplate;
    
        public RedisUtil(RedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
    
        public static boolean setnxAndExpire(final String key, String value, long milliseconds) {
            Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, milliseconds, TimeUnit.MILLISECONDS);
            if(null != result && result){
                return result;
            }
            return false;
        }
    }
    
  4. Controller 方法

    @RestController
    @RequestMapping(value = "/repeat")
    public class RepeatCommit {
    
        //重复提交测试
        @RequestMapping(value = "formCommit")
        @AvoidRepeatableCommit(timeout = 3)
        public String testCommit(HttpServletRequest request) {
            Map<String, Object> resultMap = new HashMap<String, Object>();
            try {
                resultMap.put("success", true);
            } catch (Exception e) {
                e.printStackTrace();
                resultMap.put("success", false);
            }
            return JSON.toJSONString(resultMap);
        }
    }
    

测试

执行多线程测试
Thread:pool-1-thread-6,{"success":true}
Thread:pool-1-thread-8,{"errMsg":"请勿重复提交","errCode":10001}
Thread:pool-1-thread-7,{"errMsg":"请勿重复提交","errCode":10001}
Thread:pool-1-thread-10,{"errMsg":"请勿重复提交","errCode":10001}
Thread:pool-1-thread-5,{"errMsg":"请勿重复提交","errCode":10001}
Thread:pool-1-thread-4,{"errMsg":"请勿重复提交","errCode":10001}
Thread:pool-1-thread-3,{"errMsg":"请勿重复提交","errCode":10001}
Thread:pool-1-thread-9,{"errMsg":"请勿重复提交","errCode":10001}
Thread:pool-1-thread-2,{"errMsg":"请勿重复提交","errCode":10001}
Thread:pool-1-thread-1,{"errMsg":"请勿重复提交","errCode":10001}

分布式锁方案

思路

使用 AOP 或 拦截器执行前置拦截

  1. 服务器 A:如果有注解,则去加锁,加锁返回成功后,执行提交表单业务。
  2. 服务器 B:接收到相同的请求,加锁失败,表示为重复提交,返回异常。

非常重要的一点是生成分布式锁的 Key,因服务器 A 和 服务器 B 接收到的是完全相同的请求,可以对请求的信息进行 HASH 或 MD5 运算,将运算的值做为分布式锁的 Key。

实现

参考 分布式微服务应用系列(十一):分布式锁理解及实现方案

其它参考

  1. Spring Boot使用AOP防止重复提交的方法示例
  2. Redis 防表单重复提交
  3. 分布式系统后台如何防止重复提交
  4. 防重复提交 Token 设计
  5. 分布式重复提交相关文章
  6. 分布式服务防重复提交方案
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: