Skip to content

Commit

Permalink
Merge pull request #3945 from greymistcube/chore/cleanup
Browse files Browse the repository at this point in the history
♻️ 🧹 Streamline block syncing
  • Loading branch information
greymistcube authored Sep 5, 2024
2 parents 19cacb7 + 8cfdc97 commit 475d433
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 144 deletions.
7 changes: 2 additions & 5 deletions src/Libplanet.Net/Swarm.BlockCandidate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ private async Task<bool> ProcessBlockDemandAsync(
var result = await BlockCandidateDownload(
peer: peer,
blockChain: BlockChain,
stop: demand.BlockExcerpt,
logSessionId: sessionId,
cancellationToken: cancellationToken);

Expand Down Expand Up @@ -423,17 +422,15 @@ private async Task<bool> ProcessBlockDemandAsync(
private async Task<bool> BlockCandidateDownload(
BoundPeer peer,
BlockChain blockChain,
IBlockExcerpt stop,
int logSessionId,
CancellationToken cancellationToken)
{
BlockLocator locator = blockChain.GetBlockLocator();
Block tip = blockChain.Tip;

List<(long, BlockHash)> hashes = await GetBlockHashes(
List<BlockHash> hashes = await GetBlockHashes(
peer: peer,
locator: locator,
timeout: null,
cancellationToken: cancellationToken);

if (!hashes.Any())
Expand All @@ -444,7 +441,7 @@ private async Task<bool> BlockCandidateDownload(

IAsyncEnumerable<(Block, BlockCommit)> blocksAsync = GetBlocksAsync(
peer,
hashes.Select(pair => pair.Item2),
hashes,
cancellationToken);
try
{
Expand Down
85 changes: 9 additions & 76 deletions src/Libplanet.Net/Swarm.BlockSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,101 +81,34 @@ private async Task PullBlocksAsync(

try
{
// NOTE: demandBlockHashes is always non-empty.
(var peer, var demandBlockHashes) = await GetDemandBlockHashes(
BlockChain,
peersWithBlockExcerpt,
progress,
cancellationToken);
foreach ((long index, BlockHash hash) in demandBlockHashes)
{
cancellationToken.ThrowIfCancellationRequested();

if (index == 0 && !hash.Equals(BlockChain.Genesis.Hash))
{
// FIXME: This behavior can unexpectedly terminate the swarm
// (and the game app) if it encounters a peer having a different
// blockchain, and therefore can be exploited to remotely shut
// down other nodes as well.
// Since the intention of this behavior is to prevent mistakes
// to try to connect incorrect seeds (by a user),
// this behavior should be limited for only seed peers.
// FIXME: ChainStatus message became to contain hash value of
// the genesis block, so this exception will not be happened.
var msg =
$"Since the genesis block is fixed to {BlockChain.Genesis} " +
"protocol-wise, the blockchain which does not share " +
"any mutual block is not acceptable.";
var e = new InvalidGenesisBlockException(
msg,
hash,
BlockChain.Genesis.Hash);
throw new AggregateException(msg, e);
}

_logger.Verbose(
"Enqueue #{BlockIndex} {BlockHash} to demands queue...",
index,
hash
);
totalBlocksToDownload++;
}
totalBlocksToDownload = demandBlockHashes.Count;

if (totalBlocksToDownload == 0)
{
_logger.Debug("No any blocks to fetch");
return;
}
_logger.Verbose(
"Enqueue {BlockHashes} to demands queue...",
demandBlockHashes);

var downloadedBlocks = GetBlocksAsync(
peer,
demandBlockHashes.Select(pair => pair.Item2),
demandBlockHashes,
cancellationToken);

await foreach (
(Block block, BlockCommit commit)
in downloadedBlocks.WithCancellation(cancellationToken))
(Block block, BlockCommit commit) in
downloadedBlocks.WithCancellation(cancellationToken))
{
_logger.Verbose(
"Got #{BlockIndex} {BlockHash} from {Peer}",
"Got block #{BlockIndex} {BlockHash} from {Peer}",
block.Index,
block.Hash,
peer);
cancellationToken.ThrowIfCancellationRequested();

if (block.Index == 0 && !block.Hash.Equals(BlockChain.Genesis.Hash))
{
// FIXME: This behavior can unexpectedly terminate the swarm
// (and the game app) if it encounters a peer having a different
// blockchain, and therefore can be exploited to remotely shut
// down other nodes as well.
// Since the intention of this behavior is to prevent mistakes
// to try to connect incorrect seeds (by a user),
// this behavior should be limited for only seed peers.
var msg =
$"Since the genesis block is fixed to {BlockChain.Genesis} " +
"protocol-wise, the blockchain which does not share " +
"any mutual block is not acceptable.";

// Although it's actually not aggregated, but to be consistent with
// above code throwing InvalidGenesisBlockException, makes this
// to wrap an exception with AggregateException... Not sure if
// it show be wrapped from the very beginning.
throw new AggregateException(
msg,
new InvalidGenesisBlockException(
msg,
block.Hash,
BlockChain.Genesis.Hash
)
);
}

block.ValidateTimestamp();
blocks.Add((block, commit));
if (block.Index > tempTip.Index)
{
tempTip = block;
}
}
}
catch (Exception e)
Expand Down
113 changes: 67 additions & 46 deletions src/Libplanet.Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -736,19 +736,13 @@ public Task AddPeersAsync(
}

// FIXME: This would be better if it's merged with GetDemandBlockHashes
internal async Task<List<(long, BlockHash)>> GetBlockHashes(
internal async Task<List<BlockHash>> GetBlockHashes(
BoundPeer peer,
BlockLocator locator,
TimeSpan? timeout = null,
CancellationToken cancellationToken = default)
{
var sessionRandom = new System.Random();
var request = new GetBlockHashesMsg(locator);

TimeSpan transportTimeout = timeout is { } t
&& t > Options.TimeoutOptions.GetBlockHashesTimeout
? t
: Options.TimeoutOptions.GetBlockHashesTimeout;
const string sendMsg =
"Sending a {MessageType} message with locator [{LocatorHead}]";
_logger.Debug(
Expand All @@ -762,37 +756,45 @@ public Task AddPeersAsync(
parsedMessage = await Transport.SendMessageAsync(
peer,
request,
timeout: transportTimeout,
timeout: Options.TimeoutOptions.GetBlockHashesTimeout,
cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (CommunicationFailException)
{
_logger.Debug(
"Failed to get a response for " + nameof(GetBlockHashesMsg) +
" due to a communication failure");
return new List<(long, BlockHash)>();
return new List<BlockHash>();
}

if (parsedMessage.Content is BlockHashesMsg blockHashes)
{
if (blockHashes.StartIndex is long idx)
if (blockHashes.Hashes.Any())
{
List<(long, BlockHash)> hashes = blockHashes.Hashes
.Select((hash, i) => (idx + i, hash))
.ToList();
const string msg =
"Received a " + nameof(BlockHashesMsg) +
" message with an offset index {OffsetIndex} (total {Length} hashes)";
_logger.Debug(msg, idx, hashes.LongCount());
return hashes;
if (locator.Hash.Equals(blockHashes.Hashes.First()))
{
List<BlockHash> hashes = blockHashes.Hashes.ToList();
_logger.Debug(
"Received a " + nameof(BlockHashesMsg) + " with {Length} hashes",
hashes.Count);
return hashes;
}
else
{
const string msg =
"Received a " + nameof(BlockHashesMsg) + " but its " +
"first hash {ActualBlockHash} does not match " +
"the locator hash {ExpectedBlockHash}";
_logger.Debug(msg, blockHashes.Hashes.First(), locator.Hash);
return new List<BlockHash>();
}
}
else
{
const string msg =
"Received a " + nameof(BlockHashesMsg) +
" message, but it has zero hashes";
"Received a " + nameof(BlockHashesMsg) + " with zero hashes";
_logger.Debug(msg);
return new List<(long, BlockHash)>();
return new List<BlockHash>();
}
}
else
Expand All @@ -802,24 +804,23 @@ public Task AddPeersAsync(
" is expected to be {ExpectedType}: {ReceivedType}",
nameof(BlockHashesMsg),
parsedMessage.GetType());
return new List<(long, BlockHash)>();
return new List<BlockHash>();
}
}

internal async IAsyncEnumerable<(Block, BlockCommit)> GetBlocksAsync(
BoundPeer peer,
IEnumerable<BlockHash> blockHashes,
List<BlockHash> blockHashes,
[EnumeratorCancellation] CancellationToken cancellationToken
)
{
var blockHashesAsArray = blockHashes as BlockHash[] ?? blockHashes.ToArray();
_logger.Information(
"Trying to download {BlockHashes} block(s) from {Peer}...",
blockHashesAsArray.Length,
"Trying to download {BlockHashesCount} block(s) from {Peer}...",
blockHashes.Count,
peer);

var request = new GetBlocksMsg(blockHashesAsArray);
int hashCount = blockHashesAsArray.Length;
var request = new GetBlocksMsg(blockHashes);
int hashCount = blockHashes.Count;

if (hashCount < 1)
{
Expand Down Expand Up @@ -866,17 +867,40 @@ [EnumeratorCancellation] CancellationToken cancellationToken
message.Remote);
for (int i = 0; i < payloads.Count; i += 2)
{
cancellationToken.ThrowIfCancellationRequested();
byte[] blockPayload = payloads[i];
byte[] commitPayload = payloads[i + 1];
cancellationToken.ThrowIfCancellationRequested();
Block block = BlockMarshaler.UnmarshalBlock(
(Bencodex.Types.Dictionary)Codec.Decode(blockPayload));
BlockCommit commit = commitPayload.Length == 0
? null
: new BlockCommit(Codec.Decode(commitPayload));

yield return (block, commit);
count++;
if (count < blockHashes.Count)
{
if (blockHashes[count].Equals(block.Hash))
{
yield return (block, commit);
count++;
}
else
{
_logger.Debug(
"Expected a block with hash {ExpectedBlockHash} but " +
"received a block with hash {ActualBlockHash}",
blockHashes[count],
block.Hash);
yield break;
}
}
else
{
_logger.Debug(
"Expected to receive {BlockCount} blocks but " +
"received more blocks than expected",
blockHashes.Count);
yield break;
}
}
}
else
Expand Down Expand Up @@ -958,10 +982,9 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
/// <param name="progress">The <see cref="IProgress{T}"/> to report to.</param>
/// <param name="cancellationToken">The cancellation token that should be used to propagate
/// a notification that this operation should be canceled.</param>
/// <returns>An <see cref="IAsyncEnumerable{T}"/> of <see langword="long"/> and
/// <see cref="BlockHash"/> pairs, where the <see langword="long"/> value is the
/// <see cref="Block.Index"/> of the <see cref="Block"/> associated with the
/// <see cref="BlockHash"/> value.</returns>
/// <returns>An <see cref="List{T}"/> of <see cref="BlockHash"/>es together with
/// its source <see cref="BoundPeer"/>. This is guaranteed to always return a non-empty
/// <see cref="List{T}"/> unless an <see cref="Exception"/> is thrown.</returns>
/// <exception cref="AggregateException">Thrown when failed to download
/// <see cref="BlockHash"/>es from a <see cref="BoundPeer"/>.</exception>
/// <remarks>
Expand All @@ -981,7 +1004,7 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
/// to download.
/// </para>
/// </remarks>
internal async Task<(BoundPeer, List<(long, BlockHash)>)> GetDemandBlockHashes(
internal async Task<(BoundPeer, List<BlockHash>)> GetDemandBlockHashes(
BlockChain blockChain,
IList<(BoundPeer, IBlockExcerpt)> peersWithExcerpts,
IProgress<BlockSyncState> progress = null,
Expand All @@ -1000,7 +1023,7 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(

try
{
List<(long, BlockHash)> downloadedHashes = await GetDemandBlockHashesFromPeer(
List<BlockHash> downloadedHashes = await GetDemandBlockHashesFromPeer(
blockChain,
peer,
excerpt,
Expand Down Expand Up @@ -1036,7 +1059,7 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
exceptions);
}

internal async Task<List<(long, BlockHash)>> GetDemandBlockHashesFromPeer(
internal async Task<List<BlockHash>> GetDemandBlockHashesFromPeer(
BlockChain blockChain,
BoundPeer peer,
IBlockExcerpt excerpt,
Expand All @@ -1045,7 +1068,7 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
{
BlockLocator locator = blockChain.GetBlockLocator();
long peerIndex = excerpt.Index;
var downloaded = new List<(long, BlockHash)>();
var downloaded = new List<BlockHash>();

try
{
Expand All @@ -1056,20 +1079,18 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
peerIndex,
locator.Hash);

List<(long, BlockHash)> blockHashes = await GetBlockHashes(
List<BlockHash> blockHashes = await GetBlockHashes(
peer: peer,
locator: locator,
timeout: null,
cancellationToken: cancellationToken);

foreach (var pair in blockHashes)
foreach (var blockHash in blockHashes)
{
_logger.Verbose(
"Received a block hash from {Peer}: #{BlockIndex} {BlockHash}",
"Received a block hash from {Peer}: {BlockHash}",
peer,
pair.Item1,
pair.Item2);
downloaded.Add(pair);
blockHash);
downloaded.Add(blockHash);
}

return downloaded;
Expand Down
Loading

0 comments on commit 475d433

Please sign in to comment.