Files
TakeoutSaaS.TenantApi/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessagePublisher.cs

69 lines
2.4 KiB
C#

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using TakeoutSaaS.Module.Messaging.Abstractions;
using TakeoutSaaS.Module.Messaging.Options;
using TakeoutSaaS.Module.Messaging.Serialization;
namespace TakeoutSaaS.Module.Messaging.Services;
/// <summary>
/// RabbitMQ-backed implementation of <see cref="IMessagePublisher"/>.
/// </summary>
/// <remarks>
/// One connection and one channel are created lazily on the first publish and reused afterwards.
/// If either is found closed on a later publish, it is discarded and re-created.
/// NOTE(review): this class performs no locking around the shared channel; confirm that callers
/// do not publish concurrently on the same instance.
/// </remarks>
public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectionFactory, IOptionsMonitor<RabbitMqOptions> optionsMonitor, JsonMessageSerializer serializer, ILogger<RabbitMqMessagePublisher> logger)
    : IMessagePublisher, IAsyncDisposable
{
    private IConnection? _connection;
    private IModel? _channel;
    private bool _disposed;

    /// <inheritdoc />
    /// <exception cref="ObjectDisposedException">The publisher has already been disposed.</exception>
    /// <exception cref="InvalidOperationException">No channel is available after the channel setup step.</exception>
    public async Task PublishAsync<T>(string routingKey, T message, CancellationToken cancellationToken = default)
    {
        // Fail fast with a clear error instead of trying to reopen a channel on a disposed connection.
        ObjectDisposedException.ThrowIf(_disposed, this);

        // 1. Make sure an open channel is available.
        await EnsureChannelAsync(cancellationToken);
        var options = optionsMonitor.CurrentValue;
        var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available.");

        // 2. Declare the exchange (durable, not auto-deleted) using the current options.
        channel.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false);

        // 3. Serialize the message and fill in the message properties.
        var body = serializer.Serialize(message);
        var props = channel.CreateBasicProperties();
        props.ContentType = "application/json";
        props.DeliveryMode = 2; // 2 = persistent delivery mode in AMQP 0-9-1.
        props.MessageId = Guid.NewGuid().ToString("N");

        // 4. Publish the message.
        channel.BasicPublish(options.Exchange, routingKey, mandatory: false, basicProperties: props, body: body);
        logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey);
    }

    /// <summary>
    /// Ensures <c>_channel</c> refers to an open channel, re-creating the channel and,
    /// if it is missing or closed, the underlying connection as well.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the connection factory when a new connection is needed.</param>
    private async Task EnsureChannelAsync(CancellationToken cancellationToken)
    {
        if (_channel is { IsOpen: true })
        {
            return;
        }

        // The old channel (if any) is closed; release it before replacing it.
        DisposeQuietly(_channel);
        _channel = null;

        var connection = _connection;
        if (connection is not { IsOpen: true })
        {
            // A closed connection cannot create channels, so drop it and open a new one
            // rather than keeping the dead instance around forever.
            DisposeQuietly(connection);
            _connection = null;
            connection = await connectionFactory.CreateConnectionAsync(cancellationToken);
            _connection = connection;
        }

        _channel = connection.CreateModel();
    }

    /// <summary>
    /// Releases the RabbitMQ channel and connection. Safe to call more than once.
    /// </summary>
    /// <returns>A completed <see cref="ValueTask"/>; the cleanup itself runs synchronously.</returns>
    public ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return ValueTask.CompletedTask;
        }

        _disposed = true;

        // Channel first, then the connection that owns it.
        DisposeQuietly(_channel);
        _channel = null;
        DisposeQuietly(_connection);
        _connection = null;

        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Disposes <paramref name="resource"/> if it is not null, logging instead of propagating any failure,
    /// so that cleanup of an already-broken resource never fails a publish or a dispose call.
    /// </summary>
    /// <param name="resource">The channel or connection to release; may be null.</param>
    private void DisposeQuietly(IDisposable? resource)
    {
        try
        {
            resource?.Dispose();
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "Failed to dispose RabbitMQ resource {ResourceType}", resource?.GetType().Name);
        }
    }
}