diff --git a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqConnectionFactory.cs b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqConnectionFactory.cs index e015c72..1ffa9dd 100644 --- a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqConnectionFactory.cs +++ b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqConnectionFactory.cs @@ -14,6 +14,7 @@ public sealed class RabbitMqConnectionFactory(IOptionsMonitor o /// public Task CreateConnectionAsync(CancellationToken cancellationToken = default) { + cancellationToken.ThrowIfCancellationRequested(); var options = optionsMonitor.CurrentValue; var factory = new ConnectionFactory { @@ -24,6 +25,6 @@ public sealed class RabbitMqConnectionFactory(IOptionsMonitor o VirtualHost = options.VirtualHost }; - return factory.CreateConnectionAsync(cancellationToken); + return Task.FromResult(factory.CreateConnection()); } } diff --git a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessagePublisher.cs b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessagePublisher.cs index 4a5b215..2319a23 100644 --- a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessagePublisher.cs +++ b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessagePublisher.cs @@ -14,7 +14,7 @@ public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectio : IMessagePublisher, IAsyncDisposable { private IConnection? _connection; - private IChannel? _channel; + private IModel? _channel; private bool _disposed; /// @@ -26,18 +26,16 @@ public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectio var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available."); // 2. 声明交换机 - await channel.ExchangeDeclareAsync(options.Exchange, options.ExchangeType, durable: true, autoDelete: false, cancellationToken: cancellationToken); + channel.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false); // 3. 序列化消息并设置属性 var body = serializer.Serialize(message); - var props = new BasicProperties - { - ContentType = "application/json", - DeliveryMode = DeliveryModes.Persistent, - MessageId = Guid.NewGuid().ToString("N") - }; + var props = channel.CreateBasicProperties(); + props.ContentType = "application/json"; + props.DeliveryMode = 2; + props.MessageId = Guid.NewGuid().ToString("N"); // 4. 发布消息 - await channel.BasicPublishAsync(options.Exchange, routingKey, mandatory: false, props, body, cancellationToken); + channel.BasicPublish(options.Exchange, routingKey, mandatory: false, basicProperties: props, body: body); logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey); } @@ -49,7 +47,7 @@ public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectio } _connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken); - _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); + _channel = _connection.CreateModel(); } /// diff --git a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessageSubscriber.cs b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessageSubscriber.cs index 6287c77..608ee5a 100644 --- a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessageSubscriber.cs +++ b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessageSubscriber.cs @@ -15,7 +15,7 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti : IMessageSubscriber { private IConnection? _connection; - private IChannel? _channel; + private IModel? _channel; private bool _disposed; /// @@ -28,19 +28,19 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available."); // 2. 声明交换机、队列及绑定 - await channel.ExchangeDeclareAsync(options.Exchange, options.ExchangeType, durable: true, autoDelete: false, cancellationToken: cancellationToken); - await channel.QueueDeclareAsync(queue, durable: true, exclusive: false, autoDelete: false, cancellationToken: cancellationToken); - await channel.QueueBindAsync(queue, options.Exchange, routingKey, cancellationToken: cancellationToken); - await channel.BasicQosAsync(0, options.PrefetchCount, global: false, cancellationToken: cancellationToken); + channel.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false); + channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false, arguments: null); + channel.QueueBind(queue, options.Exchange, routingKey); + channel.BasicQos(0, options.PrefetchCount, global: false); // 3. 设置消费者回调 var consumer = new AsyncEventingBasicConsumer(channel); - consumer.ReceivedAsync += async (_, ea) => + consumer.Received += async (_, ea) => { var message = serializer.Deserialize(ea.Body.ToArray()); if (message == null) { - await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, cancellationToken); + channel.BasicAck(ea.DeliveryTag, multiple: false); return; } @@ -56,16 +56,16 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti if (success) { - await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, cancellationToken); + channel.BasicAck(ea.DeliveryTag, multiple: false); } else { - await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: false, cancellationToken); + channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false); } }; // 4. 开始消费 - await channel.BasicConsumeAsync(queue, autoAck: false, consumer, cancellationToken); + channel.BasicConsume(queue, autoAck: false, consumer); } private async Task EnsureChannelAsync(CancellationToken cancellationToken) @@ -76,7 +76,7 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti } _connection ??= await connectionFactory.CreateConnectionAsync(cancellationToken); - _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); + _channel = _connection.CreateModel(); } /// diff --git a/src/Modules/TakeoutSaaS.Module.Messaging/TakeoutSaaS.Module.Messaging.csproj b/src/Modules/TakeoutSaaS.Module.Messaging/TakeoutSaaS.Module.Messaging.csproj index f5650aa..6ac643f 100644 --- a/src/Modules/TakeoutSaaS.Module.Messaging/TakeoutSaaS.Module.Messaging.csproj +++ b/src/Modules/TakeoutSaaS.Module.Messaging/TakeoutSaaS.Module.Messaging.csproj @@ -11,7 +11,7 @@ - +