Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HybridCache: enforce L2 expiration #5987

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ internal async Task<long> SafeReadTagInvalidationAsync(string tag)
}
}

internal void SetL1<T>(string key, CacheItem<T> value, HybridCacheEntryOptions? options)
internal void SetL1<T>(string key, CacheItem<T> value, HybridCacheEntryOptions? options, TimeSpan maxRelativeTime)
{
// incr ref-count for the the cache itself; this *may* be released via the NeedsEvictionCallback path
if (value.TryReserve())
Expand All @@ -190,7 +190,15 @@ internal void SetL1<T>(string key, CacheItem<T> value, HybridCacheEntryOptions?
// that actually commits the add - so: if we fault, we don't want to try
// committing a partially configured cache entry
ICacheEntry cacheEntry = _localCache.CreateEntry(key);
cacheEntry.AbsoluteExpirationRelativeToNow = GetL1AbsoluteExpirationRelativeToNow(options);

var expiry = GetL1AbsoluteExpirationRelativeToNow(options);
if (expiry > maxRelativeTime)
{
// enforce "not exceeding the remaining overall cache lifetime" that we got from L2
expiry = maxRelativeTime;
}

cacheEntry.AbsoluteExpirationRelativeToNow = expiry;
cacheEntry.Value = value;

if (value.TryGetSize(out long size))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private async Task BackgroundFetchAsync()
{
// result is the wider payload including HC headers; unwrap it:
HybridCachePayload.HybridCachePayloadParseResult parseResult = HybridCachePayload.TryParse(
result.AsArraySegment(), Key.Key, CacheItem.Tags, Cache, out ArraySegment<byte> payload,
result.AsArraySegment(), Key.Key, CacheItem.Tags, Cache, out ArraySegment<byte> payload, out TimeSpan remainingTime,
out HybridCachePayload.PayloadFlags flags, out ushort entropy, out TagSet pendingTags, out Exception? fault);
switch (parseResult)
{
Expand All @@ -240,7 +240,7 @@ private async Task BackgroundFetchAsync()
{
// move into the payload segment (minus any framing/header/etc data)
result = new(payload.Array!, payload.Offset, payload.Count, result.ReturnToPool);
SetResultAndRecycleIfAppropriate(ref result);
SetResultAndRecycleIfAppropriate(ref result, remainingTime);
return;
}

Expand Down Expand Up @@ -441,7 +441,7 @@ private void SetDefaultResult()
}
}

private void SetResultAndRecycleIfAppropriate(ref BufferChunk value)
private void SetResultAndRecycleIfAppropriate(ref BufferChunk value, TimeSpan remainingTime)
{
// set a result from L2 cache
Debug.Assert(value.OversizedArray is not null, "expected buffer");
Expand All @@ -467,7 +467,7 @@ private void SetResultAndRecycleIfAppropriate(ref BufferChunk value)
break;
}

SetResult(cacheItem);
SetResult(cacheItem, remainingTime);
}

private void SetImmutableResultWithoutSerialize(T value)
Expand Down Expand Up @@ -527,11 +527,13 @@ private void SetResultPreSerialized(T value, ref BufferChunk buffer, IHybridCach
SetResult(cacheItem);
}

private void SetResult(CacheItem<T> value)
private void SetResult(CacheItem<T> value) => SetResult(value, TimeSpan.MaxValue);

private void SetResult(CacheItem<T> value, TimeSpan maxRelativeTime)
{
if ((Key.Flags & HybridCacheEntryFlags.DisableLocalCacheWrite) == 0)
{
Cache.SetL1(Key.Key, value, _options); // we can do this without a TCS, for SetValue
Cache.SetL1(Key.Key, value, _options, maxRelativeTime); // we can do this without a TCS, for SetValue
}

if (_result is not null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ public ValueTask<bool> IsTagExpiredAsync(string tag, long timestamp)
static async ValueTask<bool> AwaitedAsync(Task<long> pending, long timestamp) => timestamp <= await pending.ConfigureAwait(false);
}

internal static TimeSpan TicksToTimeSpan(long ticks) => TimeSpan.FromTicks(ticks);

internal long CurrentTimestamp() => _clock.GetUtcNow().UtcTicks;

internal void DebugInvalidateTag(string tag, Task<long> pending)
{
if (tag == TagSet.WildcardTag)
Expand All @@ -206,8 +210,6 @@ internal void DebugInvalidateTag(string tag, Task<long> pending)
}
}

internal long CurrentTimestamp() => _clock.GetUtcNow().UtcTicks;

internal void PrefetchTags(TagSet tags)
{
if (HasBackendCache && !tags.IsEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ public override ValueTask SetAsync<T>(string key, T value, HybridCacheEntryOptio
}

// exposed as internal for testability
internal TimeSpan GetL1AbsoluteExpirationRelativeToNow(HybridCacheEntryOptions? options) => GetEffectiveLocalCacheExpiration(options) ?? _defaultLocalCacheExpiration;
internal TimeSpan GetL1AbsoluteExpirationRelativeToNow(HybridCacheEntryOptions? options)
=> GetEffectiveLocalCacheExpiration(options) ?? _defaultLocalCacheExpiration;

internal TimeSpan GetL2AbsoluteExpirationRelativeToNow(HybridCacheEntryOptions? options) => options?.Expiration ?? _defaultExpiration;

Expand All @@ -235,7 +236,23 @@ private static ValueTask<T> RunWithoutCacheAsync<TState, T>(HybridCacheEntryFlag
// precedence between the inheritance between per-entry options and global options, and if a caller
// provides a per-entry option with *just* the Expiration specified, then that is assumed to also
// specify the LocalCacheExpiration.
return options is not null ? options.LocalCacheExpiration ?? options.Expiration : null;
if (options is not null)
{
if (options.LocalCacheExpiration is { } local)
{
if (options.Expiration is { } overall)
{
// enforce "not exceeding the remaining overall cache lifetime"
return local < overall ? local : overall;
}

return local;
}

return options.Expiration;
}

return null;
}

private bool ValidateKey(string key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,15 @@ static void WriteString(byte[] target, ref int offset, string value)
[System.Diagnostics.CodeAnalysis.SuppressMessage("Major Code Smell", "S107:Methods should not have too many parameters", Justification = "Borderline")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Exposed for logging")]
public static HybridCachePayloadParseResult TryParse(ArraySegment<byte> source, string key, TagSet knownTags, DefaultHybridCache cache,
out ArraySegment<byte> payload, out PayloadFlags flags, out ushort entropy, out TagSet pendingTags, out Exception? fault)
out ArraySegment<byte> payload, out TimeSpan remainingTime, out PayloadFlags flags, out ushort entropy, out TagSet pendingTags, out Exception? fault)
{
fault = null;

// note "cache" is used primarily for expiration checks; we don't automatically add etc
entropy = 0;
payload = default;
flags = 0;
remainingTime = TimeSpan.Zero;
string[] pendingTagBuffer = [];
int pendingTagsCount = 0;

Expand Down Expand Up @@ -229,11 +230,14 @@ public static HybridCachePayloadParseResult TryParse(ArraySegment<byte> source,
return HybridCachePayloadParseResult.InvalidData;
}

if ((creationTime + (long)duration) <= now)
var remainingTicks = (creationTime + (long)duration) - now;
if (remainingTicks <= 0)
{
return HybridCachePayloadParseResult.ExpiredByEntry;
}

remainingTime = DefaultHybridCache.TicksToTimeSpan(remainingTicks);

if (!TryRead7BitEncodedInt64(ref bytes, out u64) || u64 > int.MaxValue) // tag count
{
return HybridCachePayloadParseResult.InvalidData;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Internal;
using Xunit.Abstractions;
using static Microsoft.Extensions.Caching.Hybrid.Tests.DistributedCacheTests;
using static Microsoft.Extensions.Caching.Hybrid.Tests.L2Tests;

namespace Microsoft.Extensions.Caching.Hybrid.Tests;
public class ExpirationTests(ITestOutputHelper log)
{
[Fact]
public async Task ExpirationRespected()
{
// we want set up separate cache instances with a shared L2 to show relative expiration
// being respected
var clock = new FakeTime();
using var l1 = new MemoryCache(new MemoryCacheOptions { Clock = clock });
var l2 = new LoggingCache(log, new MemoryDistributedCache(Options.Create(new MemoryDistributedCacheOptions { Clock = clock })));

Guid guid0;
string key = nameof(ExpirationRespected);
Func<CancellationToken, ValueTask<Guid>> callback = static _ => new(Guid.NewGuid());
HybridCacheEntryOptions options = new() { Expiration = TimeSpan.FromMinutes(2), LocalCacheExpiration = TimeSpan.FromMinutes(1) };

ServiceCollection services = new();
services.AddSingleton<ISystemClock>(clock);
services.AddSingleton<TimeProvider>(clock);
services.AddSingleton<IMemoryCache>(l1);
services.AddSingleton<IDistributedCache>(l2);
services.AddHybridCache();
using var provider = services.BuildServiceProvider();
var cache = provider.GetRequiredService<HybridCache>();

guid0 = await cache.GetOrCreateAsync(key, callback, options);

// should be fine immediately
Assert.Equal(guid0, await cache.GetOrCreateAsync(key, callback, options));

clock.Add(TimeSpan.FromSeconds(45)); // should still be fine from L1 (L1 has 1 minute expiration)
Assert.Equal(guid0, await cache.GetOrCreateAsync(key, callback, options));

clock.Add(TimeSpan.FromSeconds(45)); // should still be fine from L2 (L1 now expired, but fetches from L2 and detects 0:30 remaining, which limits L1 to 0:30)
Assert.Equal(guid0, await cache.GetOrCreateAsync(key, callback, options));

clock.Add(TimeSpan.FromSeconds(45)); // should now be expired
Assert.NotEqual(guid0, await cache.GetOrCreateAsync(key, callback, options));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,14 @@ Task IDistributedCache.RemoveAsync(string key, CancellationToken token)
void IDistributedCache.Set(string key, byte[] value, DistributedCacheEntryOptions options)
{
Interlocked.Increment(ref ProtectedOpCount);
Log.WriteLine($"Set (byte[]): {key}");
Log.WriteLine($"Set (byte[]): {key} (expiry: {options.AbsoluteExpirationRelativeToNow})");
Tail.Set(key, value, options);
}

Task IDistributedCache.SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token)
{
Interlocked.Increment(ref ProtectedOpCount);
Log.WriteLine($"SetAsync (byte[]): {key}");
Log.WriteLine($"SetAsync (byte[]): {key} (expiry: {options.AbsoluteExpirationRelativeToNow})");
return Tail.SetAsync(key, value, options, token);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void RoundTrip_Success(string delimitedTags, int expectedLength, int tagC
Assert.Equal(expectedLength, actualLength);

clock.Add(TimeSpan.FromSeconds(10));
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var flags, out var entropy, out var pendingTags, out _);
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var remaining, out var flags, out var entropy, out var pendingTags, out _);
log.WriteLine($"Entropy: {entropy}; Flags: {flags}");
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.Success, result);
Assert.True(payload.SequenceEqual(bytes));
Expand Down Expand Up @@ -82,13 +82,13 @@ public void RoundTrip_SelfExpiration()
Assert.Equal(1063, actualLength);

clock.Add(TimeSpan.FromSeconds(58));
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var flags, out var entropy, out var pendingTags, out _);
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var remaining, out var flags, out var entropy, out var pendingTags, out _);
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.Success, result);
Assert.True(payload.SequenceEqual(bytes));
Assert.True(pendingTags.IsEmpty);

clock.Add(TimeSpan.FromSeconds(4));
result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out payload, out flags, out entropy, out pendingTags, out _);
result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out payload, out remaining, out flags, out entropy, out pendingTags, out _);
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.ExpiredByEntry, result);
Assert.Equal(0, payload.Count);
Assert.True(pendingTags.IsEmpty);
Expand Down Expand Up @@ -118,7 +118,7 @@ public async Task RoundTrip_WildcardExpiration()
clock.Add(TimeSpan.FromSeconds(2));
await cache.RemoveByTagAsync("*");

var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var flags, out var entropy, out var pendingTags, out _);
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var remaining, out var flags, out var entropy, out var pendingTags, out _);
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.ExpiredByWildcard, result);
Assert.Equal(0, payload.Count);
Assert.True(pendingTags.IsEmpty);
Expand Down Expand Up @@ -148,13 +148,13 @@ public async Task RoundTrip_TagExpiration()
clock.Add(TimeSpan.FromSeconds(2));
await cache.RemoveByTagAsync("other_tag");

var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var flags, out var entropy, out var pendingTags, out _);
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var remaining, out var flags, out var entropy, out var pendingTags, out _);
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.Success, result);
Assert.True(payload.SequenceEqual(bytes));
Assert.True(pendingTags.IsEmpty);

await cache.RemoveByTagAsync("some_tag");
result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out payload, out flags, out entropy, out pendingTags, out _);
result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out payload, out remaining, out flags, out entropy, out pendingTags, out _);
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.ExpiredByTag, result);
Assert.Equal(0, payload.Count);
Assert.True(pendingTags.IsEmpty);
Expand Down Expand Up @@ -186,7 +186,7 @@ public async Task RoundTrip_TagExpiration_Pending()

var tcs = new TaskCompletionSource<long>();
cache.DebugInvalidateTag("some_tag", tcs.Task);
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var flags, out var entropy, out var pendingTags, out _);
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength), key, tags, cache, out var payload, out var remaining, out var flags, out var entropy, out var pendingTags, out _);
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.Success, result);
Assert.True(payload.SequenceEqual(bytes));
Assert.Equal(1, pendingTags.Count);
Expand All @@ -208,7 +208,7 @@ public void Gibberish()
byte[] bytes = new byte[1024];
new Random().NextBytes(bytes);

