feat: 身份操作日志改造为Outbox并修正日志库连接
This commit is contained in:
@@ -0,0 +1,72 @@
|
||||
using MassTransit;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Npgsql;
|
||||
using TakeoutSaaS.Application.Identity.Events;
|
||||
using TakeoutSaaS.Domain.Tenants.Entities;
|
||||
using TakeoutSaaS.Infrastructure.Logs.Persistence;
|
||||
|
||||
namespace TakeoutSaaS.Infrastructure.Logs.Consumers;
|
||||
|
||||
/// <summary>
|
||||
/// 身份用户操作日志消费者。
|
||||
/// </summary>
|
||||
public sealed class IdentityUserOperationLogConsumer(TakeoutLogsDbContext logsContext) : IConsumer<IdentityUserOperationLogMessage>
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public async Task Consume(ConsumeContext<IdentityUserOperationLogMessage> context)
|
||||
{
|
||||
// 1. 校验消息标识并进行幂等检查
|
||||
var messageId = context.MessageId;
|
||||
if (!messageId.HasValue)
|
||||
{
|
||||
throw new InvalidOperationException("缺少 MessageId,无法进行日志幂等处理。");
|
||||
}
|
||||
|
||||
var exists = await logsContext.OperationLogInboxMessages
|
||||
.AsNoTracking()
|
||||
.AnyAsync(x => x.MessageId == messageId.Value, context.CancellationToken);
|
||||
if (exists)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. (空行后) 构建日志实体与去重记录
|
||||
var message = context.Message;
|
||||
var log = new OperationLog
|
||||
{
|
||||
OperationType = message.OperationType,
|
||||
TargetType = message.TargetType,
|
||||
TargetIds = message.TargetIds,
|
||||
OperatorId = message.OperatorId,
|
||||
OperatorName = message.OperatorName,
|
||||
Parameters = message.Parameters,
|
||||
Result = message.Result,
|
||||
Success = message.Success
|
||||
};
|
||||
logsContext.OperationLogInboxMessages.Add(new OperationLogInboxMessage
|
||||
{
|
||||
MessageId = messageId.Value,
|
||||
ConsumedAt = DateTime.UtcNow
|
||||
});
|
||||
logsContext.OperationLogs.Add(log);
|
||||
|
||||
// 3. (空行后) 保存并处理并发去重冲突
|
||||
try
|
||||
{
|
||||
await logsContext.SaveChangesAsync(context.CancellationToken);
|
||||
}
|
||||
catch (DbUpdateException ex) when (IsDuplicateMessage(ex))
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private static bool IsDuplicateMessage(DbUpdateException exception)
|
||||
{
|
||||
if (exception.InnerException is PostgresException postgresException)
|
||||
{
|
||||
return postgresException.SqlState == PostgresErrorCodes.UniqueViolation;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user