RabbitMQ Delayed Messages

🏷️ Spring Boot RabbitMQ

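This post uses the rabbitmq-delayed-message-exchange plugin to implement delayed messages.

Installing the delayed message plugin: rabbitmq-delayed-message-exchange

The official plugin currently supports only 3.8.x and later, not the 3.5.7 release I am running, so I had to rely on the plugin download URL given in an article I found online.

The steps are as follows (taken from this blog post):

  1. Find the rabbitmq installation directory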

    bash
    whereis rabbitmq
  2. Change to the plugins directory

    bash
    cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/plugins
  3. Download the plugin package

    bash
    wget 'https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez'
  4. Rename the file

    The downloaded file name has extra text in front of it; rename it with the mv command.

    bash
    mv 'download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez' rabbitmq_delayed_message_exchange-0.0.1.ez
  5. Enable the plugin

    bash
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    To disable the plugin, run:

    bash
    rabbitmq-plugins disable rabbitmq_delayed_message_exchange

Producer [Spring]

Use RabbitTemplate to send the delayed message. The delay is specified by adding an x-delay header, whose value is in milliseconds.

java
import cn.hutool.json.JSONUtil;
import com.mokasz.zhyx.zeus.customer.service.core.entity.LazyMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*;

import java.nio.charset.StandardCharsets;

@RestController
@RequestMapping("/mq/kf")
@RequiredArgsConstructor
public class KefuController {

    private final RabbitTemplate rabbitTemplate;

    @PostMapping
    public void add(@RequestBody LazyMessage message, @RequestParam(value = "delayedSeconds", defaultValue = "1") int delayedSeconds) {
        if (delayedSeconds < 0) delayedSeconds = 0;

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", delayedSeconds * 1000);

        rabbitTemplate.send(
                "delayed-kefu-message",
                "kefu-message",
                new Message(JSONUtil.toJsonStr(message).getBytes(StandardCharsets.UTF_8), messageProperties)
        );
    }

}
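
The controller above computes delayedSeconds * 1000 in int arithmetic, which overflows once the delay exceeds roughly 24.8 days. The following is a minimal sketch of a safer conversion. DelayHeaders and toDelayMillis are hypothetical names introduced for illustration, and the upper bound of (2^32)-1 milliseconds is an assumption taken from the plugin README; check the limit for the plugin version you actually run.

```java
/**
 * Hypothetical helper (not part of Spring AMQP or the plugin): converts a
 * delay in seconds to the millisecond value carried in the x-delay header.
 */
final class DelayHeaders {

    /** Assumed upper bound for x-delay, per the plugin README: (2^32)-1 ms. */
    static final long MAX_DELAY_MILLIS = (1L << 32) - 1;

    private DelayHeaders() {
    }

    /**
     * Clamps non-positive input to 0, multiplies in long arithmetic so that
     * large values cannot overflow int, and caps the result at
     * MAX_DELAY_MILLIS.
     */
    static long toDelayMillis(long delaySeconds) {
        if (delaySeconds <= 0) {
            return 0L;
        }
        // Compare before multiplying so the product cannot overflow long.
        if (delaySeconds > MAX_DELAY_MILLIS / 1000) {
            return MAX_DELAY_MILLIS;
        }
        return delaySeconds * 1000L;
    }
}
```

In the controller, the header line would then read messageProperties.setHeader("x-delay", DelayHeaders.toDelayMillis(delayedSeconds)); and the separate negative-value check becomes unnecessary.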

Consumer [Spring]

Listener

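Use CustomExchange to create the delayed message exchange.

Note

The x-delayed-type argument is required; without it, declaring the exchange fails with an error.

Also note that when creating the exchange binding, you need to specify the wildcard argument x-delay: *.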

java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;

/**
 * Delayed message listener
 */
@Slf4j
@Component("listener-" + LazyMessageListener.QUEUE_NAME)
@RequiredArgsConstructor
public class LazyMessageListener {
    final static String QUEUE_NAME = "kefu-message";

    @Bean(name = "queue-" + QUEUE_NAME)
    Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

    @Bean(name = "exchange-" + QUEUE_NAME)
    CustomExchange exchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed-kefu-message", "x-delayed-message", true, false, args);
    }

    @Bean(name = "binding-" + QUEUE_NAME)
    Binding binding(@Qualifier("queue-" + QUEUE_NAME) Queue queue, @Qualifier("exchange-" + QUEUE_NAME) CustomExchange exchange) {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delay", "*");
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME).and(args);
    }

    @Bean(name = "container-" + QUEUE_NAME)
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             @Qualifier("listener-adapter-" + QUEUE_NAME) MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        container.setConcurrentConsumers(5);
        container.setPrefetchCount(200);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean(name = "listener-adapter-" + QUEUE_NAME)
    MessageListenerAdapter listenerAdapter(LazyMessageReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

Receiver

This is just an example of consuming the message.

java
import cn.hutool.json.JSONUtil;
import com.mokasz.zhyx.zeus.customer.service.core.entity.LazyMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.api.WxConsts;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.bean.kefu.WxMpKefuMessage;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

/**
 * Delayed message receiver
 */
@Slf4j
@Component("receiver-" + LazyMessageListener.QUEUE_NAME)
@RequiredArgsConstructor
public class LazyMessageReceiver {

    private final WxMpService wxService;

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(byte[] message) {
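        // Decode with the same charset the producer used (UTF-8).
        receiveMessage(new String(message, java.nio.charset.StandardCharsets.UTF_8));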
    }

    public void receiveMessage(String json) {
        try {
            log.info("Received message ({})", json);
            LazyMessage message = JSONUtil.toBean(json, LazyMessage.class);
            if (message == null) return;

            WxMpKefuMessage kefuMessage = new WxMpKefuMessage();
            kefuMessage.setToUser(message.getOpenId());
            kefuMessage.setMsgType(WxConsts.KefuMsgType.TEXT);
            kefuMessage.setTitle(message.getTitle());
            kefuMessage.setContent(message.getContent());

            String response = wxService
                    .switchoverTo(message.getAppid())
                    .getKefuService()
                    .sendKefuMessageWithResponse(kefuMessage);
            log.info("response : {}", response);
        } catch (Exception e) {
            log.error(String.format("System exception (%s) (%s)", e.getMessage(), json), e);
        } finally {
            latch.countDown();
        }
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}
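
The producer encodes the JSON body with StandardCharsets.UTF_8, so it is safest for the consumer to decode with the same explicit charset instead of the JVM default (which is platform-dependent on JDKs older than 18). Below is a minimal sketch of that round trip. BodyCodec is a hypothetical helper name used only for illustration; it is not part of Spring AMQP or Hutool.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper: keeps the producer's encoding and the consumer's
 * decoding in one place so both sides always agree on UTF-8.
 */
final class BodyCodec {

    private BodyCodec() {
    }

    /** Encodes a JSON string into the bytes used as the message body. */
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body back into the JSON string. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

With this in place, the byte[] overload of receiveMessage could delegate to receiveMessage(BodyCodec.decode(message)), and non-ASCII titles or content would survive regardless of the server's locale settings.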

Appendix 1. 'x-delayed-type' must be an existing exchange type

Creating the exchange failed with the following error:

'x-delayed-type' must be an existing exchange type

This happens because the x-delayed-type argument is missing. Add x-delayed-type = direct to the exchange's Arguments.

Creating the exchange in Java

java
@Bean(name = "exchange-" + QUEUE_NAME)
CustomExchange exchange() {
    HashMap<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("delayed-kefu-message", "x-delayed-message", true, false, args);
}
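
To catch a missing or misspelled x-delayed-type before the broker rejects the declaration, the argument map can be built by a small helper that validates the type first. This is only a sketch: DelayedExchangeArgs is a hypothetical class, and it assumes that only the four exchange types built into RabbitMQ are in use. If another plugin registers additional exchange types on your broker, this check would wrongly reject them.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper: builds the arguments for an x-delayed-message
 * exchange and fails fast on an unknown x-delayed-type.
 */
final class DelayedExchangeArgs {

    /** Assumption: only RabbitMQ's built-in exchange types are allowed. */
    static final Set<String> BUILT_IN_TYPES = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("direct", "fanout", "topic", "headers")));

    private DelayedExchangeArgs() {
    }

    /** Returns a mutable argument map containing a validated x-delayed-type. */
    static Map<String, Object> forType(String delayedType) {
        if (delayedType == null || !BUILT_IN_TYPES.contains(delayedType)) {
            throw new IllegalArgumentException(
                    "x-delayed-type must be one of " + BUILT_IN_TYPES + " but was: " + delayedType);
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", delayedType);
        return args;
    }
}
```

The exchange bean could then pass DelayedExchangeArgs.forType("direct") as the last constructor argument of CustomExchange, so a typo surfaces as an IllegalArgumentException at startup rather than as a broker-side error.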

Creating the binding in Java

java
@Bean(name = "binding-" + QUEUE_NAME)
Binding binding(@Qualifier("queue-" + QUEUE_NAME) Queue queue, @Qualifier("exchange-" + QUEUE_NAME) CustomExchange exchange) {
    HashMap<String, Object> args = new HashMap<>();
    args.put("x-delay", "*");
    return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME).and(args);
}

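Note: the binding needs the x-delay argument; otherwise messages are not routed to the queue.

References

  1. Implementing RabbitMQ delayed messages
  2. Implementing a RabbitMQ delay queue with the Delayed Message plugin
  3. rabbitmq / rabbitmq-delayed-message-exchange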