chore: 消除构建警告并升级依赖
This commit is contained in:
@@ -25,14 +25,15 @@ public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectio
|
|||||||
EnsureChannel();
|
EnsureChannel();
|
||||||
var options = optionsMonitor.CurrentValue;
|
var options = optionsMonitor.CurrentValue;
|
||||||
|
|
||||||
_channel!.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false);
|
var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available.");
|
||||||
|
channel.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false);
|
||||||
var body = serializer.Serialize(message);
|
var body = serializer.Serialize(message);
|
||||||
var props = _channel.CreateBasicProperties();
|
var props = channel.CreateBasicProperties();
|
||||||
props.ContentType = "application/json";
|
props.ContentType = "application/json";
|
||||||
props.DeliveryMode = 2;
|
props.DeliveryMode = 2;
|
||||||
props.MessageId = Guid.NewGuid().ToString("N");
|
props.MessageId = Guid.NewGuid().ToString("N");
|
||||||
|
|
||||||
_channel.BasicPublish(options.Exchange, routingKey, props, body);
|
channel.BasicPublish(options.Exchange, routingKey, props, body);
|
||||||
logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey);
|
logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey);
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,18 +24,20 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti
|
|||||||
EnsureChannel();
|
EnsureChannel();
|
||||||
var options = optionsMonitor.CurrentValue;
|
var options = optionsMonitor.CurrentValue;
|
||||||
|
|
||||||
_channel!.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false);
|
var channel = _channel ?? throw new InvalidOperationException("RabbitMQ channel is not available.");
|
||||||
_channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
|
|
||||||
_channel.QueueBind(queue, options.Exchange, routingKey);
|
|
||||||
_channel.BasicQos(0, options.PrefetchCount, global: false);
|
|
||||||
|
|
||||||
var consumer = new AsyncEventingBasicConsumer(_channel);
|
channel.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false);
|
||||||
|
channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
|
||||||
|
channel.QueueBind(queue, options.Exchange, routingKey);
|
||||||
|
channel.BasicQos(0, options.PrefetchCount, global: false);
|
||||||
|
|
||||||
|
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||||
consumer.Received += async (_, ea) =>
|
consumer.Received += async (_, ea) =>
|
||||||
{
|
{
|
||||||
var message = serializer.Deserialize<T>(ea.Body.ToArray());
|
var message = serializer.Deserialize<T>(ea.Body.ToArray());
|
||||||
if (message == null)
|
if (message == null)
|
||||||
{
|
{
|
||||||
_channel.BasicAck(ea.DeliveryTag, multiple: false);
|
channel.BasicAck(ea.DeliveryTag, multiple: false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,15 +53,15 @@ public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connecti
|
|||||||
|
|
||||||
if (success)
|
if (success)
|
||||||
{
|
{
|
||||||
_channel.BasicAck(ea.DeliveryTag, multiple: false);
|
channel.BasicAck(ea.DeliveryTag, multiple: false);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
_channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
|
channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
_channel.BasicConsume(queue, autoAck: false, consumer);
|
channel.BasicConsume(queue, autoAck: false, consumer);
|
||||||
await Task.CompletedTask.ConfigureAwait(false);
|
await Task.CompletedTask.ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -33,7 +33,10 @@ public static class SchedulerServiceCollectionExtensions
|
|||||||
config
|
config
|
||||||
.UseSimpleAssemblyNameTypeSerializer()
|
.UseSimpleAssemblyNameTypeSerializer()
|
||||||
.UseRecommendedSerializerSettings()
|
.UseRecommendedSerializerSettings()
|
||||||
.UsePostgreSqlStorage(options.ConnectionString);
|
.UsePostgreSqlStorage(storage =>
|
||||||
|
{
|
||||||
|
storage.UseNpgsqlConnection(options.ConnectionString);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
services.AddHangfireServer((serviceProvider, options) =>
|
services.AddHangfireServer((serviceProvider, options) =>
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.14" />
|
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.14" />
|
||||||
<PackageReference Include="Hangfire.PostgreSql" Version="1.20.12" />
|
<PackageReference Include="Hangfire.PostgreSql" Version="1.20.12" />
|
||||||
|
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
|
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
|
||||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
|
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.0" />
|
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.0" />
|
||||||
|
|||||||
Reference in New Issue
Block a user