Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate device factory into a creator layer #970

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/cluster/ClusterFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace Garnet.cluster
public class ClusterFactory : IClusterFactory
{
/// <inheritdoc />
public DeviceLogCommitCheckpointManager CreateCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool isMainStore, ILogger logger = default)
=> new ReplicationLogCheckpointManager(deviceFactory, checkpointNamingScheme, isMainStore, logger: logger);
public DeviceLogCommitCheckpointManager CreateCheckpointManager(INamedDeviceFactoryCreator deviceFactoryCreator, ICheckpointNamingScheme checkpointNamingScheme, bool isMainStore, ILogger logger = default)
=> new ReplicationLogCheckpointManager(deviceFactoryCreator, checkpointNamingScheme, isMainStore, logger: logger);

/// <inheritdoc />
public IClusterProvider CreateClusterProvider(StoreWrapper store)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
namespace Garnet.cluster
{
internal sealed class ReplicationLogCheckpointManager(
INamedDeviceFactory deviceFactory,
INamedDeviceFactoryCreator deviceFactoryCreator,
ICheckpointNamingScheme checkpointNamingScheme,
bool isMainStore,
bool removeOutdated = false,
int fastCommitThrottleFreq = 0,
ILogger logger = null) : DeviceLogCommitCheckpointManager(deviceFactory, checkpointNamingScheme, removeOutdated: false, fastCommitThrottleFreq, logger), IDisposable
ILogger logger = null) : DeviceLogCommitCheckpointManager(deviceFactoryCreator, checkpointNamingScheme, removeOutdated: false, fastCommitThrottleFreq, logger), IDisposable
{
public long CurrentSafeAofAddress = 0;
public long RecoveredSafeAofAddress = 0;
Expand Down
14 changes: 8 additions & 6 deletions libs/common/StreamProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,21 @@ public static IStreamProvider GetStreamProvider(FileLocationType locationType, s
internal class AzureStreamProvider : StreamProviderBase
{
private readonly string _connectionString;
private readonly AzureStorageNamedDeviceFactoryCreator azureStorageNamedDeviceFactoryCreator;

public AzureStreamProvider(string connectionString)
{
this._connectionString = connectionString;
this.azureStorageNamedDeviceFactoryCreator = new AzureStorageNamedDeviceFactoryCreator(this._connectionString, default);
}

protected override IDevice GetDevice(string path)
{
var fileInfo = new FileInfo(path);
INamedDeviceFactory settingsDeviceFactoryCreator = new AzureStorageNamedDeviceFactory(this._connectionString, default);

// Get the container info, if it does not exist it will be created
settingsDeviceFactoryCreator.Initialize($"{fileInfo.Directory?.Name}");
var settingsDevice = settingsDeviceFactoryCreator.Get(new FileDescriptor("", fileInfo.Name));
var settingsDeviceFactory = azureStorageNamedDeviceFactoryCreator.Create($"{fileInfo.Directory?.Name}");
var settingsDevice = settingsDeviceFactory.Get(new FileDescriptor("", fileInfo.Name));
settingsDevice.Initialize(MaxConfigFileSizeAligned, epoch: null, omitSegmentIdFromFilename: false);
return settingsDevice;
}
Expand All @@ -183,19 +184,20 @@ protected override long GetBytesToWrite(byte[] bytes, IDevice device)
internal class LocalFileStreamProvider : StreamProviderBase
{
private readonly bool readOnly;
private readonly LocalStorageNamedDeviceFactoryCreator localDeviceFactoryCreator;

public LocalFileStreamProvider(bool readOnly = false)
{
this.readOnly = readOnly;
this.localDeviceFactoryCreator = new LocalStorageNamedDeviceFactoryCreator(disableFileBuffering: false, readOnly: readOnly);
}

protected override IDevice GetDevice(string path)
{
var fileInfo = new FileInfo(path);

INamedDeviceFactory settingsDeviceFactoryCreator = new LocalStorageNamedDeviceFactory(disableFileBuffering: false, readOnly: readOnly);
settingsDeviceFactoryCreator.Initialize("");
var settingsDevice = settingsDeviceFactoryCreator.Get(new FileDescriptor(fileInfo.DirectoryName, fileInfo.Name));
var settingsDeviceFactory = localDeviceFactoryCreator.Create("");
var settingsDevice = settingsDeviceFactory.Get(new FileDescriptor(fileInfo.DirectoryName, fileInfo.Name));
settingsDevice.Initialize(-1, epoch: null, omitSegmentIdFromFilename: true);
return settingsDevice;
}
Expand Down
4 changes: 2 additions & 2 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,8 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
ThreadPoolMaxIOCompletionThreads = ThreadPoolMaxIOCompletionThreads,
NetworkConnectionLimit = NetworkConnectionLimit,
DeviceFactoryCreator = useAzureStorage
? () => new AzureStorageNamedDeviceFactory(AzureStorageConnectionString, logger)
: () => new LocalStorageNamedDeviceFactory(useNativeDeviceLinux: UseNativeDeviceLinux.GetValueOrDefault(), logger: logger),
? new AzureStorageNamedDeviceFactoryCreator(AzureStorageConnectionString, logger)
: new LocalStorageNamedDeviceFactoryCreator(useNativeDeviceLinux: UseNativeDeviceLinux.GetValueOrDefault(), logger: logger),
CheckpointThrottleFlushDelayMs = CheckpointThrottleFlushDelayMs,
EnableScatterGatherGet = EnableScatterGatherGet.GetValueOrDefault(),
ReplicaSyncDelayMs = ReplicaSyncDelayMs,
Expand Down
14 changes: 6 additions & 8 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,14 @@ private void CreateMainStore(IClusterFactory clusterFactory, out string checkpoi
kvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
kvSettings.CheckpointVersionSwitchBarrier = opts.EnableCluster;

var checkpointFactory = opts.DeviceFactoryCreator();
if (opts.EnableCluster)
{
kvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(checkpointFactory,
kvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), isMainStore: true, logger);
}
else
{
kvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(checkpointFactory,
kvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), removeOutdated: true);
}

Expand All @@ -335,11 +334,11 @@ private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandMana

if (opts.EnableCluster)
objKvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(
opts.DeviceFactoryCreator(),
opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
isMainStore: false, logger);
else
objKvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator(),
objKvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
removeOutdated: true);

Expand Down Expand Up @@ -407,9 +406,8 @@ public void Dispose(bool deleteDir = true)
logFactory?.Delete(new FileDescriptor { directoryName = "" });
if (opts.CheckpointDir != opts.LogDir && !string.IsNullOrEmpty(opts.CheckpointDir))
{
var ckptdir = opts.DeviceFactoryCreator();
ckptdir.Initialize(opts.CheckpointDir);
ckptdir.Delete(new FileDescriptor { directoryName = "" });
var checkpointDeviceFactory = opts.DeviceFactoryCreator.Create(opts.CheckpointDir);
checkpointDeviceFactory.Delete(new FileDescriptor { directoryName = "" });
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Cluster/IClusterFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public interface IClusterFactory
/// <summary>
/// Create checkpoint manager
/// </summary>
DeviceLogCommitCheckpointManager CreateCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool isMainStore, ILogger logger = default);
DeviceLogCommitCheckpointManager CreateCheckpointManager(INamedDeviceFactoryCreator deviceFactoryCreator, ICheckpointNamingScheme checkpointNamingScheme, bool isMainStore, ILogger logger = default);

/// <summary>
/// Create cluster provider
Expand Down
18 changes: 5 additions & 13 deletions libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ public class GarnetServerOptions : ServerOptions
public int NetworkConnectionLimit = -1;

/// <summary>
/// Creator of device factories
/// Func to help create device factories
/// </summary>
public Func<INamedDeviceFactory> DeviceFactoryCreator = null;
public INamedDeviceFactoryCreator DeviceFactoryCreator = null;

/// <summary>
/// Whether and by how much should we throttle the disk IO for checkpoints (default = 0)
Expand Down Expand Up @@ -495,7 +495,7 @@ public KVSettings<SpanByte, SpanByte> GetSettings(ILoggerFactory loggerFactory,
}
logger?.LogInformation("[Store] Using log mutable percentage of {MutablePercent}%", MutablePercent);

DeviceFactoryCreator ??= () => new LocalStorageNamedDeviceFactory(useNativeDeviceLinux: UseNativeDeviceLinux, logger: logger);
DeviceFactoryCreator ??= new LocalStorageNamedDeviceFactoryCreator(useNativeDeviceLinux: UseNativeDeviceLinux, logger: logger);

if (LatencyMonitor && MetricsSamplingFrequency == 0)
throw new Exception("LatencyMonitor requires MetricsSamplingFrequency to be set");
Expand Down Expand Up @@ -735,7 +735,7 @@ public void GetAofSettings(out TsavoriteLogSettings tsavoriteLogSettings)
throw new Exception("AOF Page size cannot be more than the AOF memory size.");
}
tsavoriteLogSettings.LogCommitManager = new DeviceLogCommitCheckpointManager(
MainMemoryReplication ? new NullNamedDeviceFactory() : DeviceFactoryCreator(),
MainMemoryReplication ? new NullNamedDeviceFactoryCreator() : DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(CheckpointDir + "/AOF"),
removeOutdated: true,
fastCommitThrottleFreq: EnableFastCommit ? FastCommitThrottleFreq : 0);
Expand All @@ -748,9 +748,7 @@ public void GetAofSettings(out TsavoriteLogSettings tsavoriteLogSettings)
/// <returns></returns>
public INamedDeviceFactory GetInitializedDeviceFactory(string baseName)
{
var deviceFactory = GetDeviceFactory();
deviceFactory.Initialize(baseName);
return deviceFactory;
return DeviceFactoryCreator.Create(baseName);
}

/// <summary>
Expand Down Expand Up @@ -829,11 +827,5 @@ IDevice GetAofDevice()
if (UseAofNullDevice) return new NullDevice();
else return GetInitializedDeviceFactory(CheckpointDir).Get(new FileDescriptor("AOF", "aof.log"));
}

