Redis 从 2.8.0 版本开始提供了 键空间通知 特性。键空间通知允许客户端订阅发布/订阅通道,以便接收以某种方式影响Redis 数据集的事件。
可以基于 Redis 的键空间通知中的 键过期事件通知来实现延时队列的功能。
键空间通知
Redis 从 2.8.0 版本开始提供了 键空间通知 特性。键空间通知允许客户端订阅发布/订阅通道,以便接收以某种方式影响Redis 数据集的事件。
可以接收的事件的示例有:
- 所有影响指定 KEY 的命令。
- 所有 KEY 都接受 LPUSH 操作。
- 所有 KEY 都在 db0 库中过期。
事件是使用 Redis 常规的 Pub/Sub 层传递的,因此实现 Pub/Sub 的客户端都可以使用此功能,无需额外修改。
因为 Redis 的 Pub/Sub 是 即发即忘弃(fire and forget) 模式,所以如果应用程序要求可靠的事件通知,该功能就无法满足就不能使用了。也就是说,如果 Pub/Sub 客户端断开连接,然后重新连接,那么客户端断开连接期间传递的所有事件都将丢失。
Redis 键空间特性为实现延时队列提供了一种思路,即当键过期时,Redis 发送回调通知,客户端订阅通知执行业务处理。
因 Redis 的键空间基于 Pub/Sub 实现,是 即发即弃 的,且会丢失客户端连接断开重连期间的数据,所以不推荐将此功能用于实现延时队列。
Redis命令实现
Redis 开启键空间通知功能,两种方式:
- redis-client 登录后,通过命令设置开启:
config set notify-keyspace-events Ex
,服务重启后失效。 - 编辑配置文件
redis.conf
,设置notify-keyspace-events "Ex"
,重启 Redis 服务。
- redis-client 登录后,通过命令设置开启:
开启一个客户端订阅键空间通知事件。注意:
_keyevent@0
前后是两个下划线。psubscribe __keyevent@*__:*
:@
后面的第一个*
位置表示 Redis 的 0-15个数据库索引,*
表示所有库,可以指定索引; 第二*
的位置表示要监听的事件,*
表示所有事件,如果要指定过期事件,可以设置为expired
。psubscribe __keyevent@0__:expired
:表示订阅0
号db
的 键过期事件。[root@localhost bin]# ./redis-cli -h 192.168.50.128 192.168.50.128:6379> psubscribe __keyevent@0__:expired Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "__keyevent@0__:expired" 3) (integer) 1
再开启一个客户端设置健值和过期时间
192.168.50.128:6379> setex name 10 Kitty OK
订阅键空间通知的客户端输出如下信息,表示订阅键空间通知成功
1) "pmessage" 2) "__keyevent@0__:expired" 3) "__keyevent@0__:expired" 4) "name"
实现逻辑
Redis 的键空间通知功能,回调通知传的是具体的 key
,是不包含value
的,所以如果使用监听过期事件来实现延时队列 ,key
和 key/value
都得存一份。
- 存一份
key
,设置过期时间,相当于过期队列。 - 存一份
key/vlue
。 - 客户端订阅键过期通知,在收到
key
的过期通知后,根据key
查询key/value
拿到值处理业务。 - 业务处理成功就删除任务;处理失败则将
key
重新加入过期队列,等待下次过期重新处理。
注意:
- KEY 过期通知事件是在 KEY 被删除时触发的,只能获取 KEY,不能获取 VALUE,因此 KEY 理应包含所需的信息,或通过 KEY 去查询需要的信息。
- Redis 键空间通知是针对的所有键,在定义 KEY 时可以给 KEY 设置前缀以标识所属业务,以区分需要监听的 KEY 和 其它不需要监听的。
- 在 KEY 过期前主动删除该 KEY 是不会触发过期通知事件的。例如,KEY 未到过期时间用户主动取消订单,则要删除该 KEY。
Spring Data Redis支持
spring-data-redis 提供了 KeyExpirationEventMessageListener
监听器,其继承自键空间监听器 KeyspaceEventMessageListener
,默认指定的监听事件是键过期事件
。
监听器,监听器容器,订阅主题相关类位于 org.springframework.data.redis.listener 包下。
监听器:KeyExpirationEventMessageListener
package org.springframework.data.redis.listener;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisKeyExpiredEvent;
import org.springframework.lang.Nullable;
/**
* 键过期的键空间通知
*/
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements
ApplicationEventPublisherAware {
// 默认监听过期事件:expired
private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
// 监听到过期事件后还可以通过 ApplicationEventPublisher 发布
private @Nullable ApplicationEventPublisher publisher;
/**
* 构造方法,创建监听器
* 依赖监听器容器 RedisMessageListenerContainer
*/
public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 注册监听器
*/
@Override
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
}
/*
* 处理键过期事件通知,传入消息message, message实际只有 key
*/
@Override
protected void doHandleMessage(Message message) {
publishEvent(new RedisKeyExpiredEvent(message.getBody()));
}
/**
* 监听接收到过期事件, 通过 ApplicationEventPublisher 发布
*/
protected void publishEvent(RedisKeyExpiredEvent event) {
if (publisher != null) {
this.publisher.publishEvent(event);
}
}
/*
* 设置 ApplicationEventPublisher
*/
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}
监听器容器:RedisMessageListenerContainer
RedisMessageListenerContainer 为 Redis 消息监听器提供异步行为的容器。处理监听、转换和消息调度的低级别细节。
与底层 Redis(每个订阅一个连接)相反,容器只使用一个连接,该连接对所有注册的监听器都是 多路复用 的,消息调度是通过任务执行器完成的。
注意:容器以延迟方式使用连接,即仅当至少配置了一个监听器时才使用连接。
订阅主题 Topic:
org.springframework.data.redis.listener 包下提供了 Topic 接口,其有两个实现:
- ChannelTopic:指定监听(订阅)的具体通道名(Redis Channel)
- PatternTopic:支持模式匹配监听(订阅)多个通道(Redis Channel)
实现示例
基于 Spring Boot + spring-boot-starter-data-redis 框架实现。
Redis 开启键空间通知功能
参考上面的 【Redis命令实现】章节中的内容。
创建监听器容器,并注册为 Bean
@Configuration public class RedisConfig { /** * 对象序列化 * * @param redisConnectionFactory * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); FastJsonRedisSerializer<Object> serializer = new FastJsonRedisSerializer<>(Object.class); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(serializer); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(serializer); //开启事务支持 redisTemplate.setEnableTransactionSupport(true); redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * 注册Redis键空间事件监听容器 * * @param lettuceConnectionFactory * @return */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(LettuceConnectionFactory lettuceConnectionFactory) { RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer(); listenerContainer.setConnectionFactory(lettuceConnectionFactory); return listenerContainer; } }
在创建监听器容器实例时就可以给容器添加监听器,并绑定监听 Topic。
@Configuration public class RedisConfig { /** * 注册Redis键空间事件监听容器 * 添加监听器,并将监听器与监听 Topic 绑定 * @param lettuceConnectionFactory * @return */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(LettuceConnectionFactory lettuceConnectionFactory, RedisKeyExpireListener redisKeyExpireListener) { RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer(); listenerContainer.setConnectionFactory(lettuceConnectionFactory); listenerContainer.addMessageListener(redisKeyExpireListener, new PatternTopic("__keyevent@4__:expired")); return listenerContainer; } }
这样,在监听器里重写父类的
doRegister
方法就可以省略了。自定义监听器,继承
KeyExpirationEventMessageListener
。package com.delay.queue.common; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; import org.springframework.stereotype.Component; /** * @desc Redis 键过期监听器 */ @Component public class RedisKeyExpireListener extends KeyExpirationEventMessageListener { // private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired"); // 自定义监听通道,指定db private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@4__:expired"); /** * 添加键过期监听器,注入监听容器 * * @param listenerContainer */ public RedisKeyExpireListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } /** * 重写父类方法,将监听器与通道绑定并注册到监听容器 * * @param listenerContainer */ @Override public void doRegister(RedisMessageListenerContainer listenerContainer) { listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC); } /** * 处理监听到的消息 * * @param message */ @Override public void doHandleMessage(Message message) { String key = message.toString(); // 注意:Redis 键空间通知是针对库的所有键 // 在设置元素时可以给键设置前缀以标识所属业务 if (key.startsWith("DelayMessage")) { System.out.println("延时任务 Key:" + message.toString()); // 拿到 key 再到 Redis 获取任务并处理 // 处理成功, 删除任务 // 处理失败, 重新将 key 加入到过期队列中待下次处理 } } }
添加元素
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) class DelayQueueApplicationTests { @Autowired private RedisTemplate<String, Object> redisTemplate; @Test void redisAddElement() { redisTemplate.opsForValue().set("DelayMessage:Task", "value", 10, TimeUnit.SECONDS); } }
监听结果输出
延时任务 Key:DelayMessage:Task
集群下监听
Redis 的键空间通知事件在客户端多实例集群部署的情况下会被所有实例监听,若不加处理就会出现重复消费的情况,这是不允许的。
因此在处理消息方法里需要使用分布式锁,防止同一个 KEY 被多个实例监听,重复处理任务。
增加分布式锁处理,注意:下面示例的分布式锁是不完整的,是有缺陷的,具体分布式锁实现可参考 微服务应用(八):分布式锁理解及Redis实现方案。
@Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
// 自定义监听通道,指定db
private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@4__:expired");
/**
* 添加键过期监听器,注入监听容器
*
* @param listenerContainer
*/
public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 重写父类方法,将监听器与通道绑定并注册到监听容器
*
* @param listenerContainer
*/
@Override
public void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
}
/**
* 处理监听到的消息
*
* @param message
*/
@Override
public void doHandleMessage(Message message) {
String key = message.toString();
// 注意:Redis 键空间通知是针对库的所有键
// 在设置元素时可以给键设置前缀以标识所属业务
if (!key.startsWith("DelayOrder")) {
return;
}
String lockKey = "lock_" + key;
Boolean lock = lock(lockKey);
System.out.println(lock);
if (!lock) {
return;
}
System.out.println("延时任务 Key:" + message.toString());
// 拿到 key 再到 Redis 获取任务并处理
// 处理成功, 删除任务
// 处理失败, 重新将 key 加入到过期队列中待下次处理
try {
// 释放锁,这里睡5秒后解锁,防止程序太快,导致1服务释放锁后,2服务才开始获取锁
Thread.sleep(5000);
unlock(lockKey);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 加锁
*
* @param lockKey
* @return
*/
private Boolean lock(String lockKey) {
return redisTemplate.opsForValue().setIfAbsent(lockKey, "lock", 10, TimeUnit.SECONDS);
}
/**
* 释放锁
*
* @param lockKey
*/
private void unlock(String lockKey) {
redisTemplate.delete(lockKey);
}
}
开启两个实现监听:
实例 1 拿到锁,处理延时任务,示例代码输出如下:
true
延时任务 Key:DelayOrder:Task1
实例 2 没拿到锁,直接返回,示例代码输出如下:
false
相关参考
- 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
- 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
- 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
- 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
- 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址。
- Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
- Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
- 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
- Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
- Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
- Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。
注意:本文归作者所有,未经作者允许,不得转载