使用 RedissonDelayedQueue 实现延时队列功能
RedissonDelayedQueue
是一个基于 Redis 实现的延时队列功能。
简单来说就是 RedissonDelayedQueue
会使一个 ZSet
来存储消息的延时时间,元素的 score
是对应的延时执行时间的时间戳,Redisson 使用轮询的方式执行 ZRANGEBYSCORE
来获取已经到了延迟时间的消息,之后将获取到的消息转发到一个目标队列(List
类型),最后消费服务通过轮询的方式来获取目标队列中的消息并进行消费。
具体的实现细节可以查看网上的文章或者 RedissonDelayedQueue
类的 源码。
以上流程组成了一个轻量的延迟消费队列功能,不过与专门的队列中间件相比,缺点还是比较明显的。
- 虽然看代码像是使用的发布订阅模式,但实际上还是轮询加拉取的方式,性能上要差一些;
- 消费时没有根据消费实例进行分发和 ACK 的功能,无法保证数据一定被正确消费了;
相对的,使用起来也算比较简单,成本也比较低(不需要添加额外的 MQ 中间件,只需要引入 Redisson 依赖即可)。不过为了弥补可能出现的未消费问题,最好还是要添加一些额外的兜底措施。
以下是本次实现的代码示例:
1. 定义延时消费队列
定义延时队列的主要代码是 redissonClient.getDelayedQueue(redissonClient.getBlockingDeque(KEY))
,参数中的 BlockingDeque
就是目标队列。
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@RequiredArgsConstructor
public class AutoCancelOrderQueue {
/**
* 消息 Key
*/
private static final String KEY = "auto-cancel-order";
private final RedissonClient redissonClient;
@PostConstruct
public void init() {
delayedQueue();
}
private RDelayedQueue<IdMessage> delayedQueue() {
return redissonClient.getDelayedQueue(destinationQueue());
}
private RBlockingDeque<IdMessage> destinationQueue() {
return redissonClient.getBlockingDeque(KEY);
}
public void add(IdMessage message) {
add(message, 30, TimeUnit.MINUTES);
}
public void add(IdMessage message, int delay, TimeUnit timeUnit) {
delayedQueue().offer(message, delay, timeUnit);
}
public IdMessage poll() {
return destinationQueue().poll();
}
}
需要注意的是 init()
方法的 @PostConstruct
注解。这个是为了防止项目启动后,如果一直没有新增过这个延时消息(即,没有调用过 redissonClient.getDelayedQueue()
方法),会导致当前延时队列的 RedissonDelayedQueue
轮询功能没有被启动,所以需要在服务启动后初始化一下这个延时队列。
这里消费时使用的是目标队列的 poll()
方法,这种方式是非阻塞的(空队列时不会阻塞,而是直接返回 null
),也可以选择使用阻塞的 take()
方法。
Redis 的 List
数据类型还支持 peek()
操作,在单个消费者时,可以使用该方式获取消息,在消费结束后再移除消息。这种方式消息未消费的几率会低一些,但毕竟是基于缓存的方案,仍然不能保证 100%。如果可能有多个消费者,则不太推荐使用这种方式。
2. 消费消息
这里使用 @Scheduled
配合 while (true)
的循环方式来消费消息,队列为空时退出循环并 5 秒后重试。
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
@RequiredArgsConstructor
public class AutoCancelOrderConsumer {
public static final int MAX_RETRY_TIMES = 5;
private final AutoCancelOrderQueue autoCancelOrderQueue;
private final OrderBizService orderBizService;
@Scheduled(fixedRate = 5, timeUnit = TimeUnit.SECONDS)
public void consumeMessages() {
while (true) {
var message = autoCancelOrderQueue.poll();
if (message == null) return;
processMessage(message);
}
}
private void processMessage(IdMessage message) {
log.info("AutoCancelOrderConsumer message: {}", message);
try {
orderBizService.cancelOrder(message.getId());
} catch (Exception e) {
if (message.getTimes() <= MAX_RETRY_TIMES) {
message.setTimes(message.getTimes() + 1);
autoCancelOrderQueue.add(message, message.getTimes(), TimeUnit.MINUTES);
}
}
}
}
这里在 catch
中加了发生异常时的重试处理(使用新的延时时间重新加入到延迟队列),并且为了防止死循环的发生,设置了最大重试次数(使用 times
字段标记当前重试次数)。
注意
- 使用
@Scheduled
注解时需要在服务中通过@EnableScheduling
注解启用定时任务功能。 - 这里的重试消息是不保险的,并不能保证消息一定能被重新发送回延时队列。所以需要额外添加一些兜底措施,比如通过额外的定时任务直接检查数据库中是否有未取消的过期订单。