/// <summary>
/// Get device factory
/// </summary>
/// <returns></returns>
public INamedDeviceFactory GetDeviceFactory() => DeviceFactoryCreator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
/// <summary>
/// Create new instance of log commit manager
/// </summary>
/// <param name="deviceFactory">Factory for getting devices</param>
/// <param name="deviceFactoryCreator">Factory for getting devices</param>
/// <param name="checkpointNamingScheme">Checkpoint naming helper</param>
/// <param name="removeOutdated">Remote older Tsavorite log commits</param>
/// <param name="fastCommitThrottleFreq">FastCommit throttle frequency - use only in FastCommit mode</param>
/// <param name="logger">Remote older Tsavorite log commits</param>
public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool removeOutdated = true, int fastCommitThrottleFreq = 0, ILogger logger = null)
public DeviceLogCommitCheckpointManager(INamedDeviceFactoryCreator deviceFactoryCreator, ICheckpointNamingScheme checkpointNamingScheme, bool removeOutdated = true, int fastCommitThrottleFreq = 0, ILogger logger = null)
{
this.logger = logger;
this.deviceFactory = deviceFactory;
this.deviceFactory = deviceFactoryCreator.Create(checkpointNamingScheme.BaseName);
this.checkpointNamingScheme = checkpointNamingScheme;
this.fastCommitThrottleFreq = fastCommitThrottleFreq;

Expand All @@ -76,7 +76,6 @@ public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, IChec
// // We only keep the latest TsavoriteLog commit
flogCommitHistory = new long[flogCommitCount];
}
deviceFactory.Initialize(checkpointNamingScheme.BaseName);
}

