using MassTransit;
using Microsoft.EntityFrameworkCore;
using Npgsql;
using TakeoutSaaS.Application.Identity.Events;
using TakeoutSaaS.Domain.Tenants.Entities;
using TakeoutSaaS.Infrastructure.Logs.Persistence;
namespace TakeoutSaaS.Infrastructure.Logs.Consumers;
/// <summary>
/// Consumer that persists identity-user operation logs, using an inbox table
/// keyed by message id so that a redelivered message is not logged twice.
/// </summary>
public sealed class IdentityUserOperationLogConsumer(TakeoutLogsDbContext logsContext) : IConsumer
{
    /// <summary>
    /// Stores the incoming operation-log message together with an inbox record for its message id.
    /// Returns without writing anything when the message id has already been recorded.
    /// </summary>
    /// <param name="context">Consume context carrying the message, its id and the cancellation token.</param>
    /// <exception cref="InvalidOperationException">The context has no message id, so idempotency cannot be enforced.</exception>
    public async Task Consume(ConsumeContext context)
    {
        // 1. Require a message id; it is the key used for de-duplication.
        if (context.MessageId is not { } inboxKey)
        {
            throw new InvalidOperationException("缺少 MessageId,无法进行日志幂等处理。");
        }

        var cancellationToken = context.CancellationToken;

        // Read-only lookup: skip the message if its id is already in the inbox.
        var alreadyConsumed = await logsContext.OperationLogInboxMessages
            .AsNoTracking()
            .AnyAsync(x => x.MessageId == inboxKey, cancellationToken);
        if (alreadyConsumed)
        {
            return;
        }

        // 2. Build the inbox (de-duplication) record and the log entity from the payload.
        var payload = context.Message;
        var inboxRecord = new OperationLogInboxMessage
        {
            MessageId = inboxKey,
            ConsumedAt = DateTime.UtcNow
        };
        var logEntry = new OperationLog
        {
            OperationType = payload.OperationType,
            TargetType = payload.TargetType,
            TargetIds = payload.TargetIds,
            OperatorId = payload.OperatorId,
            OperatorName = payload.OperatorName,
            Parameters = payload.Parameters,
            Result = payload.Result,
            Success = payload.Success
        };

        logsContext.OperationLogInboxMessages.Add(inboxRecord);
        logsContext.OperationLogs.Add(logEntry);

        // 3. Persist both rows in one SaveChangesAsync call.
        try
        {
            await logsContext.SaveChangesAsync(cancellationToken);
        }
        catch (DbUpdateException ex) when (IsDuplicateMessage(ex))
        {
            // A unique violation here is treated as a concurrent de-duplication
            // conflict: the message counts as already handled, so nothing is rethrown.
        }
    }

    /// <summary>
    /// Tells whether a failed save was caused by a PostgreSQL unique-constraint violation.
    /// </summary>
    /// <param name="exception">The exception raised by <c>SaveChangesAsync</c>.</param>
    /// <returns>
    /// <c>true</c> when the direct inner exception is a <see cref="PostgresException"/> whose
    /// SQL state equals <see cref="PostgresErrorCodes.UniqueViolation"/>; otherwise <c>false</c>.
    /// </returns>
    private static bool IsDuplicateMessage(DbUpdateException exception) =>
        exception.InnerException is PostgresException { SqlState: PostgresErrorCodes.UniqueViolation };
}