From 3e01943727cd1877f2f8547bad0ba0f9d506bb80 Mon Sep 17 00:00:00 2001 From: MSuMshk <2039814060@qq.com> Date: Tue, 2 Dec 2025 11:18:38 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E6=B6=88=E9=99=A4=E6=9E=84=E5=BB=BA?= =?UTF-8?q?=E8=AD=A6=E5=91=8A=E5=B9=B6=E5=8D=87=E7=BA=A7=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Services/RabbitMqMessagePublisher.cs | 7 ++++--- .../Services/RabbitMqMessageSubscriber.cs | 20 ++++++++++--------- .../SchedulerServiceCollectionExtensions.cs | 5 ++++- .../TakeoutSaaS.Module.Scheduler.csproj | 1 + 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessagePublisher.cs b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessagePublisher.cs index 113ee3c..701a992 100644 --- a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessagePublisher.cs +++ b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessagePublisher.cs @@ -25,14 +25,15 @@ public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectio EnsureChannel(); var options = optionsMonitor.CurrentValue; - _channel!.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false); + var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available."); + channel.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false); var body = serializer.Serialize(message); - var props = _channel.CreateBasicProperties(); + var props = channel.CreateBasicProperties(); props.ContentType = "application/json"; props.DeliveryMode = 2; props.MessageId = Guid.NewGuid().ToString("N"); - _channel.BasicPublish(options.Exchange, routingKey, props, body); + channel.BasicPublish(options.Exchange, routingKey, props, body); logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey); return Task.CompletedTask; } diff --git a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessageSubscriber.cs b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessageSubscriber.cs index 88f19a9..1acba98 100644 --- a/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessageSubscriber.cs +++ b/src/Modules/TakeoutSaaS.Module.Messaging/Services/RabbitMqMessageSubscriber.cs @@ -24,18 +24,20 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti EnsureChannel(); var options = optionsMonitor.CurrentValue; - _channel!.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false); - _channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); - _channel.QueueBind(queue, options.Exchange, routingKey); - _channel.BasicQos(0, options.PrefetchCount, global: false); + var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available."); - var consumer = new AsyncEventingBasicConsumer(_channel); + channel.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false); + channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); + channel.QueueBind(queue, options.Exchange, routingKey); + channel.BasicQos(0, options.PrefetchCount, global: false); + + var consumer = new AsyncEventingBasicConsumer(channel); consumer.Received += async (_, ea) => { var message = serializer.Deserialize(ea.Body.ToArray()); if (message == null) { - _channel.BasicAck(ea.DeliveryTag, multiple: false); + channel.BasicAck(ea.DeliveryTag, multiple: false); return; } @@ -51,15 +53,15 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti if (success) { - _channel.BasicAck(ea.DeliveryTag, multiple: false); + channel.BasicAck(ea.DeliveryTag, multiple: false); } else { - _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false); + channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false); } }; - _channel.BasicConsume(queue, autoAck: false, consumer); + channel.BasicConsume(queue, autoAck: false, consumer); await Task.CompletedTask.ConfigureAwait(false); } diff --git a/src/Modules/TakeoutSaaS.Module.Scheduler/Extensions/SchedulerServiceCollectionExtensions.cs b/src/Modules/TakeoutSaaS.Module.Scheduler/Extensions/SchedulerServiceCollectionExtensions.cs index f80b65d..db86c46 100644 --- a/src/Modules/TakeoutSaaS.Module.Scheduler/Extensions/SchedulerServiceCollectionExtensions.cs +++ b/src/Modules/TakeoutSaaS.Module.Scheduler/Extensions/SchedulerServiceCollectionExtensions.cs @@ -33,7 +33,10 @@ public static class SchedulerServiceCollectionExtensions config .UseSimpleAssemblyNameTypeSerializer() .UseRecommendedSerializerSettings() - .UsePostgreSqlStorage(options.ConnectionString); + .UsePostgreSqlStorage(storage => + { + storage.UseNpgsqlConnection(options.ConnectionString); + }); }); services.AddHangfireServer((serviceProvider, options) => diff --git a/src/Modules/TakeoutSaaS.Module.Scheduler/TakeoutSaaS.Module.Scheduler.csproj b/src/Modules/TakeoutSaaS.Module.Scheduler/TakeoutSaaS.Module.Scheduler.csproj index 8e4c663..6c7f697 100644 --- a/src/Modules/TakeoutSaaS.Module.Scheduler/TakeoutSaaS.Module.Scheduler.csproj +++ b/src/Modules/TakeoutSaaS.Module.Scheduler/TakeoutSaaS.Module.Scheduler.csproj @@ -7,6 +7,7 @@ +