.NET Core RabbitMQ 消费者
大致的方案是在配置文件中配置消息队列及其消费的处理方法,然后创建订阅并通过反射执行配置的处理方法来消费。
connection(连接)整个应用程序共用一个,channel 则是每个队列共同一个 channel 则是每个消费者订阅单独使用一个。
下面是示例代码,仅供参考。
示例代码
新建 .NET Core 控制台工程 RabbitMQConsumers,目标框架为 .NET Core 2.1。
安装所需程序包
powershellInstall-Package RabbitMQ.Client -Version 5.1.0 Install-Package Newtonsoft.Json -Version 12.0.2 Install-Package Microsoft.AspNetCore.All -Version 2.1.5
创建配置用的类 MQSetting.cs(MQ 设置)、ConsumerSetting.cs(消费设置)和 QueueSetting.cs(队列设置)。
MQSetting.cs
csharp/// <summary> /// MQ 设置 /// </summary> class MQSetting { /// <summary> /// MQ 服务器地址 /// </summary> public string HostName { get; set; } /// <summary> /// 用户名 /// </summary> public string UserName { get; set; } /// <summary> /// 密码 /// </summary> public string Password { get; set; } }
ConsumerSetting.cs
csharp/// <summary> /// 消费设置 /// </summary> class ConsumerSetting { /// <summary> /// 程序集名称 /// </summary> public string AssemblyName { get; set; } /// <summary> /// 类名 /// </summary> public string ClassName { get; set; } /// <summary> /// 方法名 /// </summary> public string MethodName { get; set; } /// <summary> /// 是否自动应答 /// </summary> public bool AutoAck { get; set; } = true; /// <summary> /// 是否自动应答为 false 时,若发生异常,是否重新回队列。 /// true:重回队列;false:直接丢弃; /// </summary> public bool Requeue { get; set; } = false; /// <summary> /// 消费者数量 /// </summary> public int ConsumerCount { get; set; } = 1; /// <summary> /// 队列设置 /// </summary> public QueueSetting QueueSetting { get; set; } }
QueueSetting.cs
csharp/// <summary> /// 队列设置 /// </summary> class QueueSetting { /// <summary> /// 队列名称 /// </summary> public string QueueName { get; set; } /// <summary> /// 持久化队列 /// </summary> public bool Durable { get; set; } = true; /// <summary> /// 独占队列 /// </summary> public bool Exclusive { get; set; } = false; /// <summary> /// 自动删除 /// </summary> public bool AutoDelete { get; set; } = false; /// <summary> /// 参数 /// </summary> public IDictionary<string, object> Arguments = null; }
创建消费启动接口及类 IRabbitMQConsumerApplication.cs RabbitMQConsumerApplication.cs
IRabbitMQConsumerApplication.cs
csharpinterface IRabbitMQConsumerApplication { void Start(); }
RabbitMQConsumerApplication.cs
csharp/// <summary> /// RabbitMQ 消费 App /// </summary> class RabbitMQConsumerApplication : IRabbitMQConsumerApplication { /// <summary> /// MQ 设置 /// </summary> MQSetting _mqSetting; /// <summary> /// MQ 消费者设置 /// </summary> List<ConsumerSetting> _consumerSettings; ILoggerFactory _loggerFactory; /// <summary> /// Logger /// </summary> ILogger _logger; /// <summary> /// 构造函数 /// </summary> public RabbitMQConsumerApplication(IOptions<MQSetting> mqSetting, IOptions<List<ConsumerSetting>> consumerSettings, ILoggerFactory loggerFactory) { _mqSetting = mqSetting.Value; _consumerSettings = consumerSettings.Value; _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger(nameof(RabbitMQConsumerApplication)); } /// <summary> /// 启动 RabbitMQ 消费程序 /// </summary> public void Start() { var factory = new ConnectionFactory() { HostName = _mqSetting.HostName, UserName = _mqSetting.UserName, Password = _mqSetting.Password }; using (var connection = factory.CreateConnection()) { foreach (var consumerSetting in _consumerSettings) { for (int i = 0; i < consumerSetting.ConsumerCount; i++) { Task.Run(() => { using (var channel = connection.CreateModel()) { channel.QueueDeclare( queue: consumerSetting.QueueSetting.QueueName, durable: consumerSetting.QueueSetting.Durable, exclusive: consumerSetting.QueueSetting.Exclusive, autoDelete: consumerSetting.QueueSetting.AutoDelete, arguments: consumerSetting.QueueSetting.Arguments); _logger.LogInformation($"StartConsumer : {JsonConvert.SerializeObject(consumerSetting)}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (currentConsumer, ea) => { try { var body = ea.Body; var message = Encoding.UTF8.GetString(body); _logger.LogInformation($"mission start ( message: {message} )"); var assembly = Assembly.Load(consumerSetting.AssemblyName); var type = assembly.GetType(consumerSetting.ClassName); var method = type.GetMethod(consumerSetting.MethodName); method.Invoke(Activator.CreateInstance(type, _loggerFactory), new object[] { message }); _logger.LogInformation($"mission success"); if (!consumerSetting.AutoAck) { channel.BasicAck(ea.DeliveryTag, false); } } catch (Exception ex) { _logger.LogError(ex, "mission fail"); if (!consumerSetting.AutoAck) { channel.BasicReject(ea.DeliveryTag, consumerSetting.Requeue); } } }; channel.BasicConsume(queue: consumerSetting.QueueSetting.QueueName, autoAck: consumerSetting.AutoAck, consumer: consumer); _logger.LogInformation($"StartConsumer Successed"); Thread.Sleep(Timeout.Infinite); } }); } } Thread.Sleep(Timeout.Infinite); } } }
创建具体的消费类 HelloConsumer.cs
csharp/// <summary> /// Hello 消费者 /// </summary> class HelloConsumer { /// <summary> /// Logger /// </summary> ILogger _logger; public HelloConsumer(ILoggerFactory loggerFactory) { _logger = loggerFactory.CreateLogger(nameof(HelloConsumer)); } /// <summary> /// 处理消息 /// </summary> /// <param name="message"></param> public void HandleMessage(string message) { Console.WriteLine(" [x] {0}", message); _logger.LogInformation(message); } }
在配置文件中配置 MQ、队列及消费 appsettings.json
json{ "Logging": { "LogLevel": { "Default": "Debug" } }, "MqSetting": { "HostName": "192.168.0.69", "UserName": "octopus", "Password": "octopus" }, "MqConsumerSetting": [ { "AssemblyName": "RabbitMQConsumers", "ClassName": "RabbitMQConsumers.HelloConsumer", "MethodName": "HandleMessage", "AutoAck": false, "Requeue": false, "ConsumerCount": 2, "QueueSetting": { "Arguments": null, "QueueName": "QUEUE_HELLO", "Durable": true, "Exclusive": false, "AutoDelete": false } } ] }
在 Program.cs 中读取配置并启动消费
csharpclass Program { static void Main(string[] args) { IServiceCollection services = new ServiceCollection(); // 配置 var configuration = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build(); services.Configure<MQSetting>(configuration.GetSection("MqSetting")); services.Configure<List<ConsumerSetting>>(configuration.GetSection("MqConsumerSetting")); services.AddLogging((builder) => builder .AddConfiguration(configuration.GetSection("Logging")) .AddConsole()); // 注入 services.AddSingleton<IRabbitMQConsumerApplication, RabbitMQConsumerApplication>(); // 构建容器 IServiceProvider serviceProvider = services.BuildServiceProvider(); // 启动 MQ 消费 Task.Run(() => { serviceProvider.GetService<IRabbitMQConsumerApplication>().Start(); }); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.Exit(0); } }
参考文档
- .NET/C# Client API Guide
- RabbitMQ 入门教程 (十二):消息确认 Ack
- .NET Core 使用 RabbitMQ
- 基于 RabbitMQ.Client 组件实现 RabbitMQ 可复用的 ConnectionPool(连接池)
- LogLevel Enum
- .Net Core 中的日志组件 (Logging)
- 在.NET Core 控制台程序中使用依赖注入
- .net core 读取 json 格式的配置文件
- ASP.NET Core 源码学习之 Logging 3:Logger
2019/06/27 追记
之前关于 RabbitMQConsumerApplication 中 ConsumerSetting.ConsumerCount 的使用有误,应该每个消费者在单独的线程中。(上面的代码已更新)
另外关于 EventingBasicConsumer
通过订阅消费队列的过程,跟我想象的有些出入。
本以为是 RabbitMQ 会将一个消息推送给订阅者,待其消费结束后再推送下一个;但实际上其过程是 RabbitMQ 会将当前队列中 noack 的所有消息通过某种方式(貌似不是轮询,而是将所有任务按照当前订阅者的数量分段)分配给当前的订阅者。
这就导致当 AutoAck 设置为 true
时可能会发生如下的情况:当队列中的消息已经推送到了消费者,但消费者在未处理结束时发生异常导致进程结束了(比如人为关闭、系统崩溃、停电等),此时已经推送到消费者的消费并不会重新回到队列(其在推送到消费者时就被从队列中删除了)。所以在将消费者配置为 AutoAck (自动应答)时需谨慎些。
虽然每个消费者同时可能接收到了很多消息,但是同一个消费者同一时间貌似只能处理一个,其他的任务处于等待执行的状态。在消费处理中增加 Thread.Sleep
可以很明显的看到这个效果。