微服务应用(十九):延时队列之RedissonDelayedQueue实现

star2017 1年前 ⋅ 929 阅读

基于 Redisson 的最佳实践应用 Redisson 实现了延时队列(Delayed Queue )功能,可以直接拿来使用。

Redisson DelayedQueue

基于 Redisson 的最佳实践应用 Redisson 实现了延时队列(Delayed Queue )功能,可以直接拿来使用。

涉及三个队列,一个发布订阅通道,获取延时元素是通过阻塞队列实现的,发布订阅通道没看出具体的作用。

  • 阻塞队列 List:KEY = queueName,执行 BLPOP 命令从左端弹出元素,右端插入元素。
  • 有序集合 Sorted Set:KEY = redisson_delay_queue_timeout:{queueName},score 是元素的过期时间,按从小到大排序,
    过期时间小于当前时间表示已过期,删除集合中的元素,并将元素添加到阻塞队列。
  • 普通集合 List:KEY = redisson_delay_queue:{DelayMessage},按顺序从右端添加元素,元素过期会被删除。
  • 发布/订阅通道:redisson_delay_queue_channel,目前没看出具体用途。

应用示例

基于 Spring Boot + redisson-spring-boot-starter 实现示例

添加依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.15.1</version>
</dependency>

Redisson 的 starter 包自动配置默认注册了 RedisTemplate Bean,RedissonConnectionFactory Bean,RedissonClient Bean。

延时队列管理

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @desc 延时队列管理
 */
@Component
public class RedisDelayedQueueManager {

    @Autowired
    RedissonClient redissonClient;

    /**
     * 添加元素到延时队列
     *
     * @param t         队列成员
     * @param delay     延时时间
     * @param timeUnit  时间单位
     * @param queueName 队列名称
     * @param <T>       泛型
     */
    public <T> void add(T t, long delay, TimeUnit timeUnit, String queueName) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
        delayedQueue.destroy();
    }

    /**
     * 获取元素并删除
     *
     * @param queueName           队列名称
     * @param delayedTaskListener 延时任务监听器
     * @param <T>                 泛型
     */
    public <T> void take(String queueName, DelayedTaskListener delayedTaskListener) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        while (true) {
            try {
                delayedTaskListener.invoke(blockingFairQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //由于此线程需要常驻,可以新建线程,不用交给线程池管理
        /*((Runnable) () -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                    T t = blockingFairQueue.take();
                    delayedTaskListener.invoke(t);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).run();*/
    }
}

延时队列监听器

监听器抽象接口:

/**
 * 延时任务监听器
 */
public interface DelayedTaskListener<T> {

    void invoke(T t);
}

具体监听器实现:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * @desc 实现 CommandLineRunner 接口中的 run 方法,随应用启动而运行
 */
@Component
public class RedisDelayQueueListener implements CommandLineRunner {
    private static final Logger logger = LogManager.getLogger(RedisDelayQueueListener.class);

    @Autowired
    RedisDelayedQueueManager redisDelayedQueueManager;
    @Autowired
    private RedissonClient redissonClient;

    @Override
    public void run(String... args) throws Exception {
        logger.info("===============延时队列监听器启动==============");
        //监听延迟队列
        DelayedTaskListener<String> delayedTaskListener = new DelayedTaskListener<String>() {
            @Override
            public void invoke(String delayMessage) {
                //这里调用你延迟之后的代码,在这里执行业务处理
                System.out.println(delayMessage);
            }
        };
        redisDelayedQueueManager.take("DelayMessage", delayedTaskListener);
    }
}

添加延时任务

package com.delay.queue.controller;

import com.delay.queue.redisson.DelayMessage;
import com.delay.queue.redisson.RedisDelayedQueueManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/delayed")
public class DelayedTaskController {

    @Autowired
    private RedisDelayedQueueManager redisDelayedQueueManager;

    @RequestMapping("/message")
    public void addDelayedMessage(Integer delay, String value) {
        redisDelayedQueueManager.add(value, delay, TimeUnit.SECONDS, "DelayMessage");
    }
}

快速发送两条请求:

发送请求1:http://localhost:8080/delayed/message?delay=20&value=AAAA
发送请求2:http://localhost:8080/delayed/message?delay=5&value=BBBB

监听结果

BBBB
AAAA

源码分析

Redisson 入队操作使用是的 Lua 脚本,Redis 底层解析是会开启事务来执行脚本中的多条命令,可以确保原子性。

public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {

    private final QueueTransferService queueTransferService;
    private final String channelName;
    private final String queueName;
    private final String timeoutSetName;

    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        // 订阅通道名前缀
        channelName = prefixName("redisson_delay_queue_channel", getName());
        // 延时队列 List 前缀
        queueName = prefixName("redisson_delay_queue", getName());
        // 延时队列过期排序 Sorted Set 的 KEY 前缀
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
        // 创建一个订阅作为监听器
        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {

            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
                      System.currentTimeMillis(), 100);//默认取 100条
            }

            @Override
            protected RTopic getTopic() {
                return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        // 执行任务
        queueTransferService.schedule(queueName, task);

        this.queueTransferService = queueTransferService;
    }

    @Override
    public void offer(V e, long delay, TimeUnit timeUnit) {
        // 添加元素
        get(offerAsync(e, delay, timeUnit));
    }

    @Override
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        if (delay < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        // 延时时间转换为毫秒值
        long delayInMs = timeUnit.toMillis(delay);
        // 超时时间=当前时间毫秒值 + 延时时间毫秒值
        long timeout = System.currentTimeMillis() + delayInMs;

        long randomId = ThreadLocalRandom.current().nextLong();
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime 
              // to all scheduler workers 
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;",
              Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
              timeout, randomId, encode(e));
    }
}

