using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; using TakeoutSaaS.Module.Messaging.Abstractions; using TakeoutSaaS.Module.Messaging.Options; using TakeoutSaaS.Module.Messaging.Serialization; namespace TakeoutSaaS.Module.Messaging.Services; /// /// RabbitMQ 消息发布实现。 /// public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectionFactory, IOptionsMonitor optionsMonitor, JsonMessageSerializer serializer, ILogger logger) : IMessagePublisher, IAsyncDisposable { private IConnection? _connection; private IChannel? _channel; private bool _disposed; /// public async Task PublishAsync(string routingKey, T message, CancellationToken cancellationToken = default) { // 1. 确保通道可用 await EnsureChannelAsync(cancellationToken); var options = optionsMonitor.CurrentValue; var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available."); // 2. 声明交换机 await channel.ExchangeDeclareAsync(options.Exchange, options.ExchangeType, durable: true, autoDelete: false, cancellationToken: cancellationToken); // 3. 序列化消息并设置属性 var body = serializer.Serialize(message); var props = new BasicProperties { ContentType = "application/json", DeliveryMode = DeliveryModes.Persistent, MessageId = Guid.NewGuid().ToString("N") }; // 4. 发布消息 await channel.BasicPublishAsync(options.Exchange, routingKey, mandatory: false, props, body, cancellationToken); logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey); } private async Task EnsureChannelAsync(CancellationToken cancellationToken) { if (_channel != null && _channel.IsOpen) { return; } _connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken); _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); } /// /// 释放 RabbitMQ 资源。 /// public ValueTask DisposeAsync() { if (_disposed) { return ValueTask.CompletedTask; } _disposed = true; _channel?.Dispose(); _connection?.Dispose(); return ValueTask.CompletedTask; } }