feat: finalize core modules and gateway

This commit is contained in:
2025-11-23 18:53:12 +08:00
parent 429d4fb747
commit ae273e510a
115 changed files with 4695 additions and 223 deletions

View File

@@ -0,0 +1,15 @@
using System.Threading;
using System.Threading.Tasks;
namespace TakeoutSaaS.Module.Messaging.Abstractions;
/// <summary>
/// 消息发布抽象。
/// </summary>
public interface IMessagePublisher
{
/// <summary>
/// 发布消息到指定路由键。
/// </summary>
Task PublishAsync<T>(string routingKey, T message, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace TakeoutSaaS.Module.Messaging.Abstractions;
/// <summary>
/// 消息订阅抽象。
/// </summary>
public interface IMessageSubscriber : IAsyncDisposable
{
/// <summary>
/// 订阅指定队列与路由键,处理后返回是否消费成功。
/// </summary>
Task SubscribeAsync<T>(string queue, string routingKey, Func<T, CancellationToken, Task<bool>> handler, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,32 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using TakeoutSaaS.Module.Messaging.Abstractions;
using TakeoutSaaS.Module.Messaging.Options;
using TakeoutSaaS.Module.Messaging.Serialization;
using TakeoutSaaS.Module.Messaging.Services;
namespace TakeoutSaaS.Module.Messaging.Extensions;
/// <summary>
/// 消息队列模块注册扩展。
/// </summary>
public static class MessagingServiceCollectionExtensions
{
/// <summary>
/// 注册 RabbitMQ 发布/订阅能力。
/// </summary>
public static IServiceCollection AddMessagingModule(this IServiceCollection services, IConfiguration configuration)
{
services.AddOptions<RabbitMqOptions>()
.Bind(configuration.GetSection("RabbitMQ"))
.ValidateDataAnnotations()
.ValidateOnStart();
services.AddSingleton<JsonMessageSerializer>();
services.AddSingleton<RabbitMqConnectionFactory>();
services.AddSingleton<IMessagePublisher, RabbitMqMessagePublisher>();
services.AddSingleton<IMessageSubscriber, RabbitMqMessageSubscriber>();
return services;
}
}

View File

@@ -0,0 +1,55 @@
using System.ComponentModel.DataAnnotations;
namespace TakeoutSaaS.Module.Messaging.Options;
/// <summary>
/// RabbitMQ 连接与交换机配置。
/// </summary>
public sealed class RabbitMqOptions
{
/// <summary>
/// 主机名。
/// </summary>
[Required]
public string Host { get; set; } = "localhost";
/// <summary>
/// 端口。
/// </summary>
[Range(1, 65535)]
public int Port { get; set; } = 5672;
/// <summary>
/// 用户名。
/// </summary>
[Required]
public string Username { get; set; } = "guest";
/// <summary>
/// 密码。
/// </summary>
[Required]
public string Password { get; set; } = "guest";
/// <summary>
/// 虚拟主机。
/// </summary>
public string VirtualHost { get; set; } = "/";
/// <summary>
/// 默认交换机名称。
/// </summary>
[Required]
public string Exchange { get; set; } = "takeout.events";
/// <summary>
/// 交换机类型,默认 topic。
/// </summary>
public string ExchangeType { get; set; } = "topic";
/// <summary>
/// 消费预取数量。
/// </summary>
[Range(1, 1000)]
public ushort PrefetchCount { get; set; } = 20;
}

View File

@@ -0,0 +1,22 @@
using System.Text;
using System.Text.Json;
namespace TakeoutSaaS.Module.Messaging.Serialization;
/// <summary>
/// 消息 JSON 序列化器。
/// </summary>
public sealed class JsonMessageSerializer
{
private static readonly JsonSerializerOptions DefaultOptions = new(JsonSerializerDefaults.Web);
/// <summary>
/// 序列化消息。
/// </summary>
public byte[] Serialize<T>(T message) => Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message, DefaultOptions));
/// <summary>
/// 反序列化消息。
/// </summary>
public T? Deserialize<T>(byte[] body) => JsonSerializer.Deserialize<T>(body, DefaultOptions);
}

View File

@@ -0,0 +1,30 @@
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using TakeoutSaaS.Module.Messaging.Options;
namespace TakeoutSaaS.Module.Messaging.Services;
/// <summary>
/// RabbitMQ 连接工厂封装。
/// </summary>
public sealed class RabbitMqConnectionFactory(IOptionsMonitor<RabbitMqOptions> optionsMonitor)
{
/// <summary>
/// 创建连接。
/// </summary>
public IConnection CreateConnection()
{
var options = optionsMonitor.CurrentValue;
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
UserName = options.Username,
Password = options.Password,
VirtualHost = options.VirtualHost,
DispatchConsumersAsync = true
};
return factory.CreateConnection();
}
}

View File

@@ -0,0 +1,66 @@
using System;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using TakeoutSaaS.Module.Messaging.Abstractions;
using TakeoutSaaS.Module.Messaging.Options;
using TakeoutSaaS.Module.Messaging.Serialization;
namespace TakeoutSaaS.Module.Messaging.Services;
/// <summary>
/// RabbitMQ 消息发布实现。
/// </summary>
public sealed class RabbitMqMessagePublisher(RabbitMqConnectionFactory connectionFactory, IOptionsMonitor<RabbitMqOptions> optionsMonitor, JsonMessageSerializer serializer, ILogger<RabbitMqMessagePublisher> logger)
: IMessagePublisher, IAsyncDisposable
{
private IConnection? _connection;
private IModel? _channel;
private bool _disposed;
/// <inheritdoc />
public Task PublishAsync<T>(string routingKey, T message, CancellationToken cancellationToken = default)
{
EnsureChannel();
var options = optionsMonitor.CurrentValue;
_channel!.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false);
var body = serializer.Serialize(message);
var props = _channel.CreateBasicProperties();
props.ContentType = "application/json";
props.DeliveryMode = 2;
props.MessageId = Guid.NewGuid().ToString("N");
_channel.BasicPublish(options.Exchange, routingKey, props, body);
logger.LogDebug("发布消息到交换机 {Exchange} RoutingKey {RoutingKey}", options.Exchange, routingKey);
return Task.CompletedTask;
}
private void EnsureChannel()
{
if (_channel != null && _channel.IsOpen)
{
return;
}
_connection ??= connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
}
/// <summary>
/// 释放 RabbitMQ 资源。
/// </summary>
public ValueTask DisposeAsync()
{
if (_disposed)
{
return ValueTask.CompletedTask;
}
_disposed = true;
_channel?.Dispose();
_connection?.Dispose();
return ValueTask.CompletedTask;
}
}