RedissonDelayedQueue构造方法

添加元素,会先创建 RedissonDelayedQueue 实例,执行构造方法中的 Lua 脚本,目的是对已过期但未处理的任务进行处理

Lua 脚本

-- 从 Sorted Set 按score从小到大排序,拿出小于当前时间的 100 条数据(已过期的数据)
local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); 
if #expiredValues > 0 
    then 
    -- 循环遍历
    for i, v in ipairs(expiredValues)
    do local randomId, value = struct.unpack('dLc0', v);
        -- 加入到阻塞队列 queueName
        redis.call('rpush', KEYS[1], value);
        -- 删除 redisson_delay_queue:{queueName} 队列(List)该元素
        redis.call('lrem', KEYS[3], 1, v);
    end; 
    -- 删除 Sorted Set 中的该元素
    redis.call('zrem', KEYS[2], unpack(expiredValues));    
end; 

local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES');     
if v[1] ~= nil 
    then return v[2];     
end 
return nil;

脚本参数示例

KEY "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" 
VALUE "1615773796322" "100"

脚本转为执行的Redis命令示例

# 从 Sorted Set 按score从小到大排序,拿出小于当前时间的 100 条数据
1615773628.031731 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615773796322" "limit" "0" "100"
# 取出元 Sorted Set 中 score 最小的元素返回
1615773628.031752 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"

offerAsync添加元素

执行添加元素操作,执行 Lua 脚本,在 Redis 解析 Lua 脚本转换为 Redis 命令底层是开启事务执行多条命令。

Lua 脚本

local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);
-- 将元素加入到 Sorted Set, KEY 为 redisson_delay_queue_timeout:{queueName} 的延时队列, score 为延时时间
redis.call('zadd', KEYS[2], ARGV[1], value);
-- 将元素加入到 List, KEY 为 redisson_delay_queue:{queueName}, 从右侧加入, 这是个普通 List,
redis.call('rpush', KEYS[3], value);
-- 从 Sorted Set KEY 为 redisson_delay_queue_timeout:{queueName} 中从小到大排序, 取出第一个元素
local v = redis.call('zrange', KEYS[2], 0, 0); 

