80 lines
2.9 KiB
C#
80 lines
2.9 KiB
C#
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using RabbitMQ.Client;
|
|
using TakeoutSaaS.Module.Messaging.Abstractions;
|
|
using TakeoutSaaS.Module.Messaging.Options;
|
|
using TakeoutSaaS.Module.Messaging.Serialization;
|
|
|
|
namespace TakeoutSaaS.Module.Messaging.Services;
|
|
|
|
/// <summary>
|
|
/// RabbitMQ 消息发布实现。
|
|
/// </summary>
|
|
public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectionFactory, IOptionsMonitor<RabbitMqOptions> optionsMonitor, JsonMessageSerializer serializer, ILogger<RabbitMqMessagePublisher> logger)
|
|
: IMessagePublisher, IAsyncDisposable
|
|
{
|
|
private IConnection? _connection;
|
|
private IChannel? _channel;
|
|
private bool _disposed;
|
|
|
|
/// <inheritdoc />
|
|
public async Task PublishAsync<T>(string routingKey, T message, CancellationToken cancellationToken = default)
|
|
{
|
|
// 1. 确保通道可用
|
|
await EnsureChannelAsync(cancellationToken);
|
|
var options = optionsMonitor.CurrentValue;
|
|
|
|
var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available.");
|
|
// 2. 声明交换机
|
|
await channel.ExchangeDeclareAsync(options.Exchange, options.ExchangeType, durable: true, autoDelete: false, arguments: null, noWait: false, cancellationToken).ConfigureAwait(false);
|
|
// 3. 序列化消息并设置属性
|
|
var body = serializer.Serialize(message);
|
|
var props = new BasicProperties();
|
|
props.ContentType = "application/json";
|
|
props.DeliveryMode = DeliveryModes.Persistent;
|
|
props.MessageId = Guid.NewGuid().ToString("N");
|
|
|
|
// 4. 发布消息
|
|
await channel.BasicPublishAsync(options.Exchange, routingKey, mandatory: false, basicProperties: props, body: body, cancellationToken).ConfigureAwait(false);
|
|
logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey);
|
|
}
|
|
|
|
private async Task EnsureChannelAsync(CancellationToken cancellationToken)
|
|
{
|
|
if (_channel != null && _channel.IsOpen)
|
|
{
|
|
return;
|
|
}
|
|
|
|
_connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
|
|
_channel = await _connection.CreateChannelAsync(new CreateChannelOptions(false, false, null, null), cancellationToken).ConfigureAwait(false);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 释放 RabbitMQ 资源。
|
|
/// </summary>
|
|
public ValueTask DisposeAsync()
|
|
{
|
|
if (_disposed)
|
|
{
|
|
return ValueTask.CompletedTask;
|
|
}
|
|
|
|
_disposed = true;
|
|
return CloseAsync();
|
|
}
|
|
|
|
private async ValueTask CloseAsync()
|
|
{
|
|
if (_channel != null)
|
|
{
|
|
await _channel.CloseAsync(CancellationToken.None).ConfigureAwait(false);
|
|
}
|
|
|
|
if (_connection != null)
|
|
{
|
|
await _connection.CloseAsync(CancellationToken.None).ConfigureAwait(false);
|
|
}
|
|
}
|
|
}
|