View File

@@ -0,0 +1,92 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using TakeoutSaaS.Module.Messaging.Abstractions;
using TakeoutSaaS.Module.Messaging.Options;
using TakeoutSaaS.Module.Messaging.Serialization;
namespace TakeoutSaaS.Module.Messaging.Services;
/// <summary>
/// RabbitMQ 消费者实现。
/// </summary>
public sealed class RabbitMqMessageSubscriber(RabbitMqConnectionFactory connectionFactory, IOptionsMonitor<RabbitMqOptions> optionsMonitor, JsonMessageSerializer serializer, ILogger<RabbitMqMessageSubscriber> logger)
: IMessageSubscriber
{
private IConnection? _connection;
private IModel? _channel;
private bool _disposed;
/// <inheritdoc />
public async Task SubscribeAsync<T>(string queue, string routingKey, Func<T, CancellationToken, Task<bool>> handler, CancellationToken cancellationToken = default)
{
EnsureChannel();
var options = optionsMonitor.CurrentValue;
_channel!.ExchangeDeclare(options.Exchange, options.ExchangeType, durable: true, autoDelete: false);
_channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
_channel.QueueBind(queue, options.Exchange, routingKey);
_channel.BasicQos(0, options.PrefetchCount, global: false);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (_, ea) =>
{
var message = serializer.Deserialize<T>(ea.Body.ToArray());
if (message == null)
{
_channel.BasicAck(ea.DeliveryTag, multiple: false);
return;
}
var success = false;
try
{
success = await handler(message, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
logger.LogError(ex, "处理消息失败:{RoutingKey}", ea.RoutingKey);
}
if (success)
{
_channel.BasicAck(ea.DeliveryTag, multiple: false);
}
else
{
_channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
}
};
_channel.BasicConsume(queue, autoAck: false, consumer);
await Task.CompletedTask.ConfigureAwait(false);
}
private void EnsureChannel()
{
if (_channel != null && _channel.IsOpen)
{
return;
}
_connection ??= connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
await Task.Run(() =>
{
_channel?.Dispose();
_connection?.Dispose();
}).ConfigureAwait(false);
}
}

View File

@@ -5,10 +5,15 @@
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="8.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.6.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Core\TakeoutSaaS.Shared.Abstractions\TakeoutSaaS.Shared.Abstractions.csproj" />
</ItemGroup>
</Project>