if v[1] == value 
    then 
    -- 如果第一个元素为当前新增的元素, 发布到 channel 为 redisson_delay_queue_channel:{queueName}
    redis.call('publish', KEYS[4], ARGV[1]); 
end;

脚本参数示例

KEYS "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}" 

VALUES "1615773806323" "1493646126723946165" "\x04>\x04BBBB"

脚本转为执行的Reids命令示例

添加元素的 offerAsync 方法中的 Lua 脚本最终执行的 Redis 命令如下:

# 添加元素添加到 KEY 为 redisson_delay_queue_timeout:{DelayMessage} 的 Sorted Set 有序队列中
1615773628.032423 [4 lua] "zadd" "redisson_delay_queue_timeout:{DelayMessage}" "1615773806323" "\xd7\x84\x13=\x7f\xba\xb4C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
# 将元素添加到 KEY 为 redisson_delay_queue:{DelayMessage} 的 普通 List 中,右侧插入
1615773628.032463 [4 lua] "rpush" "redisson_delay_queue:{DelayMessage}" "\xd7\x84\x13=\x7f\xba\xb4C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
# 从 Sorted Set 按 socre 从小到大排序, 取第一个元素
1615773628.032491 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0"
# 如果从 Sorted Set 拿到的元素即为本次添加的元素,就将其发布到
1615773628.032510 [4 lua] "publish" "redisson_delay_queue_channel:{DelayMessage}" "1615773806323"

延时任务定时轮询

Redisson 的定时轮询是基于 Netty 的 时间轮(HashedWheelTimeout) 实现的。轮询到过期的元素,则将期加入到阻塞队列,删除 Sorted Set 和 List 中的元素。

Netty 的 时间轮后续再开一篇文章详细描述。

底层逻辑

Redisson 的延时队列是基于 Redis 的 Sorted SetList + Blocking List 实现的,底层是基于 Redis 的事务和相关命令实现。参考 【附文一】。

使用 redis-client登录 Redis 服务,执行 monitor 命令开启监控,输出打印见 【附文二】。

添加元素

  1. 执行 MULTI开启事务。

  2. 执行 zdd 命令将元素加入到一个 KEY 为 redisson_delay_queue_timeout:{queueName}Sorted Set

    权重 score当前时间 + 延迟时间,Sorted Set 是一个按 score 排序的有序集合。

  3. 执行 rpush命令将元素将入到一个 KEY 为 redisson_delay_queue:{queueName}的普通 List

    该 List 是按插入的顺序保存元素。

  4. 执行 EXEC 命令提交。

消费元素

客户端随应该启动就运行,

  1. Redisson 客户端会订阅一个 KEY 为 queueNameBlocking List(阻塞队列 )。

  2. 使用 zrange命令获取 Sorted Set中的第一个元素,如果小于等于当前时间,表示已过期。

  3. 执行 MULTI开启事务。

  4. 拿到过期的元素,执行 rpush 命令,将元素插入到 Blocking List (阻塞队列)最右端。

  5. 执行 lrem 命令从 KEY 为 redisson_delay_queue:{queueName}的普通 List 中删除元素。

  6. 执行 zrem命令从 KEY 为 redisson_delay_queue_timeout:{DelayMessage}Sorted Set 中删除元素。

  7. 执行 EXEC 命令提交。

  8. 最后执行 LPOP 命令 从 Blocking List (阻塞队列)最左端弹出元素完成消费。

    如果阻塞队列中元素未被消费则不会消失,直到消费完。

相关参考

  1. 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
  2. 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
  3. 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
  4. 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
  5. 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址
  6. Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
  7. Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
  8. 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
  9. Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
  10. Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
  11. Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。

附文一

Redisson 延时队列执行 Redis 命令的 AOF 日志

