Redis StreamListener 订阅失败
项目中使用了 Redis 的 StreamListener
来实现类似 RabiitMQ 的 Fanout Exchange 的效果,但是随着订阅的增多,发现最后一个创建的订阅没有起作用。
代码运行没有任何报错,在 Redis 的 stream 对象的 groups 上也可以看到对应的 consumer 信息,但是即使有消息进入 stream,也不会触发消费。
既然是后面订阅没起作用,那就是跟订阅的数量有关(第 9 个订阅会失效),首先就想到是不是 Redis 线程池或者连接池的问题。一般线程池的数量会设置为 CPU 核数的两倍,原本设置的是 16,但是改为更大的值之后也没有解决问题,其它几个 Redis 连接池相关的配置也都改了一遍,结果还是无效。
最后想到在配置 StreamMessageListenerContainer
时也使用了一个线程池,当时直接使用了一个项目已经在用的 ThreadPoolTaskExecutor
,其配置大概如下:
private final int core = 8;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(core);
executor.setMaxPoolSize(core * 10);
executor.setQueueCapacity(2048);
executor.setKeepAliveSeconds(300);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
本以为这里虽然核心线程数设置的比较小,但是最大线程数是核心的 10 倍,应该不会有问题,但后来调整了这些参数后,发现确实是这里的配置导致的。反思后发现这个问题出现的根本原因在于自己对 ThreadPoolTaskExecutor
线程池的理解不足。
线程池中处理新任务的顺序如下:
- 如果当前运行的线程数少于
corePoolSize
,则会创建新线程执行任务。 - 如果当前线程数已达到
corePoolSize
,任务将会被放入工作队列中等待。
上面线程池中使用的工作队列是一个 FIFO 的阻塞队列LinkedBlockingQueue
,容量为 2048。 - 如果工作队列已满且线程数少于
maximumPoolSize
,则会创建新线程。 - 如果工作队列已满且线程数已达到
maximumPoolSize
,则会根据配置的拒绝拒绝来处理。
上面线程池中使用的是ThreadPoolExecutor.CallerRunsPolicy
策略,该策略会直接在当前线程中执行任务,而不是创建新线程。
到这里需要先看一下 StreamMessageListenerContainer
的 start()
和 stop()
方法:
public void start() {
synchronized (lifecycleMonitor) {
if (this.running) { return; }
subscriptions.stream() //
.filter(it -> !it.isActive()) //
.filter(it -> it instanceof TaskSubscription) //
.map(TaskSubscription.class::cast) //
.map(TaskSubscription::getTask) //
.forEach(taskExecutor::execute);
running = true;
}
}
public void stop() {
synchronized (lifecycleMonitor) {
if (this.running) {
subscriptions.forEach(Cancelable::cancel);
running = false;
}
}
}
可以发现,在 start()
方法中,会将所有订阅的 subscriptions
提交到线程池执行,而由于订阅任务会持续运行,并不会释放对线程的占用,一直持续到应用 shutdown 时才会执行 stop()
方法。
综上,代码在执行到第 9 个订阅时,由于当前线程数已达到 corePoolSize
(8),任务将被放入工作队列中等待。之后除非有新的任务提交到这个线程池,且达到 queueCapacity
容量上限(2048),才会创建新的线程,否则会一直阻塞在工作队列中。很不幸,这次遇到的就是这种场景,以至于后面的订阅任务永远不会被执行。
这里的解决方法是使用 Executors.newCachedThreadPool()
作为 listener 容器的执行器。这是一个核心线程数为 0,最大线程数为 Integer.MAX_VALUE
,且工作队列是 SynchronousQueue
的线程池。这样可以保证每个监听器都会有单独的线程执行,不会互相阻塞。
StreamMessageListenerContainerOptionsBuilder
中指定了一个默认的 Exeuctor
: new SimpleAsyncTaskExecutor()
,这个执行器虽然没有线程池管理的功能,但是也可以满足大部分场景的要求。如果没有在代码中动态的添加或移除订阅,那么这两种方式基本上没啥区别。
实际的配置示例:
/**
* 创建 StreamMessageListenerContainerOptions 对象
*/
private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options() {
return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次性最多拉取多少条消息
.batchSize(1)
// 执行消息轮询的执行器
.executor(Executors.newCachedThreadPool())
// pollTimeout 参数定义了消费者从消息代理(如 Kafka、RabbitMQ 等)拉取消息时,如果队列中没有消息可供消费,消费者会等待多长时间再尝试拉取。这个时间通常以毫秒为单位。
// 定义为 Duration.ZERO 时可能会导致 CPU 使用率非常高
.pollTimeout(Duration.ofMillis(50))
// 序列化器
.serializer(new StringRedisSerializer())
.build();
}