SpringBoot 批量消费 RabbitMQ 消息
尝试使用 BatchMessagingMessageListenerAdapter
来实现批量消费时报了如下错误:
org.springframework.amqp.UncategorizedAmqpException: java.lang.NullPointerException
调试发现是执行到 MessagingMessageListenerAdapter.invokeHandler
时,this.handlerAdapter 值为空导致的。
参考 这篇文章 中的写法,对 handlerAdapter 赋值后可以正常地实现批量消费(没有出现文章中记录的问题,估计已经修复了)。
示例代码如下:
CustomBatchMessagingMessageListenerAdapter.java
java
import org.springframework.amqp.rabbit.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter;
import org.springframework.beans.BeanUtils;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import java.lang.reflect.Method;
/**
 * Batch listener adapter that builds and installs its own {@link HandlerAdapter}.
 *
 * <p>The superclass constructor is invoked with a {@code null} method; this class then creates a
 * {@link HandlerAdapter} from the supplied bean and method and registers it through
 * {@code setHandlerAdapter}, so that the adapter is populated before any message is dispatched.
 */
public class CustomBatchMessagingMessageListenerAdapter extends BatchMessagingMessageListenerAdapter {

    /** Name of the listener method looked up when the caller does not pass one explicitly. */
    private static final String DEFAULT_METHOD_NAME = "receiveMessage";

    /**
     * Creates an adapter that dispatches batches to {@code method} on {@code bean}.
     *
     * @param bean   the object that owns the listener method
     * @param method the listener method to invoke; must not be {@code null}
     * @throws IllegalArgumentException if {@code method} is {@code null}
     */
    public CustomBatchMessagingMessageListenerAdapter(Object bean, Method method) {
        super(bean, null, false, null, null);
        if (method == null) {
            // Fail here with a clear message instead of deep inside handler creation.
            throw new IllegalArgumentException("Listener method must not be null");
        }
        setHandlerAdapter(createHandleAdapter(bean, method));
    }

    /**
     * Creates an adapter that dispatches batches to the bean's {@code receiveMessage} method.
     *
     * @param bean the object that owns a method named {@code receiveMessage}
     * @throws IllegalArgumentException if the bean's class declares no such method
     */
    public CustomBatchMessagingMessageListenerAdapter(Object bean) {
        this(bean, findDefaultMethod(bean));
    }

    /**
     * Looks up the default listener method on the bean's class.
     *
     * <p>The lookup may yield {@code null} when nothing matches, so the result is checked and
     * turned into an exception that names both the missing method and the bean type.
     */
    private static Method findDefaultMethod(Object bean) {
        Method method = BeanUtils.findDeclaredMethodWithMinimalParameters(bean.getClass(), DEFAULT_METHOD_NAME);
        if (method == null) {
            throw new IllegalArgumentException(
                    "No method named '" + DEFAULT_METHOD_NAME + "' found on " + bean.getClass().getName());
        }
        return method;
    }