$6
SELECT
$1
4
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696477945
$23
???薈       >AAAA
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
???薈       >AAAA
*1
$4
EXEC
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696543300
$23
3j澁C       >BBBB
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
3j澁C       >BBBB
*1
$4
EXEC
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696458376
$23
囤NM褚?       >CCCC
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
囤NM褚?       >CCCC
*1
$4
EXEC
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696526357
$23
:跜灿#幻       >DDDD
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
:跜灿#幻       >DDDD
*1
$4
EXEC
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>CCCC
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
囤NM褚?       >CCCC
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
囤NM褚?       >CCCC
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>AAAA
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
???薈       >AAAA
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
???薈       >AAAA
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>DDDD
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
:跜灿#幻       >DDDD
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
:跜灿#幻       >DDDD
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>BBBB
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
3j澁C       >BBBB
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
3j澁C       >BBBB
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage

附文二

使用 redis-client登录 Redis 服务,执行 monitor 监控。

添加两个元素到延时队列,Redis 监控命令

1615736035.403628 [4 120.229.16.239:34559] "SUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
1615736035.420899 [4 120.229.16.239:34562] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736037375" "100"
1615736035.420986 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736037375" "limit" "0" "100"
1615736035.421000 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
1615736035.460491 [4 120.229.16.239:34557] "EVAL" "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;" "4" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}" "1615736097379" "1732831665336717572" "\x04>\x04AAAA"
1615736035.460573 [4 lua] "zadd" "redisson_delay_queue_timeout:{DelayMessage}" "1615736097379" "\x91\xf1\xfb7A\x0c\xb8C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736035.460593 [4 lua] "rpush" "redisson_delay_queue:{DelayMessage}" "\x91\xf1\xfb7A\x0c\xb8C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736035.460617 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0"
1615736035.460626 [4 lua] "publish" "redisson_delay_queue_channel:{DelayMessage}" "1615736097379"
1615736035.479613 [4 120.229.16.239:34559] "UNSUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
============================================================================
1615736039.976610 [4 120.229.16.239:34559] "SUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
1615736039.991013 [4 120.229.16.239:34561] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736041946" "100"
1615736039.991096 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736041946" "limit" "0" "100"
1615736039.991118 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
1615736039.996328 [4 120.229.16.239:34566] "EVAL" "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;" "4" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}" "1615736061948" "-139067833826375740" "\x04>\x04BBBB"
1615736039.996401 [4 lua] "zadd" "redisson_delay_queue_timeout:{DelayMessage}" "1615736061948" "\x84w.T\x17\xe1~\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736039.996419 [4 lua] "rpush" "redisson_delay_queue:{DelayMessage}" "\x84w.T\x17\xe1~\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736039.996431 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0"
1615736039.996438 [4 lua] "publish" "redisson_delay_queue_channel:{DelayMessage}" "1615736061948"
1615736040.015530 [4 120.229.16.239:34559] "UNSUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"

过期监听处理的Redis命令

1615736443.935211 [4 120.229.16.239:34795] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736445889" "100"
1615736443.935305 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736445889" "limit" "0" "100"
1615736443.935335 [4 lua] "rpush" "DelayMessage" "\x04>\x04BBBB"
1615736443.935348 [4 lua] "lrem" "redisson_delay_queue:{DelayMessage}" "1" "\x19\xef\xa5\x0c9>\xcb\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736443.935361 [4 lua] "zrem" "redisson_delay_queue_timeout:{DelayMessage}" "\x19\xef\xa5\x0c9>\xcb\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736443.935373 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
1615736491.636923 [4 120.229.16.239:34818] "PING"
===========================================================================
1615736491.637112 [4 120.229.16.239:34819] "PING"
1615736497.234518 [4 120.229.16.239:34797] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736499189" "100"
1615736497.234599 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736499189" "limit" "0" "100"
1615736497.234621 [4 lua] "rpush" "DelayMessage" "\x04>\x04AAAA"
1615736497.234630 [4 lua] "lrem" "redisson_delay_queue:{DelayMessage}" "1" "\x96Vrr[\x1b\xdaC\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736497.234641 [4 lua] "zrem" "redisson_delay_queue_timeout:{DelayMessage}" "\x96Vrr[\x1b\xdaC\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736497.234651 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: