Skip to content

Commit

Permalink
Implemented an async interface for HzCache and RedisHzCache.
Browse files Browse the repository at this point in the history
  • Loading branch information
aheintz committed Mar 27, 2024
1 parent 28ef3c0 commit 94f3af2
Show file tree
Hide file tree
Showing 21 changed files with 988 additions and 215 deletions.
2 changes: 1 addition & 1 deletion HzCache.Benchmarks/HzCache.Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<ItemGroup>
<ProjectReference Include="..\HzMemoryCache\HzMemoryCache.csproj"/>
<ProjectReference Include="..\RedisBackplaneHzCache\RedisBackplaneHzCache.csproj"/>
<ProjectReference Include="..\RedisBackedHzCache\RedisBackedHzCache.csproj"/>
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion HzCache.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Runtime.Caching;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using hzcache;
using HzCache;
using HzCache.Benchmarks;

BenchmarkRunner.Run<WithRedisInvalidation>();
Expand Down
4 changes: 1 addition & 3 deletions HzCache.Benchmarks/WithRedisInvalidation.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
using System.Collections.Concurrent;
using System.Runtime.Caching;
using BenchmarkDotNet.Attributes;
using hzcache;
using RedisBackplaneMemoryCache;

namespace HzCache.Benchmarks
{
[ShortRunJob]
[MemoryDiagnoser]
public class WithRedisInvalidation
{
private static readonly IDetailedHzCache _hzCache = new RedisBackplaneHzCache(new RedisBackplaneMemoryMemoryCacheOptions
private static readonly IDetailedHzCache _hzCache = new RedisBackedHzCache(new RedisBackedHzCacheOptions
{
applicationCachePrefix = "benchmark",
redisConnectionString = "localhost",
Expand Down
2 changes: 1 addition & 1 deletion HzMemoryCache/HzCacheMemoryLocker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;

namespace hzcache
namespace HzCache
{
public class HzCacheMemoryLockerOptions
{
Expand Down
23 changes: 12 additions & 11 deletions HzMemoryCache/HzMemoryCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.Logging;

namespace hzcache
namespace HzCache
{
/// <summary>
/// Simple MemoryCache alternative. Basically a concurrent dictionary with expiration and cache value change
/// notifications.
/// </summary>
public class HzMemoryCache : IEnumerable<KeyValuePair<string, object>>, IDisposable, IDetailedHzCache
public partial class HzMemoryCache : IEnumerable<KeyValuePair<string, object>>, IDisposable, IDetailedHzCache
{
private static readonly IPropagatorBlock<TTLValue, IList<TTLValue>> updateChecksumAndSerializeQueue = CreateBuffer<TTLValue>(TimeSpan.FromMilliseconds(10), 100);
private static readonly IPropagatorBlock<TTLValue, IList<TTLValue>> updateChecksumAndSerializeQueue = CreateBuffer<TTLValue>(TimeSpan.FromMilliseconds(35), 100);
private static readonly SemaphoreSlim globalStaticLock = new(1);
private readonly Timer cleanUpTimer;
private readonly ConcurrentDictionary<string, TTLValue> dictionary = new();
Expand All @@ -42,6 +42,7 @@ public HzMemoryCache(HzCacheOptions? options = null)
}

public int Count => dictionary.Count;
public long SizeInBytes => dictionary.Values.Sum(ttlv => ttlv.sizeInBytes);


public void RemoveByPattern(string pattern, bool sendNotification = true)
Expand Down Expand Up @@ -135,7 +136,6 @@ public void Set<T>(string key, T? value)
/// </summary>
public void Set<T>(string key, T? value, TimeSpan ttl)
{
// Console.WriteLine($"[{options.instanceId}] Set {key} with TTL {ttl} and value {value}");
var v = new TTLValue(key, value, ttl, updateChecksumAndSerializeQueue, options.notificationType,
(tv, objectData) => NotifyItemChange(key, CacheItemChangeType.AddOrUpdate, tv, objectData));
dictionary[key] = v;
Expand Down Expand Up @@ -171,12 +171,13 @@ public void Set<T>(string key, T? value, TimeSpan ttl)
ReleaseLock(factoryLock, "GET", key);
return value;
}

public IList<T> GetOrSetBatch<T>(IList<string> keys, Func<IList<string>, List<KeyValuePair<string, T>>> valueFactory)
{
return GetOrSetBatch(keys, valueFactory, options.defaultTTL);
}
public IList<T> GetOrSetBatch<T>(IList<string> keys, Func<IList<string>, List<KeyValuePair<string,T>>> valueFactory, TimeSpan ttl)

public IList<T> GetOrSetBatch<T>(IList<string> keys, Func<IList<string>, List<KeyValuePair<string, T>>> valueFactory, TimeSpan ttl)
{
var cachedItems = keys.Select(key => new KeyValuePair<string, T?>(key, Get<T>(key)));
var missingKeys = cachedItems.Where(kvp => IsNullOrDefault(kvp.Value)).Select(kvp => kvp.Key).ToList();
Expand All @@ -189,6 +190,7 @@ public IList<T> GetOrSetBatch<T>(IList<string> keys, Func<IList<string>, List<Ke
{
value = kv.Value;
}

if (kv.Value == null)
{
factoryRetrievedItems.TryGetValue(kv.Key, out value);
Expand All @@ -199,6 +201,7 @@ public IList<T> GetOrSetBatch<T>(IList<string> keys, Func<IList<string>, List<Ke
}).Where(v => v.Value is not null).Select(kv => kv.Value).ToList();
}


/// <summary>
/// @see <see cref="IHzCache" />
/// </summary>
Expand All @@ -211,14 +214,14 @@ public bool Remove(string key)
/// <summary>
/// @see <see cref="IDetailedHzCache" />
/// </summary>
public bool Remove(string key, bool sendBackplaneNotification = true, Func<string, bool>? skipRemoveIfEqualFunc = null)
public bool Remove(string key, bool sendBackplaneNotification, Func<string, bool>? skipRemoveIfEqualFunc = null)
{
return RemoveItem(key, CacheItemChangeType.Remove, sendBackplaneNotification, skipRemoveIfEqualFunc);
}

public CacheStatistics GetStatistics()
{
return new CacheStatistics {Counts = Count, SizeInBytes = 0};
return new CacheStatistics {Counts = Count, SizeInBytes = SizeInBytes};
}

/// <summary>
Expand Down Expand Up @@ -301,7 +304,6 @@ private async Task ProcessExpiredEviction()

private bool RemoveItem(string key, CacheItemChangeType changeType, bool sendNotification, Func<string, bool>? areEqualFunc = null)
{
// Console.WriteLine($"[{options.instanceId}] Delete {key}");
var result = !(!dictionary.TryGetValue(key, out TTLValue ttlValue) || (areEqualFunc != null && areEqualFunc.Invoke(ttlValue.checksum)));

if (result)
Expand All @@ -323,8 +325,7 @@ private bool RemoveItem(string key, CacheItemChangeType changeType, bool sendNot

private void NotifyItemChange(string key, CacheItemChangeType changeType, TTLValue ttlValue, byte[]? objectData = null, bool isPattern = false)
{
// Console.WriteLine($"Publishing {changeType} for {key} and pattern {isPattern}");
options.valueChangeListener.Invoke(key, changeType, ttlValue, objectData, isPattern);
options.valueChangeListener(key, changeType, ttlValue, objectData, isPattern);
}

public void SetRaw(string key, TTLValue value)
Expand Down
2 changes: 1 addition & 1 deletion HzMemoryCache/HzMemoryCache.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<LangVersion>latestmajor</LangVersion>
<RootNamespace>hzcache</RootNamespace>
<RootNamespace>HzCache</RootNamespace>
</PropertyGroup>

<ItemGroup>
Expand Down
170 changes: 170 additions & 0 deletions HzMemoryCache/HzMemoryCacheAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace HzCache
{
public partial class HzMemoryCache
{
public Task SetAsync<T>(string key, T? value)
{
return SetAsync(key, value, options.defaultTTL);
}

public Task SetAsync<T>(string key, T? value, TimeSpan ttl)
{
Set(key, value, ttl);
return Task.CompletedTask;
}

public async Task<T?> GetOrSetAsync<T>(string key, Func<string, Task<T>> valueFactory, TimeSpan ttl, long maxMsToWaitForFactory = 10000)
{
var value = Get<T>(key);
if (!IsNullOrDefault(value))
{
return value;
}

options.logger?.LogDebug("Cache miss for key {Key}, calling value factory", key);

var factoryLock = memoryLocker.AcquireLock(options.applicationCachePrefix, options.instanceId, "GET", key, TimeSpan.FromMilliseconds(maxMsToWaitForFactory),
options.logger, CancellationToken.None);
if (factoryLock is null)
{
options.logger?.LogDebug("Could not acquire lock for key {Key}, returning default value", key);
return value;
}

value = await valueFactory(key);
var ttlValue = new TTLValue(key, value, ttl, updateChecksumAndSerializeQueue, options.notificationType, (tv, objectData) =>
{
NotifyItemChange(key, CacheItemChangeType.AddOrUpdate, tv, objectData);
});
dictionary[key] = ttlValue;
ReleaseLock(factoryLock, "GET", key);
return value;
}


public async Task<IList<T>> GetOrSetBatchAsync<T>(IList<string> keys, Func<IList<string>, Task<List<KeyValuePair<string, T>>>> valueFactory)
{
return await GetOrSetBatchAsync(keys, valueFactory, options.defaultTTL);
}

public async Task<IList<T>> GetOrSetBatchAsync<T>(IList<string> keys, Func<IList<string>, Task<List<KeyValuePair<string, T>>>> valueFactory, TimeSpan ttl)
{
var cachedItems = keys.Select(key => new KeyValuePair<string, T?>(key, Get<T>(key)));
var missingKeys = cachedItems.Where(kvp => IsNullOrDefault(kvp.Value)).Select(kvp => kvp.Key).ToList();
var factoryRetrievedItems = (await valueFactory(missingKeys)).ToDictionary(kv => kv.Key, kv => kv.Value);

return cachedItems.Select(kv =>
{
T? value = default;
if (kv.Value != null)
{
value = kv.Value;
}

if (kv.Value == null)
{
factoryRetrievedItems.TryGetValue(kv.Key, out value);
Set(kv.Key, value, ttl);
}

return new KeyValuePair<string, T?>(kv.Key, value);
}).Where(v => v.Value is not null).Select(kv => kv.Value).ToList();
}

public async Task ClearAsync()
{
var kvps = dictionary.ToArray();
dictionary.Clear();
foreach (var kv in kvps)
{
NotifyItemChange("*", CacheItemChangeType.Remove, null, null, true);
}
}

public async Task<bool> RemoveAsync(string key)
{
return await RemoveAsync(key, options.notificationType != NotificationType.None);
}

public async Task RemoveByPatternAsync(string pattern, bool sendNotification = true)
{
var myPattern = pattern;
if (pattern[0] != '*')
{
myPattern = "^" + pattern;
}

var re = new Regex(myPattern.Replace("*", ".*"));
var victims = dictionary.Keys.Where(k => re.IsMatch(k)).ToList();
victims.AsParallel().ForAll(async key =>
{
await RemoveItemAsync(key, CacheItemChangeType.Remove, false);
});
if (sendNotification)
{
NotifyItemChange(pattern, CacheItemChangeType.Remove, null, null, true);
}
}

public async Task<T> GetAsync<T>(string key)
{
var defaultValue = default(T);

if (!dictionary.TryGetValue(key, out var ttlValue))
{
return defaultValue;
}

if (ttlValue.IsExpired()) //found but expired
{
return defaultValue;
}

if (options.evictionPolicy == EvictionPolicy.LRU)
{
ttlValue.UpdateTimeToKill();
}

if (ttlValue.value is T o)
{
return o;
}

return default;
}

public async Task<bool> RemoveAsync(string key, bool sendBackplaneNotification = true, Func<string, bool>? skipRemoveIfEqualFunc = null)
{
return await RemoveItemAsync(key, CacheItemChangeType.Remove, sendBackplaneNotification, skipRemoveIfEqualFunc);
}

private async Task<bool> RemoveItemAsync(string key, CacheItemChangeType changeType, bool sendNotification, Func<string, bool>? areEqualFunc = null)
{
var result = !(!dictionary.TryGetValue(key, out var ttlValue) || (areEqualFunc != null && areEqualFunc.Invoke(ttlValue.checksum)));

if (result)
{
result = dictionary.TryRemove(key, out ttlValue);
if (result)
{
result = !ttlValue.IsExpired();
}
}

if (sendNotification)
{
NotifyItemChange(key, changeType, ttlValue);
}

return result;
}
}
}
Loading

0 comments on commit 94f3af2

Please sign in to comment.