    /**
     * Builds the {@link HandlerAdapter} wrapping {@code method} on {@code bean}.
     *
     * <p>The invocable method is given a resolver composite containing the headers, message and
     * payload argument resolvers; the payload resolver uses a {@link GenericMessageConverter}.
     */
    private HandlerAdapter createHandleAdapter(Object bean, Method method) {
        MessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        InvocableHandlerMethod invokerHandlerMethod =
                messageHandlerMethodFactory.createInvocableHandlerMethod(bean, method);

        HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite();
        resolvers.addResolver(new org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver());
        resolvers.addResolver(new org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver());
        resolvers.addResolver(new org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver(new GenericMessageConverter()));
        invokerHandlerMethod.setMessageMethodArgumentResolvers(resolvers);

        return new HandlerAdapter(invokerHandlerMethod);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
BatchTestListenter.java
重点在创建 SimpleMessageListenerContainer Bean 时,需要启用 ConsumerBatchEnabled ,并且设置 BatchSize 。
当 PrefetchCount 设置得比 BatchSize 小时,实际生效的 PrefetchCount 以 BatchSize 为准。
java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Declares the queue, exchange, binding, listener container and listener adapter used to
 * consume {@code batch_test_queue} in batches.
 *
 * <p>Every bean declared here is named with a fixed prefix followed by {@link #QUEUE_NAME}.
 */
@Slf4j
@Component("listener-" + BatchTestListenter.QUEUE_NAME)
@RequiredArgsConstructor
public class BatchTestListenter {

    /** Queue name; also used as the binding routing key and as the suffix of the bean names. */
    static final String QUEUE_NAME = "batch_test_queue";

    /** Declares the queue, passing {@code true} as the second constructor argument. */
    @Bean(name = "queue-" + QUEUE_NAME)
    Queue queue() {
        Queue batchQueue = new Queue(QUEUE_NAME, true);
        return batchQueue;
    }

    /** Declares a topic exchange whose name is the empty string. */
    @Bean(name = "exchange-" + QUEUE_NAME)
    TopicExchange exchange() {
        TopicExchange topicExchange = new TopicExchange("");
        return topicExchange;
    }

    /** Binds the queue to the exchange using the queue name as routing key. */
    @Bean(name = "binding-" + QUEUE_NAME)
    Binding binding(
            @Qualifier("queue-" + QUEUE_NAME) Queue queue,
            @Qualifier("exchange-" + QUEUE_NAME) TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(QUEUE_NAME);
    }

    /**
     * Builds the listener container: one concurrent consumer, prefetch count 100, consumer-side
     * batching enabled with a batch size of 50, delivering to the given listener adapter.
     */
    @Bean(name = "container-" + QUEUE_NAME)
    SimpleMessageListenerContainer container(
            ConnectionFactory connectionFactory,
            @Qualifier("listener-adapter-" + QUEUE_NAME) BatchMessagingMessageListenerAdapter listenerAdapter) {
        final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();

        // Connection and source queue.
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setQueueNames(QUEUE_NAME);

        // Consumer sizing.
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setPrefetchCount(100);

        // Batch consumption settings.
        listenerContainer.setConsumerBatchEnabled(true);
        listenerContainer.setBatchSize(50);

        listenerContainer.setMessageListener(listenerAdapter);
        return listenerContainer;
    }

    /** Wraps the receiver in the custom batch adapter, relying on its default method lookup. */
    @Bean(name = "listener-adapter-" + QUEUE_NAME)
    CustomBatchMessagingMessageListenerAdapter listenerAdapter(BatchTestReceiver receiver) throws NoSuchMethodException {
        CustomBatchMessagingMessageListenerAdapter batchAdapter =
                new CustomBatchMessagingMessageListenerAdapter(receiver);
        return batchAdapter;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
BatchTestReceiver.java
需要注意的点:
消费的参数类型为
List<String>
如果处理过程中发生了未捕获的异常,会导致当前批次中的所有消息被重新放回队列。这可能会导致死循环,建议最好添加 try/catch 处理。
java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Component("receiver-" + BatchTestListenter.QUEUE_NAME)
@RequiredArgsConstructor
public class BatchTestReceiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(List<String> messages) {
try {
for (int i = 0; i < messages.size(); i++) {
String message = messages.get(i);
try {
log.info(message);
// 一旦一个消息发生异常,本次批处理的所有消息都会重新添加回队列
// 为避免出现死循环或者部分消息未被处理的情况,最好在每个循环中添加 try/catch 处理
if (i != 0 && i % 20 == 0) throw new Exception("发生了异常");
} catch (Exception e) {
log.error("系统发生异常({})({})", e.getMessage(), message);
}
}
} finally {
latch.countDown();
}
}
public CountDownLatch getLatch() {
return latch;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
附 1. 非批处理的消费示例代码
附上非批处理的示例代码,以作对比。
SingleTestListenter.java
java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Declares the queue, exchange, binding, listener container and listener adapter used to
 * consume {@code single_test_queue} one message at a time.
 *
 * <p>Every bean declared here is named with a fixed prefix followed by {@link #QUEUE_NAME}.
 */
@Slf4j
@Component("listener-" + SingleTestListenter.QUEUE_NAME)
@RequiredArgsConstructor
public class SingleTestListenter {

    /** Queue name; also used as the binding routing key and as the suffix of the bean names. */
    static final String QUEUE_NAME = "single_test_queue";

    /** Declares the queue, passing {@code true} as the second constructor argument. */
    @Bean(name = "queue-" + QUEUE_NAME)
    Queue queue() {
        Queue singleQueue = new Queue(QUEUE_NAME, true);
        return singleQueue;
    }

    /** Declares a topic exchange whose name is the empty string. */
    @Bean(name = "exchange-" + QUEUE_NAME)
    TopicExchange exchange() {
        TopicExchange topicExchange = new TopicExchange("");
        return topicExchange;
    }

    /** Binds the queue to the exchange using the queue name as routing key. */
    @Bean(name = "binding-" + QUEUE_NAME)
    Binding binding(
            @Qualifier("queue-" + QUEUE_NAME) Queue queue,
            @Qualifier("exchange-" + QUEUE_NAME) TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(QUEUE_NAME);
    }

    /**
     * Builds the listener container: one concurrent consumer and prefetch count 100, delivering
     * to the given listener adapter.
     */
    @Bean(name = "container-" + QUEUE_NAME)
    SimpleMessageListenerContainer container(
            ConnectionFactory connectionFactory,
            @Qualifier("listener-adapter-" + QUEUE_NAME) MessageListenerAdapter listenerAdapter) {
        final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();

        // Connection and source queue.
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setQueueNames(QUEUE_NAME);

        // Consumer sizing.
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setPrefetchCount(100);

        listenerContainer.setMessageListener(listenerAdapter);
        return listenerContainer;
    }

    /** Wraps the receiver in an adapter that delegates to its {@code receiveMessage} method. */
    @Bean(name = "listener-adapter-" + QUEUE_NAME)
    MessageListenerAdapter listenerAdapter(SingleTestReceiver receiver) {
        MessageListenerAdapter singleAdapter = new MessageListenerAdapter(receiver, "receiveMessage");
        return singleAdapter;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
SingleTestReceiver.java
java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Component("receiver-" + SingleTestListenter.QUEUE_NAME)
@RequiredArgsConstructor
public class SingleTestReceiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(byte[] message) {
String json = new String(message);
try {
if (StringUtils.isEmpty(json)) return;
log.info(json);
} catch (Exception e) {
log.error("系统发生异常({})({})", e.getMessage(), json);
} finally {
latch.countDown();
}
}
public CountDownLatch getLatch() {
return latch;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31