Skip to content
标签
欢迎扫码关注公众号

Redis StreamListener 订阅失败

项目中使用了 Redis 的 StreamListener 来实现类似 RabiitMQ 的 Fanout Exchange 的效果,但是随着订阅的增多,发现最后一个创建的订阅没有起作用。

代码运行没有任何报错,在 Redis 的 stream 对象的 groups 上也可以看到对应的 consumer 信息,但是即使有消息进入 stream,也不会触发消费。

既然是后面订阅没起作用,那就是跟订阅的数量有关(第 9 个订阅会失效),首先就想到是不是 Redis 线程池或者连接池的问题。一般线程池的数量会设置为 CPU 核数的两倍,原本设置的是 16,但是改为更大的值之后也没有解决问题,其它几个 Redis 连接池相关的配置也都改了一遍,结果还是无效。

最后想到在配置 StreamMessageListenerContainer 时也使用了一个线程池,当时直接使用了一个项目已经在用的 ThreadPoolTaskExecutor,其配置大概如下:

java
private final int core = 8;

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(core);
executor.setMaxPoolSize(core * 10);
executor.setQueueCapacity(2048);
executor.setKeepAliveSeconds(300);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;

本以为这里虽然核心线程数设置的比较小,但是最大线程数是核心的 10 倍,应该不会有问题,但后来调整了这些参数后,发现确实是这里的配置导致的。反思后发现这个问题出现的根本原因在于自己对 ThreadPoolTaskExecutor 线程池的理解不足。

线程池中处理新任务的顺序如下:

  1. 如果当前运行的线程数少于 corePoolSize,则会创建新线程执行任务。
  2. 如果当前线程数已达到 corePoolSize,任务将会被放入工作队列中等待。
    上面线程池中使用的工作队列是一个 FIFO 的阻塞队列 LinkedBlockingQueue,容量为 2048。
  3. 如果工作队列已满且线程数少于 maximumPoolSize,则会创建新线程。
  4. 如果工作队列已满且线程数已达到 maximumPoolSize,则会根据配置的拒绝拒绝来处理。
    上面线程池中使用的是 ThreadPoolExecutor.CallerRunsPolicy 策略,该策略会直接在当前线程中执行任务,而不是创建新线程。

到这里需要先看一下 StreamMessageListenerContainerstart()stop() 方法:

java
public void start() {
    synchronized (lifecycleMonitor) {
        if (this.running) { return; }
        subscriptions.stream() //
                .filter(it -> !it.isActive()) //
                .filter(it -> it instanceof TaskSubscription) //
                .map(TaskSubscription.class::cast) //
                .map(TaskSubscription::getTask) //
                .forEach(taskExecutor::execute);
        running = true;
    }
}
public void stop() {
    synchronized (lifecycleMonitor) {
        if (this.running) {
            subscriptions.forEach(Cancelable::cancel);
            running = false;
        }
    }
}

可以发现,在 start() 方法中,会将所有订阅的 subscriptions 提交到线程池执行,而由于订阅任务会持续运行,并不会释放对线程的占用,一直持续到应用 shutdown 时才会执行 stop() 方法。

综上,代码在执行到第 9 个订阅时,由于当前线程数已达到 corePoolSize(8),任务将被放入工作队列中等待。之后除非有新的任务提交到这个线程池,且达到 queueCapacity 容量上限(2048),才会创建新的线程,否则会一直阻塞在工作队列中。很不幸,这次遇到的就是这种场景,以至于后面的订阅任务永远不会被执行。

这里的解决方法是使用 Executors.newCachedThreadPool() 作为 listener 容器的执行器。这是一个核心线程数为 0,最大线程数为 Integer.MAX_VALUE,且工作队列是 SynchronousQueue 的线程池。这样可以保证每个监听器都会有单独的线程执行,不会互相阻塞。

StreamMessageListenerContainerOptionsBuilder 中指定了一个默认的 Exeuctor : new SimpleAsyncTaskExecutor(),这个执行器虽然没有线程池管理的功能,但是也可以满足大部分场景的要求。如果没有在代码中动态的添加或移除订阅,那么这两种方式基本上没啥区别。

实际的配置示例:

java
/**
 * 创建 StreamMessageListenerContainerOptions 对象
 */
private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options() {
    return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
        .builder()
        // 一次性最多拉取多少条消息
        .batchSize(1)
        // 执行消息轮询的执行器
        .executor(Executors.newCachedThreadPool())
        // pollTimeout 参数定义了消费者从消息代理(如 Kafka、RabbitMQ 等)拉取消息时,如果队列中没有消息可供消费,消费者会等待多长时间再尝试拉取。这个时间通常以毫秒为单位。
        // 定义为 Duration.ZERO 时可能会导致 CPU 使用率非常高
        .pollTimeout(Duration.ofMillis(50))
        // 序列化器
        .serializer(new StringRedisSerializer())
        .build();
}

Page Layout Max Width

Adjust the exact value of the page width of VitePress layout to adapt to different reading needs and screens.

Adjust the maximum width of the page layout
A ranged slider for user to choose and customize their desired width of the maximum width of the page layout can go.

Content Layout Max Width

Adjust the exact value of the document content width of VitePress layout to adapt to different reading needs and screens.

Adjust the maximum width of the content layout
A ranged slider for user to choose and customize their desired width of the maximum width of the content layout can go.