var result = HybridCachePayload.TryParse(new(bytes), "whatever", TagSet.Empty, cache, out var payload, out var flags, out var entropy, out var pendingTags, out _);
var result = HybridCachePayload.TryParse(new(bytes), "whatever", TagSet.Empty, cache, out var payload, out var remaining, out var flags, out var entropy, out var pendingTags, out _);
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.FormatNotRecognized, result);
Assert.Equal(0, payload.Count);
Assert.True(pendingTags.IsEmpty);
Expand All @@ -235,7 +235,7 @@ public void RoundTrip_Truncated()
log.WriteLine($"bytes written: {actualLength}");
Assert.Equal(1063, actualLength);

var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength - 1), key, tags, cache, out var payload, out var flags, out var entropy, out var pendingTags, out _);
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength - 1), key, tags, cache, out var payload, out var remaining, out var flags, out var entropy, out var pendingTags, out _);
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.InvalidData, result);
Assert.Equal(0, payload.Count);
Assert.True(pendingTags.IsEmpty);
Expand All @@ -262,7 +262,7 @@ public void RoundTrip_Oversized()
log.WriteLine($"bytes written: {actualLength}");
Assert.Equal(1063, actualLength);

var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength + 1), key, tags, cache, out var payload, out var flags, out var entropy, out var pendingTags, out _);
var result = HybridCachePayload.TryParse(new(oversized, 0, actualLength + 1), key, tags, cache, out var payload, out var remaining, out var flags, out var entropy, out var pendingTags, out _);
Assert.Equal(HybridCachePayload.HybridCachePayloadParseResult.InvalidData, result);
Assert.Equal(0, payload.Count);
Assert.True(pendingTags.IsEmpty);
Expand Down
Loading
Loading