Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net): optimize fetch inventory message processing logic #5895

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep

FetchInvDataMessage fetchInvDataMsg = (FetchInvDataMessage) msg;

check(peer, fetchInvDataMsg);
boolean isAdv = isAdvInv(peer, fetchInvDataMsg);

check(peer, fetchInvDataMsg, isAdv);

InventoryType type = fetchInvDataMsg.getInventoryType();
List<Transaction> transactions = Lists.newArrayList();
Expand All @@ -64,6 +66,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep

for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
Item item = new Item(hash, type);
if (isAdv) {
peer.getAdvInvSpread().invalidate(item);
}

Message message = advService.getMessage(item);
if (message == null) {
try {
Expand Down Expand Up @@ -127,7 +133,21 @@ private void sendPbftCommitMessage(PeerConnection peer, BlockCapsule blockCapsul
}
}

private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) throws P2pException {
public boolean isAdvInv(PeerConnection peer, FetchInvDataMessage msg) {
MessageTypes type = msg.getInvMessageType();
if (type == MessageTypes.TRX) {
return true;
}
for (Sha256Hash hash : msg.getHashList()) {
if (peer.getAdvInvSpread().getIfPresent(new Item(hash, InventoryType.BLOCK)) == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

多次请求 BLOCK 仅是 isAdvInv 方法返回 false,并不会导致 check 方法失败吧?

Copy link
Contributor

@xxo1shine xxo1shine Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second request will be treated as synchronous and the check will fail. Refer to the code in the check function.

if (!isAdv) {
      if (!peer.isNeedSyncFromUs()) {
        throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
      }
      for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
        long blockNum = new BlockId(hash).getNum();
        long minBlockNum =
            peer.getLastSyncBlockId().getNum() - 2 * NetConstants.SYNC_FETCH_BATCH_NUM;
        if (blockNum < minBlockNum) {
          throw new P2pException(TypeEnum.BAD_MESSAGE,
            "minBlockNum: " + minBlockNum + ", blockNum: " + blockNum);
        }
        if (peer.getSyncBlockIdCache().getIfPresent(hash) != null) {
          throw new P2pException(TypeEnum.BAD_MESSAGE, new BlockId(hash).getString() + " is exist");
        }
        peer.getSyncBlockIdCache().put(hash, System.currentTimeMillis());
      }
    }

return false;
}
}
return true;
}

private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg,
boolean isAdv) throws P2pException {
MessageTypes type = fetchInvDataMsg.getInvMessageType();

if (type == MessageTypes.TRX) {
Expand All @@ -144,38 +164,30 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr
+ "maxCount: {}, fetchCount: {}, peer: {}",
maxCount, fetchCount, peer.getInetAddress());
}
} else {
boolean isAdv = true;
}

if (!isAdv) {
if (!peer.isNeedSyncFromUs()) {
throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
}
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
if (peer.getAdvInvSpread().getIfPresent(new Item(hash, InventoryType.BLOCK)) == null) {
isAdv = false;
break;
long blockNum = new BlockId(hash).getNum();
long minBlockNum =
peer.getLastSyncBlockId().getNum() - 2 * NetConstants.SYNC_FETCH_BATCH_NUM;
if (blockNum < minBlockNum) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
"minBlockNum: " + minBlockNum + ", blockNum: " + blockNum);
}
}
if (!isAdv) {
if (!peer.isNeedSyncFromUs()) {
throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
if (blockNum > peer.getLastSyncBlockId().getNum()) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
"maxBlockNum: " + peer.getLastSyncBlockId().getNum() + ", blockNum: " + blockNum);
}
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
long blockNum = new BlockId(hash).getNum();
long minBlockNum =
peer.getLastSyncBlockId().getNum() - 2 * NetConstants.SYNC_FETCH_BATCH_NUM;
if (blockNum < minBlockNum) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
"minBlockNum: " + minBlockNum + ", blockNum: " + blockNum);
}
if (blockNum > peer.getLastSyncBlockId().getNum()) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
"maxBlockNum: " + peer.getLastSyncBlockId().getNum() + ", blockNum: " + blockNum);
}
if (peer.getSyncBlockIdCache().getIfPresent(hash) != null) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
new BlockId(hash).getString() + " is exist");
}
peer.getSyncBlockIdCache().put(hash, System.currentTimeMillis());
if (peer.getSyncBlockIdCache().getIfPresent(hash) != null) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
new BlockId(hash).getString() + " is exist");
}
peer.getSyncBlockIdCache().put(hash, System.currentTimeMillis());
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,31 @@ public void testProcessMessage() throws Exception {
Assert.assertNotNull(syncBlockIdCache.getIfPresent(blockId));
}

@Test
public void testIsAdvInv() {
FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler();

List<Sha256Hash> list = new LinkedList<>();
list.add(Sha256Hash.ZERO_HASH);
FetchInvDataMessage msg =
new FetchInvDataMessage(list, Protocol.Inventory.InventoryType.TRX);

boolean isAdv = fetchInvDataMsgHandler.isAdvInv(null, msg);
Assert.assertTrue(isAdv);

PeerConnection peer = Mockito.mock(PeerConnection.class);
Cache<Item, Long> advInvSpread = CacheBuilder.newBuilder().build();
Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread);

msg = new FetchInvDataMessage(list, Protocol.Inventory.InventoryType.BLOCK);
isAdv = fetchInvDataMsgHandler.isAdvInv(peer, msg);
Assert.assertTrue(!isAdv);

advInvSpread.put(new Item(Sha256Hash.ZERO_HASH, Protocol.Inventory.InventoryType.BLOCK), 1L);
isAdv = fetchInvDataMsgHandler.isAdvInv(peer, msg);
Assert.assertTrue(isAdv);
}

@Test
public void testSyncFetchCheck() {
BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000L);
Expand Down
Loading