Files
TakeoutSaaS.TenantApi/src/Worker/TakeoutSaaS.SkuWorker/Program.cs
MSuMshk 77caac3af9
All checks were successful
Build and Deploy TenantApi / build-and-deploy (push) Successful in 51s
feat: SKU保存链路切换到RabbitMQ Outbox并新增独立Worker
2026-02-25 11:20:38 +08:00

63 lines
2.3 KiB
C#

using MassTransit;
using Microsoft.Extensions.Options;
using TakeoutSaaS.Infrastructure.App.Persistence;
using TakeoutSaaS.Infrastructure.Common.Extensions;
using TakeoutSaaS.Module.Messaging.Options;
using TakeoutSaaS.Module.Tenancy.Extensions;
using TakeoutSaaS.Shared.Abstractions.Constants;
using TakeoutSaaS.TenantApi.Services;
using TakeoutSaaS.SkuWorker.Consumers;
using TakeoutSaaS.Application.App.Products.Messaging;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddDatabaseInfrastructure(builder.Configuration);
builder.Services.AddPostgresDbContext<TakeoutAppDbContext>(DatabaseConstants.AppDataSource);
builder.Services.AddTenantResolution(builder.Configuration);
builder.Services.AddScoped<ProductSkuSaveService>();
builder.Services.AddScoped<ProductSkuSaveJobRunner>();
builder.Services.AddOptions<RabbitMqOptions>()
.Bind(builder.Configuration.GetSection("RabbitMQ"))
.ValidateDataAnnotations()
.ValidateOnStart();
builder.Services.AddMassTransit(configurator =>
{
configurator.AddConsumer<ProductSkuSaveJobRequestedConsumer>(consumerConfigurator =>
{
consumerConfigurator.UseMessageRetry(retry =>
{
retry.Exponential(
retryLimit: 5,
minInterval: TimeSpan.FromSeconds(1),
maxInterval: TimeSpan.FromSeconds(30),
intervalDelta: TimeSpan.FromSeconds(2));
});
});
configurator.UsingRabbitMq((context, cfg) =>
{
var options = context.GetRequiredService<IOptions<RabbitMqOptions>>().Value;
var virtualHost = string.IsNullOrWhiteSpace(options.VirtualHost) ? "/" : options.VirtualHost.Trim();
var virtualHostPath = virtualHost == "/" ? "/" : $"/{virtualHost.TrimStart('/')}";
var hostUri = new Uri($"rabbitmq://{options.Host}:{options.Port}{virtualHostPath}");
cfg.Host(hostUri, host =>
{
host.Username(options.Username);
host.Password(options.Password);
});
cfg.PrefetchCount = options.PrefetchCount;
cfg.ReceiveEndpoint(ProductSkuSaveJobRequestedEvent.QueueName, endpoint =>
{
endpoint.PrefetchCount = options.PrefetchCount;
endpoint.ConfigureConsumer<ProductSkuSaveJobRequestedConsumer>(context);
});
});
});
var app = builder.Build();
await app.RunAsync();