@@ -0,0 +1,12 @@
|
||||
namespace TakeoutSaaS.Module.Messaging.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// 消息发布抽象。
|
||||
/// </summary>
|
||||
public interface IMessagePublisher
|
||||
{
|
||||
/// <summary>
|
||||
/// 发布消息到指定路由键。
|
||||
/// </summary>
|
||||
Task PublishAsync<T>(string routingKey, T message, CancellationToken cancellationToken = default);
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
namespace TakeoutSaaS.Module.Messaging.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// 消息订阅抽象。
|
||||
/// </summary>
|
||||
public interface IMessageSubscriber : IAsyncDisposable
|
||||
{
|
||||
/// <summary>
|
||||
/// 订阅指定队列与路由键,处理后返回是否消费成功。
|
||||
/// </summary>
|
||||
Task SubscribeAsync<T>(string queue, string routingKey, Func<T, CancellationToken, Task<bool>> handler, CancellationToken cancellationToken = default);
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using TakeoutSaaS.Module.Messaging.Abstractions;
|
||||
using TakeoutSaaS.Module.Messaging.Options;
|
||||
using TakeoutSaaS.Module.Messaging.Serialization;
|
||||
using TakeoutSaaS.Module.Messaging.Services;
|
||||
|
||||
namespace TakeoutSaaS.Module.Messaging.Extensions;
|
||||
|
||||
/// <summary>
|
||||
/// 消息队列模块注册扩展。
|
||||
/// </summary>
|
||||
public static class MessagingServiceCollectionExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// 注册 RabbitMQ 发布/订阅能力。
|
||||
/// </summary>
|
||||
public static IServiceCollection AddMessagingModule(this IServiceCollection services, IConfiguration configuration)
|
||||
{
|
||||
var rabbitMqSection = configuration.GetSection("RabbitMQ");
|
||||
if (!rabbitMqSection.Exists())
|
||||
{
|
||||
services.AddSingleton<IMessagePublisher, NoOpMessagePublisher>();
|
||||
services.AddSingleton<IMessageSubscriber, NoOpMessageSubscriber>();
|
||||
return services;
|
||||
}
|
||||
|
||||
// 1. (空行后) 存在 RabbitMQ 配置时才启用真实 MQ 能力(启动时验证配置完整性)
|
||||
services.AddOptions<RabbitMqOptions>()
|
||||
.Bind(rabbitMqSection)
|
||||
.ValidateDataAnnotations()
|
||||
.ValidateOnStart();
|
||||
|
||||
services.AddSingleton<JsonMessageSerializer>();
|
||||
services.AddSingleton<RabbitMqConnectionFactory>();
|
||||
services.AddSingleton<IMessagePublisher, RabbitMqMessagePublisher>();
|
||||
services.AddSingleton<IMessageSubscriber, RabbitMqMessageSubscriber>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
using System.ComponentModel.DataAnnotations;
|
||||
|
||||
namespace TakeoutSaaS.Module.Messaging.Options;
|
||||
|
||||
/// <summary>
|
||||
/// RabbitMQ 连接与交换机配置。
|
||||
/// </summary>
|
||||
public sealed class RabbitMqOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// 主机名。
|
||||
/// </summary>
|
||||
[Required]
|
||||
public string Host { get; set; } = "localhost";
|
||||
|
||||
/// <summary>
|
||||
/// 端口。
|
||||
/// </summary>
|
||||
[Range(1, 65535)]
|
||||
public int Port { get; set; } = 5672;
|
||||
|
||||
/// <summary>
|
||||
/// 用户名。
|
||||
/// </summary>
|
||||
[Required]
|
||||
public string Username { get; set; } = "guest";
|
||||
|
||||
/// <summary>
|
||||
/// 密码。
|
||||
/// </summary>
|
||||
[Required]
|
||||
public string Password { get; set; } = "guest";
|
||||
|
||||
/// <summary>
|
||||
/// 虚拟主机。
|
||||
/// </summary>
|
||||
public string VirtualHost { get; set; } = "/";
|
||||
|
||||
/// <summary>
|
||||
/// 默认交换机名称。
|
||||
/// </summary>
|
||||
[Required]
|
||||
public string Exchange { get; set; } = "takeout.events";
|
||||
|
||||
/// <summary>
|
||||
/// 交换机类型,默认 topic。
|
||||
/// </summary>
|
||||
public string ExchangeType { get; set; } = "topic";
|
||||
|
||||
/// <summary>
|
||||
/// 消费预取数量。
|
||||
/// </summary>
|
||||
[Range(1, 1000)]
|
||||
public ushort PrefetchCount { get; set; } = 20;
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace TakeoutSaaS.Module.Messaging.Serialization;
|
||||
|
||||
/// <summary>
|
||||
/// 消息 JSON 序列化器。
|
||||
/// </summary>
|
||||
public sealed class JsonMessageSerializer
|
||||
{
|
||||
private static readonly JsonSerializerOptions DefaultOptions = new(JsonSerializerDefaults.Web);
|
||||
|
||||
/// <summary>
|
||||
/// 序列化消息。
|
||||
/// </summary>
|
||||
public byte[] Serialize<T>(T message) => Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message, DefaultOptions));
|
||||
|
||||
/// <summary>
|
||||
/// 反序列化消息。
|
||||
/// </summary>
|
||||
public T? Deserialize<T>(byte[] body) => JsonSerializer.Deserialize<T>(body, DefaultOptions);
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using TakeoutSaaS.Module.Messaging.Abstractions;
|
||||
|
||||
namespace TakeoutSaaS.Module.Messaging.Services;
|
||||
|
||||
/// <summary>
|
||||
/// 空实现消息发布器:用于未配置 RabbitMQ 的开发/测试场景,避免启动依赖外部 MQ。
|
||||
/// </summary>
|
||||
public sealed class NoOpMessagePublisher(ILogger<NoOpMessagePublisher> logger) : IMessagePublisher
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public Task PublishAsync<T>(string routingKey, T message, CancellationToken cancellationToken = default)
|
||||
{
|
||||
logger.LogDebug(
|
||||
"未配置 RabbitMQ,已跳过消息发布:RoutingKey={RoutingKey} MessageType={MessageType}",
|
||||
routingKey,
|
||||
typeof(T).FullName ?? typeof(T).Name);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using TakeoutSaaS.Module.Messaging.Abstractions;
|
||||
|
||||
namespace TakeoutSaaS.Module.Messaging.Services;
|
||||
|
||||
/// <summary>
|
||||
/// 空实现消息订阅器:用于未配置 RabbitMQ 的开发/测试场景。
|
||||
/// </summary>
|
||||
public sealed class NoOpMessageSubscriber(ILogger<NoOpMessageSubscriber> logger) : IMessageSubscriber
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public Task SubscribeAsync<T>(string queue, string routingKey, Func<T, CancellationToken, Task<bool>> handler, CancellationToken cancellationToken = default)
|
||||
{
|
||||
logger.LogWarning("未配置 RabbitMQ,消息订阅被禁用:Queue={Queue} RoutingKey={RoutingKey}", queue, routingKey);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
// 1. (空行后) 释放资源(NoOp 实现无实际资源)
|
||||
/// <inheritdoc />
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
logger.LogDebug("NoOpMessageSubscriber 已释放。");
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
using RabbitMQ.Client;
|
||||
using TakeoutSaaS.Module.Messaging.Options;
|
||||
|
||||
namespace TakeoutSaaS.Module.Messaging.Services;
|
||||
|
||||
/// <summary>
|
||||
/// RabbitMQ 连接工厂封装。
|
||||
/// </summary>
|
||||
public sealed class RabbitMqConnectionFactory(IOptionsMonitor<RabbitMqOptions> optionsMonitor)
|
||||
{
|
||||
/// <summary>
|
||||
/// 创建连接。
|
||||
/// </summary>
|
||||
public async Task<IConnection> CreateConnectionAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
var options = optionsMonitor.CurrentValue;
|
||||
var factory = new ConnectionFactory
|
||||
{
|
||||
HostName = options.Host,
|
||||
Port = options.Port,
|
||||
UserName = options.Username,
|
||||
Password = options.Password,
|
||||
VirtualHost = options.VirtualHost
|
||||
};
|
||||
|
||||
return await factory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using RabbitMQ.Client;
|
||||
using TakeoutSaaS.Module.Messaging.Abstractions;
|
||||
using TakeoutSaaS.Module.Messaging.Options;
|
||||
using TakeoutSaaS.Module.Messaging.Serialization;
|
||||
|
||||
namespace TakeoutSaaS.Module.Messaging.Services;
|
||||
|
||||
/// <summary>
|
||||
/// RabbitMQ 消息发布实现。
|
||||
/// </summary>
|
||||
public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectionFactory, IOptionsMonitor<RabbitMqOptions> optionsMonitor, JsonMessageSerializer serializer, ILogger<RabbitMqMessagePublisher> logger)
|
||||
: IMessagePublisher, IAsyncDisposable
|
||||
{
|
||||
private IConnection? _connection;
|
||||
private IChannel? _channel;
|
||||
private bool _disposed;
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task PublishAsync<T>(string routingKey, T message, CancellationToken cancellationToken = default)
|
||||
{
|
||||
// 1. 确保通道可用
|
||||
await EnsureChannelAsync(cancellationToken);
|
||||
var options = optionsMonitor.CurrentValue;
|
||||
|
||||
var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available.");
|
||||
// 2. 声明交换机
|
||||
await channel.ExchangeDeclareAsync(options.Exchange, options.ExchangeType, durable: true, autoDelete: false, arguments: null, noWait: false, cancellationToken).ConfigureAwait(false);
|
||||
// 3. 序列化消息并设置属性
|
||||
var body = serializer.Serialize(message);
|
||||
var props = new BasicProperties();
|
||||
props.ContentType = "application/json";
|
||||
props.DeliveryMode = DeliveryModes.Persistent;
|
||||
props.MessageId = Guid.NewGuid().ToString("N");
|
||||
|
||||
// 4. 发布消息
|
||||
await channel.BasicPublishAsync(options.Exchange, routingKey, mandatory: false, basicProperties: props, body: body, cancellationToken).ConfigureAwait(false);
|
||||
logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey);
|
||||
}
|
||||
|
||||
private async Task EnsureChannelAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_channel != null && _channel.IsOpen)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
_channel = await _connection.CreateChannelAsync(new CreateChannelOptions(false, false, null, null), cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 释放 RabbitMQ 资源。
|
||||
/// </summary>
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
return CloseAsync();
|
||||
}
|
||||
|
||||
private async ValueTask CloseAsync()
|
||||
{
|
||||
if (_channel != null)
|
||||
{
|
||||
await _channel.CloseAsync(CancellationToken.None).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (_connection != null)
|
||||
{
|
||||
await _connection.CloseAsync(CancellationToken.None).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,101 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
using TakeoutSaaS.Module.Messaging.Abstractions;
|
||||
using TakeoutSaaS.Module.Messaging.Options;
|
||||
using TakeoutSaaS.Module.Messaging.Serialization;
|
||||
|
||||
namespace TakeoutSaaS.Module.Messaging.Services;
|
||||
|
||||
/// <summary>
|
||||
/// RabbitMQ 消费者实现。
|
||||
/// </summary>
|
||||
public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connectionFactory, IOptionsMonitor<RabbitMqOptions> optionsMonitor, JsonMessageSerializer serializer, ILogger<RabbitMqMessageSubscriber> logger)
|
||||
: IMessageSubscriber
|
||||
{
|
||||
private IConnection? _connection;
|
||||
private IChannel? _channel;
|
||||
private bool _disposed;
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task SubscribeAsync<T>(string queue, string routingKey, Func<T, CancellationToken, Task<bool>> handler, CancellationToken cancellationToken = default)
|
||||
{
|
||||
// 1. 确保通道可用
|
||||
await EnsureChannelAsync(cancellationToken);
|
||||
var options = optionsMonitor.CurrentValue;
|
||||
|
||||
var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available.");
|
||||
|
||||
// 2. 声明交换机、队列及绑定
|
||||
await channel.ExchangeDeclareAsync(options.Exchange, options.ExchangeType, durable: true, autoDelete: false, arguments: null, noWait: false, cancellationToken).ConfigureAwait(false);
|
||||
await channel.QueueDeclareAsync(queue, durable: true, exclusive: false, autoDelete: false, arguments: null, noWait: false, cancellationToken).ConfigureAwait(false);
|
||||
await channel.QueueBindAsync(queue, options.Exchange, routingKey, arguments: null, noWait: false, cancellationToken).ConfigureAwait(false);
|
||||
await channel.BasicQosAsync(0, options.PrefetchCount, global: false, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// 3. 设置消费者回调
|
||||
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||
consumer.ReceivedAsync += async (_, ea) =>
|
||||
{
|
||||
var message = serializer.Deserialize<T>(ea.Body.ToArray());
|
||||
if (message == null)
|
||||
{
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, cancellationToken).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
var success = false;
|
||||
try
|
||||
{
|
||||
success = await handler(message, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "处理消息失败:{RoutingKey}", ea.RoutingKey);
|
||||
}
|
||||
|
||||
if (success)
|
||||
{
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: false, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
};
|
||||
|
||||
// 4. 开始消费
|
||||
await channel.BasicConsumeAsync(queue, autoAck: false, consumer, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task EnsureChannelAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_channel != null && _channel.IsOpen)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
_channel = await _connection.CreateChannelAsync(new CreateChannelOptions(false, false, null, null), cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
if (_channel != null)
|
||||
{
|
||||
await _channel.CloseAsync(CancellationToken.None).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (_connection != null)
|
||||
{
|
||||
await _connection.CloseAsync(CancellationToken.None).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user