基于 Redisson 的最佳实践应用 Redisson 实现了延时队列(Delayed Queue )功能,可以直接拿来使用。
Redisson DelayedQueue
基于 Redisson 的最佳实践应用 Redisson 实现了延时队列(Delayed Queue )功能,可以直接拿来使用。
涉及三个队列,一个发布订阅通道,获取延时元素是通过阻塞队列实现的,发布订阅通道没看出具体的作用。
- 阻塞队列 List:KEY =
queueName
,执行BLPOP
命令从左端弹出元素,右端插入元素。 - 有序集合 Sorted Set:KEY =
redisson_delay_queue_timeout:{queueName}
,score 是元素的过期时间,按从小到大排序,
过期时间小于当前时间表示已过期,删除集合中的元素,并将元素添加到阻塞队列。 - 普通集合 List:KEY =
redisson_delay_queue:{DelayMessage}
,按顺序从右端添加元素,元素过期会被删除。 - 发布/订阅通道:
redisson_delay_queue_channel
,目前没看出具体用途。
应用示例
基于 Spring Boot + redisson-spring-boot-starter 实现示例
添加依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.1</version>
</dependency>
Redisson 的 starter 包自动配置默认注册了 RedisTemplate Bean,RedissonConnectionFactory Bean,RedissonClient Bean。
延时队列管理
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @desc 延时队列管理
*/
@Component
public class RedisDelayedQueueManager {
@Autowired
RedissonClient redissonClient;
/**
* 添加元素到延时队列
*
* @param t 队列成员
* @param delay 延时时间
* @param timeUnit 时间单位
* @param queueName 队列名称
* @param <T> 泛型
*/
public <T> void add(T t, long delay, TimeUnit timeUnit, String queueName) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
delayedQueue.destroy();
}
/**
* 获取元素并删除
*
* @param queueName 队列名称
* @param delayedTaskListener 延时任务监听器
* @param <T> 泛型
*/
public <T> void take(String queueName, DelayedTaskListener delayedTaskListener) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
while (true) {
try {
delayedTaskListener.invoke(blockingFairQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//由于此线程需要常驻,可以新建线程,不用交给线程池管理
/*((Runnable) () -> {
while (true) {
try {
Thread.sleep(1000);
T t = blockingFairQueue.take();
delayedTaskListener.invoke(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).run();*/
}
}
延时队列监听器
监听器抽象接口:
/**
* 延时任务监听器
*/
public interface DelayedTaskListener<T> {
void invoke(T t);
}
具体监听器实现:
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
* @desc 实现 CommandLineRunner 接口中的 run 方法,随应用启动而运行
*/
@Component
public class RedisDelayQueueListener implements CommandLineRunner {
private static final Logger logger = LogManager.getLogger(RedisDelayQueueListener.class);
@Autowired
RedisDelayedQueueManager redisDelayedQueueManager;
@Autowired
private RedissonClient redissonClient;
@Override
public void run(String... args) throws Exception {
logger.info("===============延时队列监听器启动==============");
//监听延迟队列
DelayedTaskListener<String> delayedTaskListener = new DelayedTaskListener<String>() {
@Override
public void invoke(String delayMessage) {
//这里调用你延迟之后的代码,在这里执行业务处理
System.out.println(delayMessage);
}
};
redisDelayedQueueManager.take("DelayMessage", delayedTaskListener);
}
}
添加延时任务
package com.delay.queue.controller;
import com.delay.queue.redisson.DelayMessage;
import com.delay.queue.redisson.RedisDelayedQueueManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/delayed")
public class DelayedTaskController {
@Autowired
private RedisDelayedQueueManager redisDelayedQueueManager;
@RequestMapping("/message")
public void addDelayedMessage(Integer delay, String value) {
redisDelayedQueueManager.add(value, delay, TimeUnit.SECONDS, "DelayMessage");
}
}
快速发送两条请求:
发送请求1:http://localhost:8080/delayed/message?delay=20&value=AAAA
发送请求2:http://localhost:8080/delayed/message?delay=5&value=BBBB
监听结果
BBBB
AAAA
源码分析
Redisson 入队操作使用是的 Lua 脚本,Redis 底层解析是会开启事务来执行脚本中的多条命令,可以确保原子性。
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {
private final QueueTransferService queueTransferService;
private final String channelName;
private final String queueName;
private final String timeoutSetName;
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
// 订阅通道名前缀
channelName = prefixName("redisson_delay_queue_channel", getName());
// 延时队列 List 前缀
queueName = prefixName("redisson_delay_queue", getName());
// 延时队列过期排序 Sorted Set 的 KEY 前缀
timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
// 创建一个订阅作为监听器
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
@Override
protected RFuture<Long> pushTaskAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredValues > 0 then "
+ "for i, v in ipairs(expiredValues) do "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "redis.call('rpush', KEYS[1], value);"
+ "redis.call('lrem', KEYS[3], 1, v);"
+ "end; "
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then "
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);//默认取 100条
}
@Override
protected RTopic getTopic() {
return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
}
};
// 执行任务
queueTransferService.schedule(queueName, task);
this.queueTransferService = queueTransferService;
}
@Override
public void offer(V e, long delay, TimeUnit timeUnit) {
// 添加元素
get(offerAsync(e, delay, timeUnit));
}
@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
if (delay < 0) {
throw new IllegalArgumentException("Delay can't be negative");
}
// 延时时间转换为毫秒值
long delayInMs = timeUnit.toMillis(delay);
// 超时时间=当前时间毫秒值 + 延时时间毫秒值
long timeout = System.currentTimeMillis() + delayInMs;
long randomId = ThreadLocalRandom.current().nextLong();
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
+ "redis.call('rpush', KEYS[3], value);"
// if new object added to queue head when publish its startTime
// to all scheduler workers
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
}
}
RedissonDelayedQueue构造方法
添加元素,会先创建 RedissonDelayedQueue
实例,执行构造方法中的 Lua 脚本,目的是对已过期但未处理的任务进行处理。
Lua 脚本
-- 从 Sorted Set 按score从小到大排序,拿出小于当前时间的 100 条数据(已过期的数据)
local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]);
if #expiredValues > 0
then
-- 循环遍历
for i, v in ipairs(expiredValues)
do local randomId, value = struct.unpack('dLc0', v);
-- 加入到阻塞队列 queueName
redis.call('rpush', KEYS[1], value);
-- 删除 redisson_delay_queue:{queueName} 队列(List)该元素
redis.call('lrem', KEYS[3], 1, v);
end;
-- 删除 Sorted Set 中的该元素
redis.call('zrem', KEYS[2], unpack(expiredValues));
end;
local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES');
if v[1] ~= nil
then return v[2];
end
return nil;
脚本参数示例
KEY "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}"
VALUE "1615773796322" "100"
脚本转为执行的Redis命令示例
# 从 Sorted Set 按score从小到大排序,拿出小于当前时间的 100 条数据
1615773628.031731 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615773796322" "limit" "0" "100"
# 取出元 Sorted Set 中 score 最小的元素返回
1615773628.031752 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
offerAsync添加元素
执行添加元素操作,执行 Lua 脚本,在 Redis 解析 Lua 脚本转换为 Redis 命令底层是开启事务执行多条命令。
Lua 脚本
local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);
-- 将元素加入到 Sorted Set, KEY 为 redisson_delay_queue_timeout:{queueName} 的延时队列, score 为延时时间
redis.call('zadd', KEYS[2], ARGV[1], value);
-- 将元素加入到 List, KEY 为 redisson_delay_queue:{queueName}, 从右侧加入, 这是个普通 List,
redis.call('rpush', KEYS[3], value);
-- 从 Sorted Set KEY 为 redisson_delay_queue_timeout:{queueName} 中从小到大排序, 取出第一个元素
local v = redis.call('zrange', KEYS[2], 0, 0);
if v[1] == value
then
-- 如果第一个元素为当前新增的元素, 发布到 channel 为 redisson_delay_queue_channel:{queueName}
redis.call('publish', KEYS[4], ARGV[1]);
end;
脚本参数示例
KEYS "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}"
VALUES "1615773806323" "1493646126723946165" "\x04>\x04BBBB"
脚本转为执行的Reids命令示例
添加元素的 offerAsync 方法中的 Lua 脚本最终执行的 Redis 命令如下:
# 添加元素添加到 KEY 为 redisson_delay_queue_timeout:{DelayMessage} 的 Sorted Set 有序队列中
1615773628.032423 [4 lua] "zadd" "redisson_delay_queue_timeout:{DelayMessage}" "1615773806323" "\xd7\x84\x13=\x7f\xba\xb4C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
# 将元素添加到 KEY 为 redisson_delay_queue:{DelayMessage} 的 普通 List 中,右侧插入
1615773628.032463 [4 lua] "rpush" "redisson_delay_queue:{DelayMessage}" "\xd7\x84\x13=\x7f\xba\xb4C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
# 从 Sorted Set 按 socre 从小到大排序, 取第一个元素
1615773628.032491 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0"
# 如果从 Sorted Set 拿到的元素即为本次添加的元素,就将其发布到
1615773628.032510 [4 lua] "publish" "redisson_delay_queue_channel:{DelayMessage}" "1615773806323"
延时任务定时轮询
Redisson 的定时轮询是基于 Netty 的 时间轮(HashedWheelTimeout
) 实现的。轮询到过期的元素,则将期加入到阻塞队列,删除 Sorted Set 和 List 中的元素。
Netty 的 时间轮后续再开一篇文章详细描述。
底层逻辑
Redisson 的延时队列是基于 Redis 的 Sorted Set
和 List
+ Blocking List
实现的,底层是基于 Redis 的事务和相关命令实现。参考 【附文一】。
使用 redis-client
登录 Redis 服务,执行 monitor
命令开启监控,输出打印见 【附文二】。
添加元素
执行
MULTI
开启事务。执行
zdd
命令将元素加入到一个 KEY 为redisson_delay_queue_timeout:{queueName}
的Sorted Set
,权重
score
是当前时间 + 延迟时间
,Sorted Set 是一个按score
排序的有序集合。执行
rpush
命令将元素将入到一个 KEY 为redisson_delay_queue:{queueName}
的普通List
。该 List 是按插入的顺序保存元素。
执行
EXEC
命令提交。
消费元素
客户端随应该启动就运行,
Redisson 客户端会订阅一个 KEY 为
queueName
的Blocking List
(阻塞队列 )。使用
zrange
命令获取Sorted Set
中的第一个元素,如果小于等于当前时间,表示已过期。执行
MULTI
开启事务。拿到过期的元素,执行
rpush
命令,将元素插入到Blocking List
(阻塞队列)最右端。执行
lrem
命令从 KEY 为redisson_delay_queue:{queueName}
的普通 List 中删除元素。执行
zrem
命令从 KEY 为redisson_delay_queue_timeout:{DelayMessage}
的Sorted Set
中删除元素。执行
EXEC
命令提交。最后执行
LPOP
命令 从Blocking List
(阻塞队列)最左端弹出元素完成消费。如果阻塞队列中元素未被消费则不会消失,直到消费完。
相关参考
- 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
- 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
- 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
- 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
- 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址。
- Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
- Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
- 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
- Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
- Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
- Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。
附文一
Redisson 延时队列执行 Redis 命令的 AOF 日志
$6
SELECT
$1
4
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696477945
$23
???薈 >AAAA
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
???薈 >AAAA
*1
$4
EXEC
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696543300
$23
3j澁C >BBBB
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
3j澁C >BBBB
*1
$4
EXEC
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696458376
$23
囤NM褚? >CCCC
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
囤NM褚? >CCCC
*1
$4
EXEC
*1
$5
MULTI
*4
$4
zadd
$43
redisson_delay_queue_timeout:{DelayMessage}
$13
1615696526357
$23
:跜灿#幻 >DDDD
*3
$5
rpush
$35
redisson_delay_queue:{DelayMessage}
$23
:跜灿#幻 >DDDD
*1
$4
EXEC
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>CCCC
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
囤NM褚? >CCCC
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
囤NM褚? >CCCC
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>AAAA
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
???薈 >AAAA
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
???薈 >AAAA
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>DDDD
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
:跜灿#幻 >DDDD
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
:跜灿#幻 >DDDD
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
*1
$5
MULTI
*3
$5
rpush
$12
DelayMessage
$7
>BBBB
*4
$4
lrem
$35
redisson_delay_queue:{DelayMessage}
$1
1
$23
3j澁C >BBBB
*3
$4
zrem
$43
redisson_delay_queue_timeout:{DelayMessage}
$23
3j澁C >BBBB
*1
$4
EXEC
*2
$4
LPOP
$12
DelayMessage
附文二
使用 redis-client
登录 Redis 服务,执行 monitor
监控。
添加两个元素到延时队列,Redis 监控命令
1615736035.403628 [4 120.229.16.239:34559] "SUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
1615736035.420899 [4 120.229.16.239:34562] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736037375" "100"
1615736035.420986 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736037375" "limit" "0" "100"
1615736035.421000 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
1615736035.460491 [4 120.229.16.239:34557] "EVAL" "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;" "4" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}" "1615736097379" "1732831665336717572" "\x04>\x04AAAA"
1615736035.460573 [4 lua] "zadd" "redisson_delay_queue_timeout:{DelayMessage}" "1615736097379" "\x91\xf1\xfb7A\x0c\xb8C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736035.460593 [4 lua] "rpush" "redisson_delay_queue:{DelayMessage}" "\x91\xf1\xfb7A\x0c\xb8C\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736035.460617 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0"
1615736035.460626 [4 lua] "publish" "redisson_delay_queue_channel:{DelayMessage}" "1615736097379"
1615736035.479613 [4 120.229.16.239:34559] "UNSUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
============================================================================
1615736039.976610 [4 120.229.16.239:34559] "SUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
1615736039.991013 [4 120.229.16.239:34561] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736041946" "100"
1615736039.991096 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736041946" "limit" "0" "100"
1615736039.991118 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
1615736039.996328 [4 120.229.16.239:34566] "EVAL" "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;" "4" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "redisson_delay_queue_channel:{DelayMessage}" "1615736061948" "-139067833826375740" "\x04>\x04BBBB"
1615736039.996401 [4 lua] "zadd" "redisson_delay_queue_timeout:{DelayMessage}" "1615736061948" "\x84w.T\x17\xe1~\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736039.996419 [4 lua] "rpush" "redisson_delay_queue:{DelayMessage}" "\x84w.T\x17\xe1~\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736039.996431 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0"
1615736039.996438 [4 lua] "publish" "redisson_delay_queue_channel:{DelayMessage}" "1615736061948"
1615736040.015530 [4 120.229.16.239:34559] "UNSUBSCRIBE" "redisson_delay_queue_channel:{DelayMessage}"
过期监听处理的Redis命令
1615736443.935211 [4 120.229.16.239:34795] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736445889" "100"
1615736443.935305 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736445889" "limit" "0" "100"
1615736443.935335 [4 lua] "rpush" "DelayMessage" "\x04>\x04BBBB"
1615736443.935348 [4 lua] "lrem" "redisson_delay_queue:{DelayMessage}" "1" "\x19\xef\xa5\x0c9>\xcb\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736443.935361 [4 lua] "zrem" "redisson_delay_queue_timeout:{DelayMessage}" "\x19\xef\xa5\x0c9>\xcb\xc3\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04BBBB"
1615736443.935373 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
1615736491.636923 [4 120.229.16.239:34818] "PING"
===========================================================================
1615736491.637112 [4 120.229.16.239:34819] "PING"
1615736497.234518 [4 120.229.16.239:34797] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "DelayMessage" "redisson_delay_queue_timeout:{DelayMessage}" "redisson_delay_queue:{DelayMessage}" "1615736499189" "100"
1615736497.234599 [4 lua] "zrangebyscore" "redisson_delay_queue_timeout:{DelayMessage}" "0" "1615736499189" "limit" "0" "100"
1615736497.234621 [4 lua] "rpush" "DelayMessage" "\x04>\x04AAAA"
1615736497.234630 [4 lua] "lrem" "redisson_delay_queue:{DelayMessage}" "1" "\x96Vrr[\x1b\xdaC\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736497.234641 [4 lua] "zrem" "redisson_delay_queue_timeout:{DelayMessage}" "\x96Vrr[\x1b\xdaC\a\x00\x00\x00\x00\x00\x00\x00\x04>\x04AAAA"
1615736497.234651 [4 lua] "zrange" "redisson_delay_queue_timeout:{DelayMessage}" "0" "0" "WITHSCORES"
注意:本文归作者所有,未经作者允许,不得转载