feat: 实现字典管理后端
This commit is contained in:
@@ -0,0 +1,229 @@
|
||||
using System.Diagnostics;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StackExchange.Redis;
|
||||
using TakeoutSaaS.Application.Dictionary.Abstractions;
|
||||
using TakeoutSaaS.Domain.Dictionary.Entities;
|
||||
using TakeoutSaaS.Domain.Dictionary.Enums;
|
||||
using TakeoutSaaS.Domain.Dictionary.Repositories;
|
||||
using TakeoutSaaS.Shared.Abstractions.Security;
|
||||
|
||||
namespace TakeoutSaaS.Infrastructure.Dictionary.Caching;
|
||||
|
||||
/// <summary>
|
||||
/// 两级缓存封装:L1 内存 + L2 Redis。
|
||||
/// </summary>
|
||||
public sealed class HybridCacheService : IDictionaryHybridCache
|
||||
{
|
||||
private static readonly RedisChannel InvalidationChannel = RedisChannel.Literal("dictionary:cache:invalidate");
|
||||
|
||||
private readonly MemoryCacheService _memoryCache;
|
||||
private readonly RedisCacheService _redisCache;
|
||||
private readonly ISubscriber? _subscriber;
|
||||
private readonly ILogger<HybridCacheService>? _logger;
|
||||
private readonly CacheMetricsCollector? _metrics;
|
||||
private readonly IServiceScopeFactory? _scopeFactory;
|
||||
|
||||
/// <summary>
|
||||
/// 初始化两级缓存服务。
|
||||
/// </summary>
|
||||
public HybridCacheService(
|
||||
MemoryCacheService memoryCache,
|
||||
RedisCacheService redisCache,
|
||||
IConnectionMultiplexer? multiplexer = null,
|
||||
ILogger<HybridCacheService>? logger = null,
|
||||
CacheMetricsCollector? metrics = null,
|
||||
IServiceScopeFactory? scopeFactory = null)
|
||||
{
|
||||
_memoryCache = memoryCache;
|
||||
_redisCache = redisCache;
|
||||
_logger = logger;
|
||||
_subscriber = multiplexer?.GetSubscriber();
|
||||
_metrics = metrics;
|
||||
_scopeFactory = scopeFactory;
|
||||
|
||||
if (_subscriber != null)
|
||||
{
|
||||
_subscriber.Subscribe(InvalidationChannel, (_, value) =>
|
||||
{
|
||||
var prefix = value.ToString();
|
||||
if (!string.IsNullOrWhiteSpace(prefix))
|
||||
{
|
||||
_memoryCache.RemoveByPrefix(prefix);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取缓存,如果不存在则创建并回填。
|
||||
/// </summary>
|
||||
public async Task<T?> GetOrCreateAsync<T>(
|
||||
string key,
|
||||
TimeSpan ttl,
|
||||
Func<CancellationToken, Task<T?>> factory,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var stopwatch = Stopwatch.StartNew();
|
||||
var dictionaryCode = CacheMetricsCollector.ExtractDictionaryCode(key);
|
||||
var l1Hit = false;
|
||||
var l2Hit = false;
|
||||
|
||||
var cached = await _memoryCache.GetAsync<T>(key, cancellationToken);
|
||||
if (cached != null)
|
||||
{
|
||||
l1Hit = true;
|
||||
_metrics?.RecordHit("L1", dictionaryCode);
|
||||
_metrics?.RecordDuration(dictionaryCode, stopwatch.Elapsed.TotalMilliseconds);
|
||||
_metrics?.RecordQuery(dictionaryCode, l1Hit, l2Hit, stopwatch.Elapsed.TotalMilliseconds);
|
||||
return cached;
|
||||
}
|
||||
|
||||
_metrics?.RecordMiss("L1", dictionaryCode);
|
||||
|
||||
try
|
||||
{
|
||||
cached = await _redisCache.GetAsync<T>(key, cancellationToken);
|
||||
if (cached != null)
|
||||
{
|
||||
l2Hit = true;
|
||||
_metrics?.RecordHit("L2", dictionaryCode);
|
||||
await _memoryCache.SetAsync(key, cached, ttl, cancellationToken);
|
||||
_metrics?.RecordDuration(dictionaryCode, stopwatch.Elapsed.TotalMilliseconds);
|
||||
_metrics?.RecordQuery(dictionaryCode, l1Hit, l2Hit, stopwatch.Elapsed.TotalMilliseconds);
|
||||
return cached;
|
||||
}
|
||||
|
||||
_metrics?.RecordMiss("L2", dictionaryCode);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_metrics?.RecordMiss("L2", dictionaryCode);
|
||||
_logger?.LogWarning(ex, "读取 Redis 缓存失败,降级为数据库查询。");
|
||||
}
|
||||
|
||||
var created = await factory(cancellationToken);
|
||||
if (created == null)
|
||||
{
|
||||
_metrics?.RecordDuration(dictionaryCode, stopwatch.Elapsed.TotalMilliseconds);
|
||||
_metrics?.RecordQuery(dictionaryCode, l1Hit, l2Hit, stopwatch.Elapsed.TotalMilliseconds);
|
||||
return default;
|
||||
}
|
||||
|
||||
await _memoryCache.SetAsync(key, created, ttl, cancellationToken);
|
||||
try
|
||||
{
|
||||
await _redisCache.SetAsync(key, created, ttl, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogWarning(ex, "写入 Redis 缓存失败。");
|
||||
}
|
||||
|
||||
_metrics?.RecordDuration(dictionaryCode, stopwatch.Elapsed.TotalMilliseconds);
|
||||
_metrics?.RecordQuery(dictionaryCode, l1Hit, l2Hit, stopwatch.Elapsed.TotalMilliseconds);
|
||||
return created;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 失效指定前缀的缓存键。
|
||||
/// </summary>
|
||||
public async Task InvalidateAsync(
|
||||
string prefix,
|
||||
CacheInvalidationOperation operation = CacheInvalidationOperation.Update,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var dictionaryCode = CacheMetricsCollector.ExtractDictionaryCode(prefix);
|
||||
_metrics?.RecordInvalidation(dictionaryCode);
|
||||
|
||||
var removedCount = _memoryCache.RemoveByPrefixWithCount(prefix);
|
||||
long redisRemoved = 0;
|
||||
try
|
||||
{
|
||||
redisRemoved = await _redisCache.RemoveByPrefixWithCountAsync(prefix, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogWarning(ex, "删除 Redis 缓存失败。");
|
||||
}
|
||||
|
||||
var totalRemoved = removedCount + (int)Math.Min(redisRemoved, int.MaxValue);
|
||||
|
||||
if (_subscriber != null && !string.IsNullOrWhiteSpace(prefix))
|
||||
{
|
||||
await _subscriber.PublishAsync(InvalidationChannel, prefix);
|
||||
}
|
||||
|
||||
_ = WriteInvalidationLogAsync(prefix, dictionaryCode, totalRemoved, operation);
|
||||
}
|
||||
|
||||
private async Task WriteInvalidationLogAsync(
|
||||
string prefix,
|
||||
string dictionaryCode,
|
||||
int removedCount,
|
||||
CacheInvalidationOperation operation)
|
||||
{
|
||||
if (_scopeFactory == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var repo = scope.ServiceProvider.GetService<ICacheInvalidationLogRepository>();
|
||||
if (repo == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var currentUser = scope.ServiceProvider.GetService<ICurrentUserAccessor>();
|
||||
var tenantId = TryExtractTenantId(prefix) ?? 0;
|
||||
var scopeType = tenantId == 0 ? DictionaryScope.System : DictionaryScope.Business;
|
||||
|
||||
var log = new CacheInvalidationLog
|
||||
{
|
||||
TenantId = tenantId,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
DictionaryCode = dictionaryCode,
|
||||
Scope = scopeType,
|
||||
AffectedCacheKeyCount = removedCount,
|
||||
OperatorId = currentUser?.IsAuthenticated == true ? currentUser.UserId : 0,
|
||||
Operation = operation
|
||||
};
|
||||
|
||||
await repo.AddAsync(log);
|
||||
await repo.SaveChangesAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger?.LogWarning(ex, "写入缓存失效日志失败。");
|
||||
}
|
||||
}
|
||||
|
||||
private static long? TryExtractTenantId(string prefix)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(prefix))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
if (prefix.StartsWith("dict:groups:", StringComparison.Ordinal))
|
||||
{
|
||||
var token = prefix.Replace("dict:groups:", string.Empty, StringComparison.Ordinal).Trim(':');
|
||||
return long.TryParse(token.Split(':', StringSplitOptions.RemoveEmptyEntries).FirstOrDefault(), out var tenantId)
|
||||
? tenantId
|
||||
: null;
|
||||
}
|
||||
|
||||
if (prefix.StartsWith("dict:", StringComparison.Ordinal) && !prefix.StartsWith("dict:items:", StringComparison.Ordinal))
|
||||
{
|
||||
var token = prefix.Replace("dict:", string.Empty, StringComparison.Ordinal);
|
||||
return long.TryParse(token.Split(':', StringSplitOptions.RemoveEmptyEntries).FirstOrDefault(), out var tenantId)
|
||||
? tenantId
|
||||
: null;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user