调整RabbitMQ客户端版本并兼容同步调用

This commit is contained in:
2025-12-27 10:25:18 +08:00
parent c9980ef237
commit 7aeef0a24d
4 changed files with 22 additions and 23 deletions

View File

@@ -14,6 +14,7 @@ public sealed class RabbitMqConnectionFactory(IOptionsMonitor<RabbitMqOptions> o
/// </summary> /// </summary>
public Task<IConnection> CreateConnectionAsync(CancellationToken cancellationToken = default) public Task<IConnection> CreateConnectionAsync(CancellationToken cancellationToken = default)
{ {
cancellationToken.ThrowIfCancellationRequested();
var options = optionsMonitor.CurrentValue; var options = optionsMonitor.CurrentValue;
var factory = new ConnectionFactory var factory = new ConnectionFactory
{ {
@@ -24,6 +25,6 @@ public sealed class RabbitMqConnectionFactory(IOptionsMonitor<RabbitMqOptions> o
VirtualHost = options.VirtualHost VirtualHost = options.VirtualHost
}; };
return factory.CreateConnectionAsync(cancellationToken); return Task.FromResult(factory.CreateConnection());
} }
} }

View File

@@ -14,7 +14,7 @@ public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectio
: IMessagePublisher, IAsyncDisposable : IMessagePublisher, IAsyncDisposable
{ {
private IConnection? _connection; private IConnection? _connection;
private IChannel? _channel; private IModel? _channel;
private bool _disposed; private bool _disposed;
/// <inheritdoc /> /// <inheritdoc />
@@ -26,18 +26,16 @@ public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectio
var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available."); var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available.");
// 2. 声明交换机 // 2. 声明交换机
await channel.ExchangeDeclareAsync(options.Exchange, options.ExchangeType, durable: true, autoDelete: false, cancellationToken: cancellationToken); channel.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false);
// 3. 序列化消息并设置属性 // 3. 序列化消息并设置属性
var body = serializer.Serialize(message); var body = serializer.Serialize(message);
var props = new BasicProperties var props = channel.CreateBasicProperties();
{ props.ContentType = "application/json";
ContentType = "application/json", props.DeliveryMode = 2;
DeliveryMode = DeliveryModes.Persistent, props.MessageId = Guid.NewGuid().ToString("N");
MessageId = Guid.NewGuid().ToString("N")
};
// 4. 发布消息 // 4. 发布消息
await channel.BasicPublishAsync(options.Exchange, routingKey, mandatory: false, props, body, cancellationToken); channel.BasicPublish(options.Exchange, routingKey, mandatory: false, basicProperties: props, body: body);
logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey); logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey);
} }
@@ -49,7 +47,7 @@ public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectio
} }
_connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken); _connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); _channel = _connection.CreateModel();
} }
/// <summary> /// <summary>

View File

@@ -15,7 +15,7 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti
: IMessageSubscriber : IMessageSubscriber
{ {
private IConnection? _connection; private IConnection? _connection;
private IChannel? _channel; private IModel? _channel;
private bool _disposed; private bool _disposed;
/// <inheritdoc /> /// <inheritdoc />
@@ -28,19 +28,19 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti
var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available."); var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available.");
// 2. 声明交换机、队列及绑定 // 2. 声明交换机、队列及绑定
await channel.ExchangeDeclareAsync(options.Exchange, options.ExchangeType, durable: true, autoDelete: false, cancellationToken: cancellationToken); channel.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false);
await channel.QueueDeclareAsync(queue, durable: true, exclusive: false, autoDelete: false, cancellationToken: cancellationToken); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueBindAsync(queue, options.Exchange, routingKey, cancellationToken: cancellationToken); channel.QueueBind(queue, options.Exchange, routingKey);
await channel.BasicQosAsync(0, options.PrefetchCount, global: false, cancellationToken: cancellationToken); channel.BasicQos(0, options.PrefetchCount, global: false);
// 3. 设置消费者回调 // 3. 设置消费者回调
var consumer = new AsyncEventingBasicConsumer(channel); var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, ea) => consumer.Received += async (_, ea) =>
{ {
var message = serializer.Deserialize<T>(ea.Body.ToArray()); var message = serializer.Deserialize<T>(ea.Body.ToArray());
if (message == null) if (message == null)
{ {
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, cancellationToken); channel.BasicAck(ea.DeliveryTag, multiple: false);
return; return;
} }
@@ -56,16 +56,16 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti
if (success) if (success)
{ {
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, cancellationToken); channel.BasicAck(ea.DeliveryTag, multiple: false);
} }
else else
{ {
await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: false, cancellationToken); channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
} }
}; };
// 4. 开始消费 // 4. 开始消费
await channel.BasicConsumeAsync(queue, autoAck: false, consumer, cancellationToken); channel.BasicConsume(queue, autoAck: false, consumer);
} }
private async Task EnsureChannelAsync(CancellationToken cancellationToken) private async Task EnsureChannelAsync(CancellationToken cancellationToken)
@@ -76,7 +76,7 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti
} }
_connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken); _connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); _channel = _connection.CreateModel();
} }
/// <inheritdoc /> /// <inheritdoc />

View File

@@ -11,7 +11,7 @@
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0" /> <PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.0" /> <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="10.0.0" /> <PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="10.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="7.2.0" /> <PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Core\TakeoutSaaS.Shared.Abstractions\TakeoutSaaS.Shared.Abstractions.csproj" /> <ProjectReference Include="..\..\Core\TakeoutSaaS.Shared.Abstractions\TakeoutSaaS.Shared.Abstractions.csproj" />