使用 Springboot 开发 MQ 消费服务
🏷️ Spring Boot
开发环境:RabbitMQ,Springboot,IntelliJ IDEA,Maven
主要是参照官网的 Guide(Messaging with RabbitMQ),然后实现了监听多个消息队列的功能。
1. pom.xml
xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the RabbitMQ messaging sample (artifact gs-messaging-rabbitmq). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Coordinates of this sample project. -->
<groupId>org.springframework</groupId>
<artifactId>gs-messaging-rabbitmq</artifactId>
<version>0.1.0</version>
<!-- Inherit from the Spring Boot parent POM. The dependency and plugin below declare no version
     of their own, so their versions are expected to come from this parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.1.RELEASE</version>
</parent>
<properties>
<!-- Java version for the build (presumably consumed by the parent POM's compiler settings; confirm). -->
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- AMQP starter: the only direct dependency of this sample. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot's Maven plugin. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. resources/application.properties
ini
# RabbitMQ connection settings (spring.rabbitmq.* keys).
# NOTE(review): host, username and password look like placeholders - replace with the real broker values.
# Broker host address.
spring.rabbitmq.host=192.168.0.1
# Broker port.
spring.rabbitmq.port=5672
# Credentials used to log in to the broker.
spring.rabbitmq.username=username
spring.rabbitmq.password=password
3. src/main/java/hello/Receiver.java
java
package hello;
import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;
/**
 * Message handler for the sample: prints every received text message to standard output
 * and counts down a one-shot latch so that other code can wait for the first delivery.
 */
@Component
public class Receiver {

    /** Starts at 1 and is counted down on every call to {@link #receiveMessage(String)}. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Prints the message and signals the latch.
     *
     * @param message the text of the received message
     */
    public void receiveMessage(String message) {
        final String line = "Received <" + message + ">";
        System.out.println(line);
        this.latch.countDown();
    }

    /**
     * Exposes the latch so callers can block until a message has been handled.
     *
     * @return the latch counted down by {@link #receiveMessage(String)}
     */
    public CountDownLatch getLatch() {
        return this.latch;
    }
}
4. src/main/java/hello/Application.java
java
package hello;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
/**
 * Spring Boot entry point that also declares the RabbitMQ wiring for the sample:
 * a queue, a topic exchange, the binding between them, and a listener container that
 * hands messages from the queue to {@code Receiver#receiveMessage}.
 */
@SpringBootApplication
public class Application {

    /** Queue name; reused as the binding's routing key and as the queue the container listens on. */
    static final String queueName = "spring-boot";

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(Application.class, args);
    }

    /** Declares the queue named {@link #queueName}; the second constructor argument (durable) is false. */
    @Bean
    Queue queue() {
        final boolean durable = false;
        return new Queue(queueName, durable);
    }

    /** Declares the topic exchange {@code spring-boot-exchange}. */
    @Bean
    TopicExchange exchange() {
        final TopicExchange topicExchange = new TopicExchange("spring-boot-exchange");
        return topicExchange;
    }

    /** Binds the queue to the exchange, using {@link #queueName} as the routing key. */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(queueName);
    }

    /** Wraps the receiver so that its {@code receiveMessage} method is invoked for each message. */
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        final String handlerMethod = "receiveMessage";
        return new MessageListenerAdapter(receiver, handlerMethod);
    }

    /**
     * Builds the listener container that consumes from {@link #queueName} and forwards
     * each message to the given adapter.
     */
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setQueueNames(queueName);
        listenerContainer.setMessageListener(listenerAdapter);
        return listenerContainer;
    }
}
5. src/main/java/hello/Runner.java
测试用代码,用于在程序启动时发送一条消息到消息队列。
java
package hello;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
/**
 * Start-up test driver: sends one message when the application starts, waits up to
 * ten seconds for the receiver's latch, then closes the application context.
 */
@Component
public class Runner implements CommandLineRunner {

    private final Receiver receiver;
    private final RabbitTemplate rabbitTemplate;
    private final ConfigurableApplicationContext context;

    /**
     * @param receiver       receiver whose latch is awaited after sending
     * @param rabbitTemplate template used to send the test message
     * @param context        application context that is closed once the run finishes
     */
    public Runner(Receiver receiver, RabbitTemplate rabbitTemplate,
            ConfigurableApplicationContext context) {
        this.context = context;
        this.rabbitTemplate = rabbitTemplate;
        this.receiver = receiver;
    }

    /**
     * Sends the test message with {@code Application.queueName} as the first argument of
     * {@code convertAndSend}, waits for the latch (the result of the wait is not checked),
     * and closes the context.
     */
    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending message...");

