RabbitMQ 延时消息
使用 rabbitmq-delayed-message-exchange 插件来实现延时消息功能。
安装延时消息插件:rabbitmq-delayed-message-exchange
官方插件现在只支持 3.8.x. 及以上的版本,不支持我现在使用的 3.5.7 版本,所以只能根据网上找到的文章中提供的插件下载地址。
具体步骤如下(摘自这篇博客):
查找 rabbitmq 的安装目录
bashwhereis rabbitmq
1定位到 plugins 目录
bashcd /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/plugins
1下载插件包
bashwget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
1重命名文件
下载的文件名前面多了一段文字,使用 mv 命令重命名文件。
bashmv download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq_delayed_message_exchange-0.0.1.ez
1启用插件
bashrabbitmq-plugins enable rabbitmq_delayed_message_exchange
1使用如下命令禁用插件:
bashrabbitmq-plugins disable rabbitmq_delayed_message_exchange
1
生产者【Spring】
使用 RabbitTemplate
来实现延时消息的发送。通过添加 x-delay Header 来指定延迟的时间。
java
import cn.hutool.json.JSONUtil;
import com.mokasz.zhyx.zeus.customer.service.core.entity.LazyMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("/mq/kf")
@RequiredArgsConstructor
public class KefuController {
private final RabbitTemplate rabbitTemplate;
@PostMapping
public void add(@RequestBody LazyMessage message, @RequestParam(value = "delayedSeconds", defaultValue = "1") int delayedSeconds) {
if (delayedSeconds < 0) delayedSeconds = 0;
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay", delayedSeconds * 1000);
rabbitTemplate.send(
"delayed-kefu-message",
"kefu-message",
new Message(JSONUtil.toJsonStr(message).getBytes(StandardCharsets.UTF_8), messageProperties)
);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
消费者【Spring】
Listener
使用 CustomExchange
来创建延时消息 Exchange 。
注意
需要添加 x-delayed-type 参数,否则会报错。
另外一点需要注意的是,创建 Exchange 的 binding 时,需要指定 x-delay: *
通配参数。
java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
/**
* 延迟消息 监听器
*/
@Slf4j
@Component("listener-" + LazyMessageListener.QUEUE_NAME)
@RequiredArgsConstructor
public class LazyMessageListener {
final static String QUEUE_NAME = "kefu-message";
@Bean(name = "queue-" + QUEUE_NAME)
Queue queue() {
return new Queue(QUEUE_NAME, true);
}
@Bean(name = "exchange-" + QUEUE_NAME)
CustomExchange exchange() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed-kefu-message", "x-delayed-message", true, false, args);
}
@Bean(name = "binding-" + QUEUE_NAME)
Binding binding(@Qualifier("queue-" + QUEUE_NAME) Queue queue, @Qualifier("exchange-" + QUEUE_NAME) CustomExchange exchange) {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delay", "*");
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME).and(args);
}
@Bean(name = "container-" + QUEUE_NAME)
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
@Qualifier("listener-adapter-" + QUEUE_NAME) MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(QUEUE_NAME);
container.setConcurrentConsumers(5);
container.setPrefetchCount(200);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean(name = "listener-adapter-" + QUEUE_NAME)
MessageListenerAdapter listenerAdapter(LazyMessageReceiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
Receiver
这只是个消费的示例。
java
import cn.hutool.json.JSONUtil;
import com.mokasz.zhyx.zeus.customer.service.core.entity.LazyMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.api.WxConsts;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.bean.kefu.WxMpKefuMessage;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
/**
* 延迟消息 接收器
*/
@Slf4j
@Component("receiver-" + LazyMessageListener.QUEUE_NAME)
@RequiredArgsConstructor
public class LazyMessageReceiver {
private final WxMpService wxService;
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(byte[] message) {
receiveMessage(new String(message));
}
public void receiveMessage(String json) {
try {
log.info("接收消息({})", json);
LazyMessage message = JSONUtil.toBean(json, LazyMessage.class);
if (message == null) return;
WxMpKefuMessage kefuMessage = new WxMpKefuMessage();
kefuMessage.setToUser(message.getOpenId());
kefuMessage.setMsgType(WxConsts.KefuMsgType.TEXT);
kefuMessage.setTitle(message.getTitle());
kefuMessage.setContent(message.getContent());
String response = wxService
.switchoverTo(message.getAppid())
.getKefuService()
.sendKefuMessageWithResponse(kefuMessage);
log.info("response : {}", response);
} catch (Exception e) {
log.error(String.format("系统发生异常(%s)(%s)", e.getMessage(), json), e);
} finally {
latch.countDown();
}
}
public CountDownLatch getLatch() {
return latch;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
附 1. 'x-delayed-type' must be an existing exchange type
创建 exchange 时报了如下错误:
'x-delayed-type' must be an existing exchange type
这个是由于缺少了 x-delayed-type 导致的,需要在 Arguments 中添加 x-delayed-type = direct 。
通过 Java 创建 exchange :
java
@Bean(name = "exchange-" + QUEUE_NAME)
CustomExchange exchange() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed-kefu-message", "x-delayed-message", true, false, args);
}
1
2
3
4
5
6
2
3
4
5
6
通过 Java 创建 binding :
java
@Bean(name = "binding-" + QUEUE_NAME)
Binding binding(@Qualifier("queue-" + QUEUE_NAME) Queue queue, @Qualifier("exchange-" + QUEUE_NAME) CustomExchange exchange) {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delay", "*");
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME).and(args);
}
1
2
3
4
5
6
2
3
4
5
6
注意: bingding 需要添加 x-delay 参数,否则消息不会转发到队列。