/// <inheritdoc />
Expand All @@ -96,12 +95,12 @@ public void Purge(Guid token)
/// <summary>
/// Create new instance of log commit manager
/// </summary>
/// <param name="deviceFactory">Factory for getting devices</param>
/// <param name="deviceFactoryCreator">Creator of factory for getting devices</param>
/// <param name="baseName">Overall location specifier (e.g., local path or cloud container name)</param>
/// <param name="removeOutdated">Remote older Tsavorite log commits</param>
/// <param name="logger">Remote older Tsavorite log commits</param>
public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, string baseName, bool removeOutdated = false, ILogger logger = null)
: this(deviceFactory, new DefaultCheckpointNamingScheme(baseName), removeOutdated)
public DeviceLogCommitCheckpointManager(INamedDeviceFactoryCreator deviceFactoryCreator, string baseName, bool removeOutdated = false, ILogger logger = null)
: this(deviceFactoryCreator, new DefaultCheckpointNamingScheme(baseName), removeOutdated)
{
this.logger = logger;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@
namespace Tsavorite.core
{
/// <summary>
/// Factory for getting IDevice instances for checkpointing
/// Factory for getting IDevice instances for checkpointing. The factory is specific to a particular base path or container.
/// </summary>
public interface INamedDeviceFactory
{
/// <summary>
/// Initialize base name or container
/// </summary>
/// <param name="baseName">Base name or container</param>
void Initialize(string baseName);

/// <summary>
/// Get IDevice instance for given file info
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace Tsavorite.core
{
/// <summary>
/// Factory creator for getting IDevice instances for checkpointing
/// </summary>
public interface INamedDeviceFactoryCreator
{
/// <summary>
/// Create factory for creating IDevice instances, for the given base name or container
/// </summary>
/// <param name="baseName">Base name or container</param>
/// <returns></returns>
INamedDeviceFactory Create(string baseName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Tsavorite.core
/// </summary>
public class LocalStorageNamedDeviceFactory : INamedDeviceFactory
{
string baseName;
readonly string baseName;
readonly bool deleteOnClose;
readonly int? throttleLimit;
readonly bool preallocateFile;
Expand All @@ -32,22 +32,19 @@ public class LocalStorageNamedDeviceFactory : INamedDeviceFactory
/// <param name="disableFileBuffering">Whether file buffering (during write) is disabled (default of true requires aligned writes)</param>
/// <param name="throttleLimit">Throttle limit (max number of pending I/Os) for this device instance</param>
/// <param name="useNativeDeviceLinux">Use native device on Linux</param>
/// <param name="readOnly"></param>
/// <param name="baseName"></param>
/// <param name="logger"></param>
public LocalStorageNamedDeviceFactory(bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, int? throttleLimit = null, bool useNativeDeviceLinux = false, bool readOnly = false, ILogger logger = null)
public LocalStorageNamedDeviceFactory(bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, int? throttleLimit = null, bool useNativeDeviceLinux = false, bool readOnly = false, string baseName = null, ILogger logger = null)
{
this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
this.disableFileBuffering = disableFileBuffering;
this.throttleLimit = throttleLimit;
this.useNativeDeviceLinux = useNativeDeviceLinux;
this.readOnly = readOnly;
this.logger = logger;
}

/// <inheritdoc />
public void Initialize(string baseName)
{
this.baseName = baseName;
this.logger = logger;
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using Microsoft.Extensions.Logging;

namespace Tsavorite.core
{
public class LocalStorageNamedDeviceFactoryCreator : INamedDeviceFactoryCreator
{
readonly bool preallocateFile;
readonly bool deleteOnClose;
readonly int? throttleLimit;
readonly bool disableFileBuffering;
readonly bool useNativeDeviceLinux;
readonly bool readOnly;
readonly ILogger logger;

public LocalStorageNamedDeviceFactoryCreator(bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, int? throttleLimit = null, bool useNativeDeviceLinux = false, bool readOnly = false, ILogger logger = null)
{
this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
this.disableFileBuffering = disableFileBuffering;
this.throttleLimit = throttleLimit;
this.useNativeDeviceLinux = useNativeDeviceLinux;
this.readOnly = readOnly;
this.logger = logger;
}

public INamedDeviceFactory Create(string baseName)
{
return new LocalStorageNamedDeviceFactory(preallocateFile, deleteOnClose, disableFileBuffering, throttleLimit, useNativeDeviceLinux, readOnly, baseName, logger);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace Tsavorite.core
{
/// <summary>
/// Local storage device factory
/// Null device factory
/// </summary>
public class NullNamedDeviceFactory : INamedDeviceFactory
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace Tsavorite.core
{
/// <summary>
/// Creator of factory for getting null device instances
/// </summary>
public class NullNamedDeviceFactoryCreator : INamedDeviceFactoryCreator
{
static readonly NullNamedDeviceFactory nullNamedDeviceFactory = new();

public INamedDeviceFactory Create(string baseName)
{
return nullNamedDeviceFactory;
}
}
}
Loading