        final String payload = "Hello from RabbitMQ!";
        rabbitTemplate.convertAndSend(Application.queueName, payload);

        // Same 10-second timeout as before, expressed in seconds.
        receiver.getLatch().await(10, TimeUnit.SECONDS);

        context.close();
    }
}
6. 执行结果
txt
Sending message...
> Received <Hello from RabbitMQ!>
7. 修改后的 Listener
使用 `@Bean` 的 `name` 属性给 Bean 定义别名,使用 `@Qualifier` 标注需要使用哪个 Bean。
java
package me.liujiajia.mqservices.line;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * RabbitMQ wiring for the {@code hello} queue. Every bean gets an explicit
 * {@code -hello} suffixed name and is selected with {@code @Qualifier}, so that several
 * listener classes of this shape can coexist in one application context.
 *
 * <p>NOTE(review): {@code Receiver} is referenced without an import, so it must live in
 * this class's package - confirm the package declarations match.
 *
 * <p>Created by liujiajia on 2017/2/24.
 */
@Component("Listener-hello")
public class Listener {

    /** Queue name; reused as the binding's routing key and as the queue the container listens on. */
    static final String queueName = "hello";

    /** Declares the queue named {@link #queueName}; the second constructor argument (durable) is true. */
    @Bean(name = "queue-hello")
    Queue queue() {
        final boolean durable = true;
        return new Queue(queueName, durable);
    }

    /**
     * Declares a topic exchange with an empty name.
     * NOTE(review): the empty name presumably targets the broker's default exchange - confirm.
     */
    @Bean(name = "exchange-hello")
    TopicExchange exchange() {
        final TopicExchange topicExchange = new TopicExchange("");
        return topicExchange;
    }

    /** Binds the {@code queue-hello} queue to the {@code exchange-hello} exchange with {@link #queueName} as routing key. */
    @Bean(name = "binding-hello")
    Binding binding(@Qualifier("queue-hello") Queue queue,
            @Qualifier("exchange-hello") TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(queueName);
    }

    /** Wraps the receiver so that its {@code receiveMessage} method is invoked for each message. */
    @Bean(name = "listenerAdapter-hello")
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        final String handlerMethod = "receiveMessage";
        return new MessageListenerAdapter(receiver, handlerMethod);
    }

    /**
     * Builds the listener container that consumes from {@link #queueName} and forwards
     * each message to the {@code listenerAdapter-hello} adapter.
     */
    @Bean(name = "container-hello")
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            @Qualifier("listenerAdapter-hello") MessageListenerAdapter listenerAdapter) {
        final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setQueueNames(queueName);
        listenerContainer.setMessageListener(listenerAdapter);
        return listenerContainer;
    }
}
8. 修改后的 Receiver
java
package me.liujiajia.mqservices.hello;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;
/**
 * Message handler for the {@code hello} queue: decodes the raw message body, prints it
 * to standard output, and counts down a one-shot latch so that other code can wait for
 * the first delivery.
 */
@Component("Receiver-hello")
public class Receiver {

    /** Starts at 1 and is counted down on every call to {@link #receiveMessage(byte[])}. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Prints the message body as text and signals the latch.
     *
     * <p>The body is decoded as UTF-8 explicitly. The no-charset {@code String} constructor
     * would use the platform default charset, so the same bytes could decode differently
     * depending on the JVM the service runs on.
     *
     * @param body raw bytes of the received message
     */
    public void receiveMessage(byte[] body) {
        String message = new String(body, StandardCharsets.UTF_8);
        System.out.println("Received from hello <" + message + ">");
        latch.countDown();
    }

    /**
     * Exposes the latch so callers can block until a message has been handled.
     *
     * @return the latch counted down by {@link #receiveMessage(byte[])}
     */
    public CountDownLatch getLatch() {
        return latch;
    }
}
9. Application 提到了外层
启动类不能直接放在 `src/main/java` 目录下(即默认包中),否则会报错;新建一个包并把它放进去即可。
`@ImportResource` 的用法可以参照《【Spring】使用 @ImportResource 导入 XML 配置文件》。
java
package me.liujiajia.mqservices;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;
/**
 * Spring Boot entry point for the MQ services. In addition to the annotation-based
 * configuration it imports the XML bean definitions from
 * {@code file:///me/mqservices/applicationContext_ES.xml}.
 *
 * <p>Created by liujiajia on 2017/2/24.
 */
@SpringBootApplication
@ImportResource("file:///me/mqservices/applicationContext_ES.xml")
public class Application {

    /**
     * Starts the Spring application with this class as its configuration source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) throws InterruptedException {
        final SpringApplication application = new SpringApplication(Application.class);
        application.run(args);
    }
}