Skip to content

SpringBoot 批量消费 RabbitMQ 消息

🏷️ Spring Boot RabbitMQ

尝试使用 BatchMessagingMessageListenerAdapter 来实现批量消费时报了如下错误:

org.springframework.amqp.UncategorizedAmqpException: java.lang.NullPointerException

调试发现是执行到 MessagingMessageListenerAdapter.invokeHandler 时,this.handlerAdapter 值为空导致的。

参考 这篇文章 中的写法,对 handlerAdapter 赋值后可以正常的实现批量消费(没有出现文章中记录的问题,估计已经修复了)。

示例代码如下:

CustomBatchMessagingMessageListenerAdapter.java

java
import org.springframework.amqp.rabbit.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter;
import org.springframework.beans.BeanUtils;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

import java.lang.reflect.Method;

public class CustomBatchMessagingMessageListenerAdapter extends BatchMessagingMessageListenerAdapter {
    private final static String defaultMethodName = "receiveMessage";

    public CustomBatchMessagingMessageListenerAdapter(Object bean, Method method) {
        super(bean, null, false, null, null);
        HandlerAdapter adapter = createHandleAdapter(bean, method);
        setHandlerAdapter(adapter);
    }

    public CustomBatchMessagingMessageListenerAdapter(Object bean) {
        this(bean, BeanUtils.findDeclaredMethodWithMinimalParameters(bean.getClass(), defaultMethodName));
    }

    private HandlerAdapter createHandleAdapter(Object bean, Method method) {
        MessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        InvocableHandlerMethod invokerHandlerMethod = messageHandlerMethodFactory.createInvocableHandlerMethod(bean, method);
        HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite();
        resolvers.addResolver(new org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver());
        resolvers.addResolver(new org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver());
        resolvers.addResolver(new org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver(new GenericMessageConverter()));
        invokerHandlerMethod.setMessageMethodArgumentResolvers(resolvers);
        HandlerAdapter handler = new HandlerAdapter(invokerHandlerMethod);
        return handler;
    }
}

BatchTestListenter.java

重点在创建 SimpleMessageListenerContainer Bean 时,需要启用 ConsumerBatchEnabled ,并且设置 BatchSize
PrefetchCount 设置的比 BatchSize 小时,实际的 PrefetchCountBatchSize 为准。

java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Slf4j
@Component("listener-" + BatchTestListenter.QUEUE_NAME)
@RequiredArgsConstructor
public class BatchTestListenter {
    final static String QUEUE_NAME = "batch_test_queue";

    @Bean(name = "queue-" + QUEUE_NAME)
    Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

    @Bean(name = "exchange-" + QUEUE_NAME)
    TopicExchange exchange() {
        return new TopicExchange("");
    }

    @Bean(name = "binding-" + QUEUE_NAME)
    Binding binding(@Qualifier("queue-" + QUEUE_NAME) Queue queue, @Qualifier("exchange-" + QUEUE_NAME) TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    }

    @Bean(name = "container-" + QUEUE_NAME)
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             @Qualifier("listener-adapter-" + QUEUE_NAME) BatchMessagingMessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        container.setConcurrentConsumers(1);
        container.setPrefetchCount(100);
        container.setConsumerBatchEnabled(true);
        container.setBatchSize(50);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean(name = "listener-adapter-" + QUEUE_NAME)
    CustomBatchMessagingMessageListenerAdapter listenerAdapter(BatchTestReceiver receiver) throws NoSuchMethodException {
        return new CustomBatchMessagingMessageListenerAdapter(receiver);
    }
}

BatchTestReceiver.java

需要注意的点:

  1. 消费的参数类型为 List<String>

  2. 如果处理中发生了未捕捉的异常,会导致当前批处理中的所有消息重新添加回队列。这可能会导致死循环,建议最好添加 try/catch 处理。

java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CountDownLatch;

@Slf4j
@Component("receiver-" + BatchTestListenter.QUEUE_NAME)
@RequiredArgsConstructor
public class BatchTestReceiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(List<String> messages) {
        try {
            for (int i = 0; i < messages.size(); i++) {
                String message = messages.get(i);
                try {
                    log.info(message);
                    // 一旦一个消息发生异常,本次批处理的所有消息都会重新添加回队列
                    // 为避免出现死循环或者部分消息未被处理的情况,最好在每个循环中添加 try/catch 处理
                    if (i != 0 && i % 20 == 0) throw new Exception("发生了异常");
                } catch (Exception e) {
                    log.error("系统发生异常({})({})", e.getMessage(), message);
                }
            }
        } finally {
            latch.countDown();
        }
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

附 1. 非批处理的消费示例代码

附上非批处理的示例代码,以作对比。

SingleTestListenter.java

java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Slf4j
@Component("listener-" + SingleTestListenter.QUEUE_NAME)
@RequiredArgsConstructor
public class SingleTestListenter {
    final static String QUEUE_NAME = "single_test_queue";

    @Bean(name = "queue-" + QUEUE_NAME)
    Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

    @Bean(name = "exchange-" + QUEUE_NAME)
    TopicExchange exchange() {
        return new TopicExchange("");
    }

    @Bean(name = "binding-" + QUEUE_NAME)
    Binding binding(@Qualifier("queue-" + QUEUE_NAME) Queue queue, @Qualifier("exchange-" + QUEUE_NAME) TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    }

    @Bean(name = "container-" + QUEUE_NAME)
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             @Qualifier("listener-adapter-" + QUEUE_NAME) MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        container.setConcurrentConsumers(1);
        container.setPrefetchCount(100);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean(name = "listener-adapter-" + QUEUE_NAME)
    MessageListenerAdapter listenerAdapter(SingleTestReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

SingleTestReceiver.java

java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

@Slf4j
@Component("receiver-" + SingleTestListenter.QUEUE_NAME)
@RequiredArgsConstructor
public class SingleTestReceiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(byte[] message) {
        String json = new String(message);
        try {
            if (StringUtils.isEmpty(json)) return;

            log.info(json);
        } catch (Exception e) {
            log.error("系统发生异常({})({})", e.getMessage(), json);
        } finally {
            latch.countDown();
        }
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

参考

  1. Throws NullPointerException if I use BatchMessagingMessageListenerAdapter