feat: SKU保存链路切换到RabbitMQ Outbox并新增独立Worker
All checks were successful
Build and Deploy TenantApi / build-and-deploy (push) Successful in 51s

This commit is contained in:
2026-02-25 11:20:38 +08:00
parent aeef4ca649
commit 77caac3af9
13 changed files with 9475 additions and 18 deletions

View File

@@ -47,6 +47,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TakeoutSaaS.Module.Sms", "s
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TakeoutSaaS.TenantApi", "src\Api\TakeoutSaaS.TenantApi\TakeoutSaaS.TenantApi.csproj", "{F53E274A-838A-477A-8D29-6EEB0DBD62CD}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Worker", "Worker", "{89BA21D6-604E-9DA1-5F6C-9062FD58212E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TakeoutSaaS.SkuWorker", "src\Worker\TakeoutSaaS.SkuWorker\TakeoutSaaS.SkuWorker.csproj", "{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -56,7 +60,7 @@ Global
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{022FCF39-EC48-46EA-AC08-FA2EAD1548B7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{022FCF39-EC48-46EA-AC08-FA2EAD1548B7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{022FCF39-EC48-46EA-AC08-FA2EAD1548B7}.Debug|x64.ActiveCfg = Debug|Any CPU
@@ -222,12 +226,12 @@ Global
{38011EC3-7EC3-40E4-B9B2-E631966B350B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{38011EC3-7EC3-40E4-B9B2-E631966B350B}.Release|Any CPU.Build.0 = Release|Any CPU
{38011EC3-7EC3-40E4-B9B2-E631966B350B}.Release|x64.ActiveCfg = Release|Any CPU
{38011EC3-7EC3-40E4-B9B2-E631966B350B}.Release|x64.Build.0 = Release|Any CPU
{38011EC3-7EC3-40E4-B9B2-E631966B350B}.Release|x86.ActiveCfg = Release|Any CPU
{38011EC3-7EC3-40E4-B9B2-E631966B350B}.Release|x86.Build.0 = Release|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Debug|x64.ActiveCfg = Debug|Any CPU
{38011EC3-7EC3-40E4-B9B2-E631966B350B}.Release|x64.Build.0 = Release|Any CPU
{38011EC3-7EC3-40E4-B9B2-E631966B350B}.Release|x86.ActiveCfg = Release|Any CPU
{38011EC3-7EC3-40E4-B9B2-E631966B350B}.Release|x86.Build.0 = Release|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Debug|x64.ActiveCfg = Debug|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Debug|x64.Build.0 = Debug|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Debug|x86.ActiveCfg = Debug|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Debug|x86.Build.0 = Debug|Any CPU
@@ -237,6 +241,18 @@ Global
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Release|x64.Build.0 = Release|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Release|x86.ActiveCfg = Release|Any CPU
{F53E274A-838A-477A-8D29-6EEB0DBD62CD}.Release|x86.Build.0 = Release|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Debug|x64.ActiveCfg = Debug|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Debug|x64.Build.0 = Debug|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Debug|x86.ActiveCfg = Debug|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Debug|x86.Build.0 = Debug|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Release|Any CPU.Build.0 = Release|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Release|x64.ActiveCfg = Release|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Release|x64.Build.0 = Release|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Release|x86.ActiveCfg = Release|Any CPU
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -259,9 +275,11 @@ Global
{05058F44-6FB7-43AF-8648-8BF538E283EF} = {EC447DCF-ABFA-6E24-52A5-D7FD48A5C558}
{5C12177E-6C25-4F78-BFD4-AA073CFC0650} = {EC447DCF-ABFA-6E24-52A5-D7FD48A5C558}
{5ADA37B6-09A0-48F7-8633-C266FD5BBFD1} = {EC447DCF-ABFA-6E24-52A5-D7FD48A5C558}
{FE49A9E7-1228-45BA-9B71-337AA353FE98} = {EC447DCF-ABFA-6E24-52A5-D7FD48A5C558}
{9C2F510E-4054-482D-AFD3-D2E374D60304} = {EC447DCF-ABFA-6E24-52A5-D7FD48A5C558}
{38011EC3-7EC3-40E4-B9B2-E631966B350B} = {EC447DCF-ABFA-6E24-52A5-D7FD48A5C558}
{F53E274A-838A-477A-8D29-6EEB0DBD62CD} = {81034408-37C8-1011-444E-4C15C2FADA8E}
EndGlobalSection
{FE49A9E7-1228-45BA-9B71-337AA353FE98} = {EC447DCF-ABFA-6E24-52A5-D7FD48A5C558}
{9C2F510E-4054-482D-AFD3-D2E374D60304} = {EC447DCF-ABFA-6E24-52A5-D7FD48A5C558}
{38011EC3-7EC3-40E4-B9B2-E631966B350B} = {EC447DCF-ABFA-6E24-52A5-D7FD48A5C558}
{F53E274A-838A-477A-8D29-6EEB0DBD62CD} = {81034408-37C8-1011-444E-4C15C2FADA8E}
{89BA21D6-604E-9DA1-5F6C-9062FD58212E} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
{1482D41E-4F00-4B21-8CC0-9025FA41E5E3} = {89BA21D6-604E-9DA1-5F6C-9062FD58212E}
EndGlobalSection
EndGlobal

View File

@@ -1,10 +1,11 @@
using System.Globalization;
using System.Text.Json;
using Hangfire;
using MassTransit;
using MediatR;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using TakeoutSaaS.Application.App.Products.Messaging;
using TakeoutSaaS.Application.App.Products.Commands;
using TakeoutSaaS.Application.App.Products.Dto;
using TakeoutSaaS.Application.App.Products.Queries;
@@ -14,8 +15,10 @@ using TakeoutSaaS.Domain.Products.Enums;
using TakeoutSaaS.Infrastructure.App.Persistence;
using TakeoutSaaS.Shared.Abstractions.Constants;
using TakeoutSaaS.Shared.Abstractions.Exceptions;
using TakeoutSaaS.Shared.Abstractions.Ids;
using TakeoutSaaS.Shared.Abstractions.Results;
using TakeoutSaaS.Shared.Web.Api;
using TakeoutSaaS.Shared.Web.Security;
using TakeoutSaaS.TenantApi.Contracts.Product;
using TakeoutSaaS.TenantApi.Services;
@@ -32,7 +35,8 @@ public sealed class ProductController(
TakeoutAppDbContext dbContext,
StoreContextService storeContextService,
ProductSkuSaveService productSkuSaveService,
IBackgroundJobClient backgroundJobClient) : BaseApiController
IIdGenerator idGenerator,
IPublishEndpoint publishEndpoint) : BaseApiController
{
/// <summary>
/// 商品列表。
@@ -1544,6 +1548,7 @@ public sealed class ProductController(
var entity = new ProductSkuSaveJob
{
Id = idGenerator.NextId(),
StoreId = storeId,
ProductId = productId,
Status = ProductSkuSaveJobStatus.Queued,
@@ -1553,13 +1558,28 @@ public sealed class ProductController(
ProgressProcessed = 0,
FailedCount = 0
};
await dbContext.ProductSkuSaveJobs.AddAsync(entity, cancellationToken);
await dbContext.SaveChangesAsync(cancellationToken);
var tenantId = HttpContext.GetTenantContext()?.TenantId ?? 0;
if (tenantId > 0)
{
entity.TenantId = tenantId;
}
var requestId = HttpContext.TraceIdentifier;
await dbContext.ProductSkuSaveJobs.AddAsync(entity, cancellationToken);
try
{
var hangfireJobId = backgroundJobClient.Enqueue<ProductSkuSaveJobRunner>(runner => runner.ExecuteAsync(entity.Id));
entity.HangfireJobId = hangfireJobId;
await publishEndpoint.Publish(
new ProductSkuSaveJobRequestedEvent
{
JobId = entity.Id,
TenantId = tenantId,
StoreId = entity.StoreId,
ProductId = entity.ProductId,
RequestId = string.IsNullOrWhiteSpace(requestId) ? Guid.NewGuid().ToString("N") : requestId
},
cancellationToken);
await dbContext.SaveChangesAsync(cancellationToken);
}
catch (Exception ex)

View File

@@ -1,5 +1,6 @@
using Asp.Versioning;
using Asp.Versioning.ApiExplorer;
using MassTransit;
using Microsoft.AspNetCore.Cors.Infrastructure;
using Microsoft.AspNetCore.Mvc;
using OpenTelemetry.Metrics;
@@ -15,8 +16,10 @@ using TakeoutSaaS.Application.Storage.Extensions;
using TakeoutSaaS.Infrastructure.App.Extensions;
using TakeoutSaaS.Infrastructure.Dictionary.Extensions;
using TakeoutSaaS.Infrastructure.Identity.Extensions;
using TakeoutSaaS.Infrastructure.App.Persistence;
using TakeoutSaaS.Module.Authorization.Extensions;
using TakeoutSaaS.Module.Messaging.Extensions;
using TakeoutSaaS.Module.Messaging.Options;
using TakeoutSaaS.Module.Scheduler.Extensions;
using TakeoutSaaS.Module.Storage.Extensions;
using TakeoutSaaS.Module.Tenancy.Extensions;
@@ -113,6 +116,32 @@ builder.Services.AddDictionaryInfrastructure(builder.Configuration);
// 9. 注册消息发布能力(未配置 RabbitMQ 时自动降级为 NoOp 实现)
builder.Services.AddMessagingApplication();
builder.Services.AddMessagingModule(builder.Configuration);
builder.Services.AddMassTransit(configurator =>
{
configurator.AddEntityFrameworkOutbox<TakeoutAppDbContext>(outbox =>
{
outbox.UsePostgres();
outbox.UseBusOutbox();
});
configurator.UsingRabbitMq((context, cfg) =>
{
var options = builder.Configuration.GetSection("RabbitMQ").Get<RabbitMqOptions>()
?? throw new InvalidOperationException("缺少 RabbitMQ 配置。");
var virtualHost = string.IsNullOrWhiteSpace(options.VirtualHost) ? "/" : options.VirtualHost.Trim();
var virtualHostPath = virtualHost == "/" ? "/" : $"/{virtualHost.TrimStart('/')}";
var hostUri = new Uri($"rabbitmq://{options.Host}:{options.Port}{virtualHostPath}");
cfg.Host(hostUri, host =>
{
host.Username(options.Username);
host.Password(options.Password);
});
cfg.PrefetchCount = options.PrefetchCount;
cfg.ConfigureEndpoints(context);
});
});
builder.Services.AddStorageModule(builder.Configuration);
builder.Services.AddStorageApplication();
builder.Services.AddSchedulerModule(builder.Configuration);

View File

@@ -63,6 +63,11 @@ public sealed class ProductSkuSaveJobRunner(
return;
}
if (job.Status == ProductSkuSaveJobStatus.Succeeded)
{
return;
}
job.Status = ProductSkuSaveJobStatus.Running;
job.StartedAt ??= DateTime.UtcNow;
job.FinishedAt = null;
@@ -71,6 +76,10 @@ public sealed class ProductSkuSaveJobRunner(
job.ProgressProcessed = 0;
await dbContext.SaveChangesAsync();
// 以租户+商品维度申请事务级咨询锁,保证同商品串行落库。
await dbContext.Database.ExecuteSqlInterpolatedAsync(
$"SELECT pg_advisory_xact_lock({job.TenantId}, {job.ProductId});");
var payload = DeserializePayload(job.PayloadJson);
if (payload.Skus.Count == 0)
{

View File

@@ -0,0 +1,37 @@
namespace TakeoutSaaS.Application.App.Products.Messaging;
/// <summary>
/// SKU replace 任务已创建事件(用于异步消费执行)。
/// </summary>
public sealed class ProductSkuSaveJobRequestedEvent
{
/// <summary>
/// 默认消费队列名称。
/// </summary>
public const string QueueName = "tenant.product-sku-replace.v1";
/// <summary>
/// 任务 ID。
/// </summary>
public long JobId { get; set; }
/// <summary>
/// 租户 ID。
/// </summary>
public long TenantId { get; set; }
/// <summary>
/// 门店 ID。
/// </summary>
public long StoreId { get; set; }
/// <summary>
/// 商品 ID。
/// </summary>
public long ProductId { get; set; }
/// <summary>
/// 请求 ID幂等辅助
/// </summary>
public string RequestId { get; set; } = string.Empty;
}

View File

@@ -1,5 +1,6 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using MassTransit;
using TakeoutSaaS.Domain.Analytics.Entities;
using TakeoutSaaS.Domain.Coupons.Entities;
using TakeoutSaaS.Domain.CustomerService.Entities;
@@ -539,6 +540,9 @@ public sealed class TakeoutAppDbContext(
ConfigureMetricDefinition(modelBuilder.Entity<MetricDefinition>());
ConfigureMetricSnapshot(modelBuilder.Entity<MetricSnapshot>());
ConfigureMetricAlertRule(modelBuilder.Entity<MetricAlertRule>());
modelBuilder.AddInboxStateEntity();
modelBuilder.AddOutboxMessageEntity();
modelBuilder.AddOutboxStateEntity();
// 3. 应用多租户全局查询过滤器
ApplyTenantQueryFilters(modelBuilder);

View File

@@ -0,0 +1,142 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
#nullable disable
namespace TakeoutSaaS.Infrastructure.Migrations
{
/// <inheritdoc />
public partial class AddAppOutboxForSkuMessaging : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "InboxState",
columns: table => new
{
Id = table.Column<long>(type: "bigint", nullable: false)
.Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
MessageId = table.Column<Guid>(type: "uuid", nullable: false),
ConsumerId = table.Column<Guid>(type: "uuid", nullable: false),
LockId = table.Column<Guid>(type: "uuid", nullable: false),
RowVersion = table.Column<byte[]>(type: "bytea", rowVersion: true, nullable: true),
Received = table.Column<DateTime>(type: "timestamp with time zone", nullable: false),
ReceiveCount = table.Column<int>(type: "integer", nullable: false),
ExpirationTime = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
Consumed = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
Delivered = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
LastSequenceNumber = table.Column<long>(type: "bigint", nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_InboxState", x => x.Id);
table.UniqueConstraint("AK_InboxState_MessageId_ConsumerId", x => new { x.MessageId, x.ConsumerId });
});
migrationBuilder.CreateTable(
name: "OutboxState",
columns: table => new
{
OutboxId = table.Column<Guid>(type: "uuid", nullable: false),
LockId = table.Column<Guid>(type: "uuid", nullable: false),
RowVersion = table.Column<byte[]>(type: "bytea", rowVersion: true, nullable: true),
Created = table.Column<DateTime>(type: "timestamp with time zone", nullable: false),
Delivered = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
LastSequenceNumber = table.Column<long>(type: "bigint", nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_OutboxState", x => x.OutboxId);
});
migrationBuilder.CreateTable(
name: "OutboxMessage",
columns: table => new
{
SequenceNumber = table.Column<long>(type: "bigint", nullable: false)
.Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
EnqueueTime = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
SentTime = table.Column<DateTime>(type: "timestamp with time zone", nullable: false),
Headers = table.Column<string>(type: "text", nullable: true),
Properties = table.Column<string>(type: "text", nullable: true),
InboxMessageId = table.Column<Guid>(type: "uuid", nullable: true),
InboxConsumerId = table.Column<Guid>(type: "uuid", nullable: true),
OutboxId = table.Column<Guid>(type: "uuid", nullable: true),
MessageId = table.Column<Guid>(type: "uuid", nullable: false),
ContentType = table.Column<string>(type: "character varying(256)", maxLength: 256, nullable: false),
MessageType = table.Column<string>(type: "text", nullable: false),
Body = table.Column<string>(type: "text", nullable: false),
ConversationId = table.Column<Guid>(type: "uuid", nullable: true),
CorrelationId = table.Column<Guid>(type: "uuid", nullable: true),
InitiatorId = table.Column<Guid>(type: "uuid", nullable: true),
RequestId = table.Column<Guid>(type: "uuid", nullable: true),
SourceAddress = table.Column<string>(type: "character varying(256)", maxLength: 256, nullable: true),
DestinationAddress = table.Column<string>(type: "character varying(256)", maxLength: 256, nullable: true),
ResponseAddress = table.Column<string>(type: "character varying(256)", maxLength: 256, nullable: true),
FaultAddress = table.Column<string>(type: "character varying(256)", maxLength: 256, nullable: true),
ExpirationTime = table.Column<DateTime>(type: "timestamp with time zone", nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_OutboxMessage", x => x.SequenceNumber);
table.ForeignKey(
name: "FK_OutboxMessage_InboxState_InboxMessageId_InboxConsumerId",
columns: x => new { x.InboxMessageId, x.InboxConsumerId },
principalTable: "InboxState",
principalColumns: new[] { "MessageId", "ConsumerId" });
table.ForeignKey(
name: "FK_OutboxMessage_OutboxState_OutboxId",
column: x => x.OutboxId,
principalTable: "OutboxState",
principalColumn: "OutboxId");
});
migrationBuilder.CreateIndex(
name: "IX_InboxState_Delivered",
table: "InboxState",
column: "Delivered");
migrationBuilder.CreateIndex(
name: "IX_OutboxMessage_EnqueueTime",
table: "OutboxMessage",
column: "EnqueueTime");
migrationBuilder.CreateIndex(
name: "IX_OutboxMessage_ExpirationTime",
table: "OutboxMessage",
column: "ExpirationTime");
migrationBuilder.CreateIndex(
name: "IX_OutboxMessage_InboxMessageId_InboxConsumerId_SequenceNumber",
table: "OutboxMessage",
columns: new[] { "InboxMessageId", "InboxConsumerId", "SequenceNumber" },
unique: true);
migrationBuilder.CreateIndex(
name: "IX_OutboxMessage_OutboxId_SequenceNumber",
table: "OutboxMessage",
columns: new[] { "OutboxId", "SequenceNumber" },
unique: true);
migrationBuilder.CreateIndex(
name: "IX_OutboxState_Created",
table: "OutboxState",
column: "Created");
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "OutboxMessage");
migrationBuilder.DropTable(
name: "InboxState");
migrationBuilder.DropTable(
name: "OutboxState");
}
}
}

View File

@@ -22,6 +22,174 @@ namespace TakeoutSaaS.Infrastructure.Migrations
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("MassTransit.EntityFrameworkCoreIntegration.InboxState", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<DateTime?>("Consumed")
.HasColumnType("timestamp with time zone");
b.Property<Guid>("ConsumerId")
.HasColumnType("uuid");
b.Property<DateTime?>("Delivered")
.HasColumnType("timestamp with time zone");
b.Property<DateTime?>("ExpirationTime")
.HasColumnType("timestamp with time zone");
b.Property<long?>("LastSequenceNumber")
.HasColumnType("bigint");
b.Property<Guid>("LockId")
.HasColumnType("uuid");
b.Property<Guid>("MessageId")
.HasColumnType("uuid");
b.Property<int>("ReceiveCount")
.HasColumnType("integer");
b.Property<DateTime>("Received")
.HasColumnType("timestamp with time zone");
b.Property<byte[]>("RowVersion")
.IsConcurrencyToken()
.ValueGeneratedOnAddOrUpdate()
.HasColumnType("bytea");
b.HasKey("Id");
b.HasIndex("Delivered");
b.ToTable("InboxState");
});
modelBuilder.Entity("MassTransit.EntityFrameworkCoreIntegration.OutboxMessage", b =>
{
b.Property<long>("SequenceNumber")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("SequenceNumber"));
b.Property<string>("Body")
.IsRequired()
.HasColumnType("text");
b.Property<string>("ContentType")
.IsRequired()
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.Property<Guid?>("ConversationId")
.HasColumnType("uuid");
b.Property<Guid?>("CorrelationId")
.HasColumnType("uuid");
b.Property<string>("DestinationAddress")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.Property<DateTime?>("EnqueueTime")
.HasColumnType("timestamp with time zone");
b.Property<DateTime?>("ExpirationTime")
.HasColumnType("timestamp with time zone");
b.Property<string>("FaultAddress")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.Property<string>("Headers")
.HasColumnType("text");
b.Property<Guid?>("InboxConsumerId")
.HasColumnType("uuid");
b.Property<Guid?>("InboxMessageId")
.HasColumnType("uuid");
b.Property<Guid?>("InitiatorId")
.HasColumnType("uuid");
b.Property<Guid>("MessageId")
.HasColumnType("uuid");
b.Property<string>("MessageType")
.IsRequired()
.HasColumnType("text");
b.Property<Guid?>("OutboxId")
.HasColumnType("uuid");
b.Property<string>("Properties")
.HasColumnType("text");
b.Property<Guid?>("RequestId")
.HasColumnType("uuid");
b.Property<string>("ResponseAddress")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.Property<DateTime>("SentTime")
.HasColumnType("timestamp with time zone");
b.Property<string>("SourceAddress")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.HasKey("SequenceNumber");
b.HasIndex("EnqueueTime");
b.HasIndex("ExpirationTime");
b.HasIndex("OutboxId", "SequenceNumber")
.IsUnique();
b.HasIndex("InboxMessageId", "InboxConsumerId", "SequenceNumber")
.IsUnique();
b.ToTable("OutboxMessage");
});
modelBuilder.Entity("MassTransit.EntityFrameworkCoreIntegration.OutboxState", b =>
{
b.Property<Guid>("OutboxId")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<DateTime>("Created")
.HasColumnType("timestamp with time zone");
b.Property<DateTime?>("Delivered")
.HasColumnType("timestamp with time zone");
b.Property<long?>("LastSequenceNumber")
.HasColumnType("bigint");
b.Property<Guid>("LockId")
.HasColumnType("uuid");
b.Property<byte[]>("RowVersion")
.IsConcurrencyToken()
.ValueGeneratedOnAddOrUpdate()
.HasColumnType("bytea");
b.HasKey("OutboxId");
b.HasIndex("Created");
b.ToTable("OutboxState");
});
modelBuilder.Entity("TakeoutSaaS.Domain.Analytics.Entities.MetricAlertRule", b =>
{
b.Property<long>("Id")
@@ -8667,6 +8835,18 @@ namespace TakeoutSaaS.Infrastructure.Migrations
});
});
modelBuilder.Entity("MassTransit.EntityFrameworkCoreIntegration.OutboxMessage", b =>
{
b.HasOne("MassTransit.EntityFrameworkCoreIntegration.OutboxState", null)
.WithMany()
.HasForeignKey("OutboxId");
b.HasOne("MassTransit.EntityFrameworkCoreIntegration.InboxState", null)
.WithMany()
.HasForeignKey("InboxMessageId", "InboxConsumerId")
.HasPrincipalKey("MessageId", "ConsumerId");
});
modelBuilder.Entity("TakeoutSaaS.Domain.Orders.Entities.OrderItem", b =>
{
b.HasOne("TakeoutSaaS.Domain.Orders.Entities.Order", null)

View File

@@ -0,0 +1,44 @@
using MassTransit;
using Microsoft.EntityFrameworkCore;
using TakeoutSaaS.Application.App.Products.Messaging;
using TakeoutSaaS.Domain.Products.Enums;
using TakeoutSaaS.Infrastructure.App.Persistence;
using TakeoutSaaS.TenantApi.Services;
namespace TakeoutSaaS.SkuWorker.Consumers;
/// <summary>
/// SKU 保存任务消费器。
/// </summary>
public sealed class ProductSkuSaveJobRequestedConsumer(
ProductSkuSaveJobRunner jobRunner,
TakeoutAppDbContext dbContext,
ILogger<ProductSkuSaveJobRequestedConsumer> logger) : IConsumer<ProductSkuSaveJobRequestedEvent>
{
/// <inheritdoc />
public async Task Consume(ConsumeContext<ProductSkuSaveJobRequestedEvent> context)
{
var payload = context.Message;
logger.LogInformation(
"开始消费 SKU 保存任务JobId={JobId}, TenantId={TenantId}, StoreId={StoreId}, ProductId={ProductId}, RetryAttempt={RetryAttempt}",
payload.JobId,
payload.TenantId,
payload.StoreId,
payload.ProductId,
context.GetRetryAttempt());
await jobRunner.ExecuteAsync(payload.JobId);
var jobStatus = await dbContext.ProductSkuSaveJobs
.IgnoreQueryFilters()
.AsNoTracking()
.Where(item => item.Id == payload.JobId)
.Select(item => item.Status)
.SingleOrDefaultAsync(context.CancellationToken);
if (jobStatus == ProductSkuSaveJobStatus.Failed)
{
throw new InvalidOperationException($"SKU 保存任务执行失败JobId={payload.JobId}");
}
}
}

View File

@@ -0,0 +1,62 @@
using MassTransit;
using Microsoft.Extensions.Options;
using TakeoutSaaS.Infrastructure.App.Persistence;
using TakeoutSaaS.Infrastructure.Common.Extensions;
using TakeoutSaaS.Module.Messaging.Options;
using TakeoutSaaS.Module.Tenancy.Extensions;
using TakeoutSaaS.Shared.Abstractions.Constants;
using TakeoutSaaS.TenantApi.Services;
using TakeoutSaaS.SkuWorker.Consumers;
using TakeoutSaaS.Application.App.Products.Messaging;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddDatabaseInfrastructure(builder.Configuration);
builder.Services.AddPostgresDbContext<TakeoutAppDbContext>(DatabaseConstants.AppDataSource);
builder.Services.AddTenantResolution(builder.Configuration);
builder.Services.AddScoped<ProductSkuSaveService>();
builder.Services.AddScoped<ProductSkuSaveJobRunner>();
builder.Services.AddOptions<RabbitMqOptions>()
.Bind(builder.Configuration.GetSection("RabbitMQ"))
.ValidateDataAnnotations()
.ValidateOnStart();
builder.Services.AddMassTransit(configurator =>
{
configurator.AddConsumer<ProductSkuSaveJobRequestedConsumer>(consumerConfigurator =>
{
consumerConfigurator.UseMessageRetry(retry =>
{
retry.Exponential(
retryLimit: 5,
minInterval: TimeSpan.FromSeconds(1),
maxInterval: TimeSpan.FromSeconds(30),
intervalDelta: TimeSpan.FromSeconds(2));
});
});
configurator.UsingRabbitMq((context, cfg) =>
{
var options = context.GetRequiredService<IOptions<RabbitMqOptions>>().Value;
var virtualHost = string.IsNullOrWhiteSpace(options.VirtualHost) ? "/" : options.VirtualHost.Trim();
var virtualHostPath = virtualHost == "/" ? "/" : $"/{virtualHost.TrimStart('/')}";
var hostUri = new Uri($"rabbitmq://{options.Host}:{options.Port}{virtualHostPath}");
cfg.Host(hostUri, host =>
{
host.Username(options.Username);
host.Password(options.Password);
});
cfg.PrefetchCount = options.PrefetchCount;
cfg.ReceiveEndpoint(ProductSkuSaveJobRequestedEvent.QueueName, endpoint =>
{
endpoint.PrefetchCount = options.PrefetchCount;
endpoint.ConfigureConsumer<ProductSkuSaveJobRequestedConsumer>(context);
});
});
});
var app = builder.Build();
await app.RunAsync();

View File

@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MassTransit" Version="8.5.7" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.5.7" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Api\TakeoutSaaS.TenantApi\TakeoutSaaS.TenantApi.csproj" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,30 @@
{
"Database": {
"DataSources": {
"AppDatabase": {
"Write": "Host=localhost;Port=5432;Database=takeout_app_db;Username=app_user;Password=AppUser112233",
"Reads": [
"Host=localhost;Port=5432;Database=takeout_app_db;Username=app_user;Password=AppUser112233"
],
"CommandTimeoutSeconds": 30,
"MaxRetryCount": 3,
"MaxRetryDelaySeconds": 5
}
}
},
"Tenancy": {
"TenantIdHeaderName": "X-Tenant-Id",
"TenantCodeHeaderName": "X-Tenant-Code",
"RootDomain": "tenant.laosankeji.com"
},
"RabbitMQ": {
"Host": "localhost",
"Port": 5672,
"Username": "guest",
"Password": "guest",
"VirtualHost": "/",
"Exchange": "takeout.events",
"ExchangeType": "topic",
"PrefetchCount": 20
}
}