Commit 84e114c
Merge branch 'master' into pectra_devnet_4
rjnrohit authored Oct 24, 2024
2 parents 480957a + a262636 commit 84e114c
Showing 23 changed files with 88 additions and 46 deletions.
@@ -132,7 +132,7 @@ protected override async Task RunFullPruning(CancellationToken cancellationToken
}
}

[Test, MaxTime(Timeout.MaxTestTime), Retry(5)]
[Test, MaxTime(Timeout.LongTestTime)]
public async Task prune_on_disk_multiple_times()
{
using PruningTestBlockchain chain = await PruningTestBlockchain.Create(new PruningConfig { FullPruningMinimumDelayHours = 0 });
@@ -142,7 +142,7 @@ public async Task prune_on_disk_multiple_times()
}
}

[Test, MaxTime(Timeout.MaxTestTime), Retry(5)]
[Test, MaxTime(Timeout.LongTestTime)]
public async Task prune_on_disk_only_once()
{
using PruningTestBlockchain chain = await PruningTestBlockchain.Create(new PruningConfig { FullPruningMinimumDelayHours = 10 });
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Blockchain.Test/Timeout.cs
@@ -4,6 +4,7 @@
namespace Nethermind.Blockchain.Test;
internal class Timeout
{
public const int LongTestTime = 60_000;
public const int MaxTestTime = 10_000;
public const int MaxWaitTime = 1_000;
}
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Core.Test/TestMemColumnDb.cs
@@ -31,4 +31,5 @@ public IColumnsWriteBatch<TKey> StartWriteBatch()
return new InMemoryColumnWriteBatch<TKey>(this);
}
public void Dispose() { }
public void Flush(bool onlyWal = false) { }
}
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Core.Test/TestMemDb.cs
@@ -109,7 +109,7 @@ public override IWriteBatch StartWriteBatch()
return new InMemoryWriteBatch(this);
}

public override void Flush()
public override void Flush(bool onlyWal)
{
FlushCount++;
}
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
@@ -128,9 +128,9 @@ public bool KeyExists(ReadOnlySpan<byte> key)
return _mainDb.KeyExistsWithColumn(key, _columnFamily);
}

public void Flush()
public void Flush(bool onlyWal)
{
_mainDb.Flush();
_mainDb.Flush(onlyWal);
}

public void Compact()
15 changes: 10 additions & 5 deletions src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
@@ -1321,23 +1321,28 @@ private void FlushOnTooManyWrites()
}
}

public void Flush()
public void Flush(bool onlyWal = false)
{
ObjectDisposedException.ThrowIf(_isDisposing, this);

InnerFlush();
InnerFlush(onlyWal);
}

public virtual void Compact()
{
_db.CompactRange(Keccak.Zero.BytesToArray(), Keccak.MaxValue.BytesToArray());
}

private void InnerFlush()
private void InnerFlush(bool onlyWal)
{
try
{
_rocksDbNative.rocksdb_flush(_db.Handle, FlushOptions.DefaultFlushOptions.Handle);
_rocksDbNative.rocksdb_flush_wal(_db.Handle, true);

if (!onlyWal)
{
_rocksDbNative.rocksdb_flush(_db.Handle, FlushOptions.DefaultFlushOptions.Handle);
}
}
catch (RocksDbSharpException e)
{
@@ -1439,7 +1444,7 @@ public void Dispose()
dbMetricsUpdater.Dispose();
}

InnerFlush();
InnerFlush(false);
ReleaseUnmanagedResources();

_dbsByPath.Remove(_fullPath!, out _);
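
Note on the DbOnTheRocks change above: Flush now takes an onlyWal flag. When it is true, only rocksdb_flush_wal runs, so recent writes become durable in the write-ahead log without forcing memtables out to SST files; the full memtable flush happens only when the flag is false (the default, and what Dispose still requests via InnerFlush(false)). A minimal usage sketch, assuming a hypothetical IDb instance named stateDb — this is not code from the commit:

// Illustrative only: the cost trade-off between the two modes exposed above.
static void FlushExample(IDb stateDb)
{
    // Cheap: sync the write-ahead log so recent writes survive a crash,
    // but leave the memtables in memory.
    stateDb.Flush(onlyWal: true);

    // Expensive: also write memtables out to SST files (same behaviour as the old Flush()).
    stateDb.Flush(onlyWal: false);
}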
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Db.Rpc/RpcColumnsDb.cs
@@ -46,5 +46,6 @@ public IColumnsWriteBatch<T> StartWriteBatch()
return new InMemoryColumnWriteBatch<T>(this);
}
public void Dispose() { }
public void Flush(bool onlyWal = false) { }
}
}
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db.Rpc/RpcDb.cs
@@ -73,6 +73,8 @@ public bool KeyExists(ReadOnlySpan<byte> key)

public IDb Innermost => this; // record db is just a helper DB here
public void Flush() { }
public void Flush(bool onlyWal = false) { }

public void Clear() { }

public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => _recordDb.GetAll();
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Db/CompressingDb.cs
@@ -137,7 +137,7 @@ public IEnumerable<byte[]> GetAllValues(bool ordered = false) =>

public bool KeyExists(ReadOnlySpan<byte> key) => _wrapped.KeyExists(key);

public void Flush() => _wrapped.Flush();
public void Flush(bool onlyWal) => _wrapped.Flush(onlyWal);

public void Clear() => _wrapped.Clear();

6 changes: 3 additions & 3 deletions src/Nethermind/Nethermind.Db/FullPruning/FullPruningDb.cs
@@ -144,11 +144,11 @@ public void Remove(ReadOnlySpan<byte> key)
public IDb Innermost => this;

// we need to flush both DB's
public void Flush()
public void Flush(bool onlyWal)
{
_currentDb.Flush();
_currentDb.Flush(onlyWal);
IDb? cloningDb = _pruningContext?.CloningDb;
cloningDb?.Flush();
cloningDb?.Flush(onlyWal);
}

// we need to clear both DB's
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Db/IDb.cs
@@ -23,7 +23,7 @@ public interface IDbMeta
{
DbMetric GatherMetric(bool includeSharedCache = false) => new DbMetric();

void Flush() { }
void Flush(bool onlyWal = false);
void Clear() { }
void Compact() { }

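
The interface change above replaces the no-op default body of Flush() with a declaration that takes onlyWal, which is why the in-memory and RPC implementations in this commit each add a one-line stub. Because the parameter defaults to false, existing call sites that invoke Flush() with no argument keep compiling and keep meaning a full flush. A small illustration (hypothetical helper, not part of the commit):

// Hypothetical helper showing source compatibility of the new signature.
static void FlushBoth(IDb db)
{
    db.Flush();                // onlyWal defaults to false: full flush, same as the old Flush()
    db.Flush(onlyWal: true);   // new, cheaper option: only the write-ahead log is flushed
}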
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Db/MemColumnsDb.cs
@@ -35,5 +35,6 @@ public IColumnsWriteBatch<TKey> StartWriteBatch()
return new InMemoryColumnWriteBatch<TKey>(this);
}
public void Dispose() { }
public void Flush(bool onlyWal = false) { }
}
}
4 changes: 1 addition & 3 deletions src/Nethermind/Nethermind.Db/MemDb.cs
@@ -77,9 +77,7 @@ public bool KeyExists(ReadOnlySpan<byte> key)

public IDb Innermost => this;

public virtual void Flush()
{
}
public virtual void Flush(bool onlyWal = false) { }

public void Clear()
{
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Db/NullDb.cs
@@ -43,7 +43,8 @@ public bool KeyExists(ReadOnlySpan<byte> key)
return false;
}

public void Flush() { }
public void Flush(bool onlyWal = false) { }

public void Clear() { }

public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => Enumerable.Empty<KeyValuePair<byte[], byte[]>>();
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db/ReadOnlyColumnsDb.cs
@@ -44,5 +44,7 @@ public void Dispose()
readOnlyColumn.Value.Dispose();
}
}

public void Flush(bool onlyWal = false) { }
}
}
6 changes: 1 addition & 5 deletions src/Nethermind/Nethermind.Db/ReadOnlyDb.cs
@@ -67,11 +67,7 @@ public void Remove(ReadOnlySpan<byte> key) { }

public bool KeyExists(ReadOnlySpan<byte> key) => _memDb.KeyExists(key) || wrappedDb.KeyExists(key);

public void Flush()
{
wrappedDb.Flush();
_memDb.Flush();
}
public void Flush(bool onlyWal) { }

public void Clear() => throw new InvalidOperationException();

3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Db/SimpleFilePublicKeyDb.cs
@@ -85,7 +85,8 @@ public bool KeyExists(ReadOnlySpan<byte> key)
return _cache.ContainsKey(key);
}

public void Flush() { }
public void Flush(bool onlyWal = false) { }

public void Clear()
{
File.Delete(DbPath);
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/InitializeStateDb.cs
@@ -119,9 +119,9 @@ public Task Execute(CancellationToken cancellationToken)
var minimumWriteBufferMb = 0.2 * pruningConfig.CacheMb;
if (totalWriteBufferMb < minimumWriteBufferMb)
{
int minimumWriteBufferNumber = (int)Math.Ceiling((minimumWriteBufferMb * 1.MB()) / dbConfig.StateDbWriteBufferSize);
long minimumWriteBufferSize = (int)Math.Ceiling((minimumWriteBufferMb * 1.MB()) / dbConfig.StateDbWriteBufferNumber);

if (_logger.IsWarn) _logger.Warn($"Detected {totalWriteBufferMb}MB of maximum write buffer size. Write buffer size should be at least 20% of pruning cache MB or memory pruning may slow down. Try setting `--Db.{nameof(dbConfig.WriteBufferNumber)} {minimumWriteBufferNumber}`.");
if (_logger.IsWarn) _logger.Warn($"Detected {totalWriteBufferMb}MB of maximum write buffer size. Write buffer size should be at least 20% of pruning cache MB or memory pruning may slow down. Try setting `--Db.{nameof(dbConfig.StateDbWriteBufferSize)} {minimumWriteBufferSize}`.");
}

pruningStrategy = Prune
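
The corrected warning above now suggests a per-buffer size (--Db.StateDbWriteBufferSize, in bytes) derived from the configured buffer count, rather than suggesting a buffer count derived from the buffer size. A worked example with assumed numbers, treating 1 MB as 2^20 bytes; none of these values come from the commit:

// Assumed configuration: 2000 MB pruning cache, 4 state DB write buffers.
double cacheMb = 2000;
double minimumWriteBufferMb = 0.2 * cacheMb;            // at least 20% of the cache: 400 MB total
long stateDbWriteBufferNumber = 4;
long minimumWriteBufferSize =
    (long)Math.Ceiling(minimumWriteBufferMb * 1024 * 1024 / stateDbWriteBufferNumber);
// minimumWriteBufferSize == 104_857_600 bytes, i.e. 100 MB per buffer, which is the
// value the corrected warning would now suggest for --Db.StateDbWriteBufferSize.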
@@ -687,7 +687,7 @@ private void SaveNode(StateSyncItem syncItem, byte[] data)
{
if (_logger.IsInfo) _logger.Info($"Saving root {syncItem.Hash} of {_branchProgress.CurrentSyncBlock}");

_nodeStorage.Flush();
_nodeStorage.Flush(onlyWal: false);
_codeDb.Flush();

Interlocked.Exchange(ref _rootSaved, 1);
26 changes: 24 additions & 2 deletions src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs
@@ -78,6 +78,26 @@ public void Memory_with_one_node_is_288()
trieNode.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize);
}

[Test]
public void Flush_ShouldBeCalledOnEachPersist()
{
TrieNode trieNode = new(NodeType.Leaf, Keccak.Zero);

TestMemDb testMemDb = new TestMemDb();
using TrieStore fullTrieStore = CreateTrieStore(persistenceStrategy: Archive.Instance, kvStore: testMemDb);
PatriciaTree pt = new PatriciaTree(fullTrieStore.GetTrieStore(null), LimboLogs.Instance);

for (int i = 0; i < 4; i++)
{
pt.Set(TestItem.KeccakA.BytesToArray(), TestItem.Keccaks[i].BytesToArray());
using (ICommitter? committer = fullTrieStore.BeginStateBlockCommit(i + 1, trieNode))
{
pt.Commit();
}
}

testMemDb.FlushCount.Should().Be(4);
}

[Test]
public void Pruning_off_cache_should_not_change_commit_node()
@@ -978,8 +998,9 @@ public async Task Will_Not_RemovePastKeys_OnSnapshot_DuringFullPruning()
pruningStrategy: new TestPruningStrategy(true, true, 2, 100000),
persistenceStrategy: isPruningPersistenceStrategy);

IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null);
TreePath emptyPath = TreePath.Empty;
TaskCompletionSource tcs = new TaskCompletionSource();
fullTrieStore.OnMemoryPruneCompleted += (sender, args) => tcs.TrySetResult();

for (int i = 0; i < 64; i++)
{
Expand All @@ -990,7 +1011,8 @@ public async Task Will_Not_RemovePastKeys_OnSnapshot_DuringFullPruning()
}

// Pruning is done in background
await Task.Delay(TimeSpan.FromMilliseconds(10));
await tcs.Task;
tcs = new TaskCompletionSource();
}

memDb.Count.Should().Be(61);
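
The test change above replaces a fixed Task.Delay with an await on a TaskCompletionSource that is completed by the new OnMemoryPruneCompleted event, so the test waits exactly as long as the background prune takes. A self-contained sketch of that pattern with an illustrative event source (the real test subscribes to TrieStore.OnMemoryPruneCompleted instead):

using System;
using System.Threading.Tasks;

// Illustrative event source standing in for the background pruning task.
class Worker
{
    public event EventHandler? Completed;
    public void Kick() => Task.Run(() => Completed?.Invoke(this, EventArgs.Empty));
}

static class EventWaitExample
{
    public static async Task WaitPerRound(Worker worker, int rounds)
    {
        TaskCompletionSource tcs = new();
        worker.Completed += (_, _) => tcs.TrySetResult();   // captures the variable, so re-arming works

        for (int i = 0; i < rounds; i++)
        {
            worker.Kick();
            await tcs.Task;            // deterministic wait instead of Task.Delay(...)
            tcs = new();               // re-arm for the next round
        }
    }
}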
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Trie/INodeStorage.cs
@@ -30,7 +30,8 @@ public interface INodeStorage
/// <summary>
/// Used by StateSync to make sure values are flushed.
/// </summary>
void Flush();
/// <param name="onlyWal">True if only WAL file should be flushed, not memtable.</param>
void Flush(bool onlyWal);
void Compact();

public enum KeyScheme
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Trie/NodeStorage.cs
@@ -184,11 +184,11 @@ public void Set(Hash256? address, in TreePath path, in ValueHash256 keccak, Read
_keyValueStore.PutSpan(GetExpectedPath(stackalloc byte[StoragePathLength], address, path, keccak), data, writeFlags);
}

public void Flush()
public void Flush(bool onlyWal)
{
if (_keyValueStore is IDb db)
{
db.Flush();
db.Flush(onlyWal);
}
}

36 changes: 23 additions & 13 deletions src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
@@ -360,6 +360,9 @@ private void FinishBlockCommit(BlockCommitSet set, TrieNode? root)

public event EventHandler<ReorgBoundaryReached>? ReorgBoundaryReached;

// Used in testing to not have to wait for condition.
public event EventHandler OnMemoryPruneCompleted;

public byte[]? TryLoadRlp(Hash256? address, in TreePath path, Hash256 keccak, INodeStorage? nodeStorage, ReadFlags readFlags = ReadFlags.None)
{
nodeStorage ??= _nodeStorage;
@@ -457,14 +460,15 @@ public void Prune()
// otherwise, it may not fit the whole dirty cache.
// Additionally, if (WriteBufferSize * (WriteBufferNumber - 1)) is already more than 20% of pruning
// cache, it is likely that there are enough space for it on most time, except for syncing maybe.
_nodeStorage.Flush();
_nodeStorage.Flush(onlyWal: false);
lock (_dirtyNodesLock)
{
long start = Stopwatch.GetTimestamp();
if (_logger.IsDebug) _logger.Debug($"Locked {nameof(TrieStore)} for pruning.");
long memoryUsedByDirtyCache = MemoryUsedByDirtyCache;
if (!_pruningTaskCancellationTokenSource.IsCancellationRequested && _pruningStrategy.ShouldPrune(memoryUsedByDirtyCache))
if (!_pruningTaskCancellationTokenSource.IsCancellationRequested &&
_pruningStrategy.ShouldPrune(memoryUsedByDirtyCache))
{
// Most of the time in memory pruning is on `PrunePersistedRecursively`. So its
// usually faster to just SaveSnapshot causing most of the entry to be persisted.
@@ -497,6 +501,11 @@ public void Prune()
if (_logger.IsError) _logger.Error("Pruning failed with exception.", e);
}
});

_pruningTask.ContinueWith((_) =>
{
OnMemoryPruneCompleted?.Invoke(this, EventArgs.Empty);
});
}
}

@@ -673,9 +682,7 @@ public void WaitForPruning()
private ConcurrentQueue<BlockCommitSet> CommitSetQueue =>
(_commitSetQueue ?? CreateQueueAtomic(ref _commitSetQueue));

#if DEBUG
private BlockCommitSet? _lastCommitSet = null;
#endif

private long _memoryUsedByDirtyCache;

@@ -703,18 +710,20 @@ private BlockCommitSet CreateCommitSet(long blockNumber)
{
if (_logger.IsDebug) _logger.Debug($"Beginning new {nameof(BlockCommitSet)} - {blockNumber}");

// TODO: this throws on reorgs, does it not? let us recreate it in test
#if DEBUG
Debug.Assert(_lastCommitSet == null || blockNumber == _lastCommitSet.BlockNumber + 1 || _lastCommitSet.BlockNumber == 0, $"Newly begun block is not a successor of the last one.");
Debug.Assert(_lastCommitSet == null || _lastCommitSet.IsSealed, "Not sealed when beginning new block");
#endif
if (_lastCommitSet is not null)
{
Debug.Assert(_lastCommitSet.IsSealed, "Not sealed when beginning new block");

if (_lastCommitSet.BlockNumber != blockNumber - 1 && blockNumber != 0 && _lastCommitSet.BlockNumber != 0)
{
if (_logger.IsInfo) _logger.Info($"Non consecutive block commit. This is likely a reorg. Last block commit: {_lastCommitSet.BlockNumber}. New block commit: {blockNumber}.");
}
}

BlockCommitSet commitSet = new(blockNumber);
CommitSetQueue.Enqueue(commitSet);

#if DEBUG
_lastCommitSet = commitSet;
#endif

LatestCommittedBlockNumber = Math.Max(blockNumber, LatestCommittedBlockNumber);
// Why are we announcing **before** committing next block??
@@ -757,7 +766,7 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
}
}

if (_logger.IsDebug) _logger.Debug($"Persisting from root {commitSet.Root} in {commitSet.BlockNumber}");
if (_logger.IsInfo) _logger.Info($"Persisting from root {commitSet.Root?.Keccak} in block {commitSet.BlockNumber}");

long start = Stopwatch.GetTimestamp();

@@ -793,8 +802,9 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
disposeQueue.CompleteAdding();
Task.WaitAll(_disposeTasks);

// Dispose top level last in case something goes wrong, at least the root wont be stored
// Dispose top level last in case something goes wrong, at least the root won't be stored
topLevelWriteBatch.Dispose();
_nodeStorage.Flush(onlyWal: true);

long elapsedMilliseconds = (long)Stopwatch.GetElapsedTime(start).TotalMilliseconds;
Metrics.SnapshotPersistenceTime = elapsedMilliseconds;
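
Taken together, the TrieStore changes use the two flush modes deliberately: Prune still requests a full flush before taking the dirty-node lock so persisted nodes leave the memtable, while snapshot persistence now ends with a WAL-only flush so the just-written batches are durable without paying for a memtable flush on every snapshot. A condensed, illustrative sketch of that ordering (heavily simplified, not the actual TrieStore code):

// Heavily simplified sketch; INodeStorage.Flush(bool onlyWal) is the interface changed above.
static void PruneMemory(INodeStorage nodeStorage, object dirtyNodesLock)
{
    // Full flush first: persisted nodes leave the memtable before pruning is done under the lock.
    nodeStorage.Flush(onlyWal: false);
    lock (dirtyNodesLock)
    {
        // ... prune the in-memory dirty node cache, optionally saving a snapshot ...
    }
}

static void PersistSnapshot(INodeStorage nodeStorage, IDisposable topLevelWriteBatch)
{
    // Child write batches are disposed first; the top-level batch goes last so a failure
    // cannot leave a stored root that points at missing children.
    topLevelWriteBatch.Dispose();

    // New in this commit: sync the WAL only, instead of forcing a full flush per snapshot.
    nodeStorage.Flush(onlyWal: true);
}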
