diff --git a/src/main/java/com/limechain/babe/BabeService.java b/src/main/java/com/limechain/babe/BabeService.java index 28c110791..2798e6ac1 100644 --- a/src/main/java/com/limechain/babe/BabeService.java +++ b/src/main/java/com/limechain/babe/BabeService.java @@ -4,19 +4,31 @@ import com.limechain.babe.coordinator.SlotChangeListener; import com.limechain.babe.predigest.BabePreDigest; import com.limechain.babe.state.EpochState; +import com.limechain.exception.scale.ScaleEncodingException; +import com.limechain.network.Network; +import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; +import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleWriter; +import com.limechain.network.protocol.warp.dto.BlockHeader; +import com.limechain.rpc.server.AppBean; +import com.limechain.storage.block.BlockState; import com.limechain.storage.crypto.KeyStore; +import io.emeraldpay.polkaj.scale.ScaleCodecWriter; import org.apache.commons.collections4.map.HashedMap; import org.springframework.stereotype.Component; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.math.BigInteger; import java.util.Map; @Component public class BabeService implements SlotChangeListener { + private final BlockState blockState = BlockState.getInstance(); private final EpochState epochState; private final KeyStore keyStore; private final Map slotToPreRuntimeDigest = new HashedMap<>(); + private final Network network = AppBean.getBean(Network.class); public BabeService(EpochState epochState, KeyStore keyStore) { this.epochState = epochState; @@ -43,4 +55,21 @@ public void slotChanged(SlotChangeEvent event) { executeEpochLottery(nextEpochIndex); } } + + public byte[] createEncodedBlockAnnounceMessage(BlockHeader blockHeader) { + BlockAnnounceMessage blockAnnounceMessage = new BlockAnnounceMessage(); + blockAnnounceMessage.setHeader(blockHeader); + blockAnnounceMessage.setBestBlock(blockHeader.getHash().equals(blockState.bestBlockHash())); + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) { + writer.write(new BlockAnnounceMessageScaleWriter(), blockAnnounceMessage); + } catch (IOException e) { + throw new ScaleEncodingException(e); + } + return buf.toByteArray(); + } + + public void broadcastBlock(BlockHeader blockHeader) { + network.sendBlockAnnounceMessage(createEncodedBlockAnnounceMessage(blockHeader)); + } } diff --git a/src/main/java/com/limechain/network/Network.java b/src/main/java/com/limechain/network/Network.java index 2aeffa4b8..93b23155d 100644 --- a/src/main/java/com/limechain/network/Network.java +++ b/src/main/java/com/limechain/network/Network.java @@ -313,4 +313,11 @@ public void sendMessagesToPeers() { public void sendNeighbourMessage(PeerId peerId) { grandpaService.sendNeighbourMessage(this.host, peerId); } + + public void sendBlockAnnounceMessage(byte[] encodedBlockAnnounceMessage) { + kademliaService.getBootNodePeerIds() + .stream() + .distinct() + .forEach(p -> new Thread(() -> blockAnnounceService.sendBlockAnnounceMessage(this.host, p, encodedBlockAnnounceMessage)).start()); + } } diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceController.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceController.java index dee675984..34b609ac4 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceController.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceController.java @@ -13,4 +13,11 @@ public BlockAnnounceController(Stream stream) { public void sendHandshake() { engine.writeHandshakeToStream(stream, stream.remotePeerId()); } + + /** + * Sends a block announce message over the controller stream. + */ + public void sendBlockAnnounceMessage(byte[] encodedBlockAnnounceMessage) { + engine.writeBlockAnnounceMessage(stream, stream.remotePeerId(), encodedBlockAnnounceMessage); + } } diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index 5d9fe542b..c006c5526 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -110,4 +110,9 @@ public void writeHandshakeToStream(Stream stream, PeerId peerId) { log.log(Level.INFO, "Sending handshake to " + peerId); stream.writeAndFlush(buf.toByteArray()); } + + public void writeBlockAnnounceMessage(Stream stream, PeerId peerId, byte[] encodedBlockAnnounceMessage) { + log.log(Level.FINE, "Sending Block Announce message to peer " + peerId); + stream.writeAndFlush(encodedBlockAnnounceMessage); + } } diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceService.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceService.java index d14e0b485..359a22b74 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceService.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceService.java @@ -1,12 +1,17 @@ package com.limechain.network.protocol.blockannounce; +import com.limechain.network.ConnectionManager; import com.limechain.network.protocol.NetworkService; import io.libp2p.core.Host; import io.libp2p.core.PeerId; import lombok.extern.java.Log; +import java.util.Optional; + @Log public class BlockAnnounceService extends NetworkService { + ConnectionManager connectionManager = ConnectionManager.getInstance(); + public BlockAnnounceService(String protocolId) { this.protocol = new BlockAnnounce(protocolId, new BlockAnnounceProtocol()); } @@ -19,4 +24,20 @@ public void sendHandshake(Host us, PeerId peer) { log.warning("Error sending handshake request to peer " + peer); } } + + /** + * Sends a Block Announce message to a peer. If there is no initiator stream opened with the peer, + * sends a handshake instead. + * + * @param us our host object + * @param peerId message receiver + */ + public void sendBlockAnnounceMessage(Host us, PeerId peerId, byte[] encodedBlockAnnounceMessage) { + Optional.ofNullable(connectionManager.getPeerInfo(peerId)) + .map(p -> p.getBlockAnnounceStreams().getInitiator()) + .ifPresentOrElse( + stream -> new BlockAnnounceController(stream).sendBlockAnnounceMessage(encodedBlockAnnounceMessage), + () -> sendHandshake(us, peerId) + ); + } } diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/scale/BlockAnnounceMessageScaleWriter.java b/src/main/java/com/limechain/network/protocol/blockannounce/scale/BlockAnnounceMessageScaleWriter.java new file mode 100644 index 000000000..049356c06 --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/blockannounce/scale/BlockAnnounceMessageScaleWriter.java @@ -0,0 +1,16 @@ +package com.limechain.network.protocol.blockannounce.scale; + +import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; +import io.emeraldpay.polkaj.scale.ScaleCodecWriter; +import io.emeraldpay.polkaj.scale.ScaleWriter; +import io.emeraldpay.polkaj.scale.writer.BoolWriter; + +import java.io.IOException; + +public class BlockAnnounceMessageScaleWriter implements ScaleWriter { + @Override + public void write(ScaleCodecWriter writer, BlockAnnounceMessage message) throws IOException { + BlockHeaderScaleWriter.getInstance().write(writer, message.getHeader()); + new BoolWriter().write(writer, message.isBestBlock()); + } +} diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceControllerTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceControllerTest.java index 696a52623..21314c8b1 100644 --- a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceControllerTest.java +++ b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceControllerTest.java @@ -39,4 +39,11 @@ void sendHanshake() { verify(engine).writeHandshakeToStream(stream, peerId); } + @Test + void sendBlockAnnounceMessage() { + byte[] message = {1, 2, 3, 4}; + when(stream.remotePeerId()).thenReturn(peerId); + blockAnnounceController.sendBlockAnnounceMessage(message); + verify(engine).writeBlockAnnounceMessage(stream, peerId, message); + } } \ No newline at end of file diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java index cd7da4ad3..75678288e 100644 --- a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java +++ b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java @@ -149,4 +149,13 @@ void receiveBlockAnnounceWhenConnectedShouldSyncMessage() { verify(warpSyncState).syncBlockAnnounce(blockAnnounceMessage); } } + + @Test + void writeBlockAnnounceMessage() { + byte[] message = {1, 2, 3, 4}; + + blockAnnounceEngine.writeBlockAnnounceMessage(stream, peerId, message); + + verify(stream).writeAndFlush(message); + } } \ No newline at end of file diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceServiceTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceServiceTest.java index 46e60cdda..1592dca76 100644 --- a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceServiceTest.java +++ b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceServiceTest.java @@ -1,5 +1,8 @@ package com.limechain.network.protocol.blockannounce; +import com.limechain.network.ConnectionManager; +import com.limechain.network.dto.PeerInfo; +import com.limechain.network.dto.ProtocolStreams; import com.limechain.network.kad.KademliaService; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake; import com.limechain.utils.RandomGenerationUtils; @@ -9,13 +12,16 @@ import io.libp2p.core.AddressBook; import io.libp2p.core.Host; import io.libp2p.core.PeerId; +import io.libp2p.core.Stream; import io.libp2p.core.multiformats.Multiaddr; import io.libp2p.protocol.Ping; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.MockedConstruction; import org.mockito.junit.jupiter.MockitoExtension; import org.peergos.HostBuilder; @@ -23,8 +29,8 @@ import java.math.BigInteger; import java.util.List; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) class BlockAnnounceServiceTest { @@ -38,17 +44,26 @@ class BlockAnnounceServiceTest { private AddressBook addressBook; @Mock private BlockAnnounceController blockAnnounceController; + @Mock + private PeerInfo peerInfo; + @Mock + private Stream stream; + @Mock + private ProtocolStreams protocolStreams; + @Mock + private ConnectionManager connectionManager; + @InjectMocks private final BlockAnnounceService blockAnnounceService = new BlockAnnounceService("pid"); @BeforeEach public void setupEach() throws NoSuchFieldException, IllegalAccessException { - when(host.getAddressBook()).thenReturn(addressBook); setPrivateFieldOfSuperclass(blockAnnounceService, "protocol", protocol); } @Test void sendHandshake() { + when(host.getAddressBook()).thenReturn(addressBook); when(protocol.dialPeer(host, peerId, addressBook)).thenReturn(blockAnnounceController); blockAnnounceService.sendHandshake(host, peerId); @@ -56,6 +71,33 @@ void sendHandshake() { verify(blockAnnounceController).sendHandshake(); } + @Test + void sendBlockAnnounceMessageeWhenNotConnectionShouldSendHandshake() { + byte[] message = {1, 2, 3, 4}; + when(host.getAddressBook()).thenReturn(addressBook); + when(protocol.dialPeer(host, peerId, addressBook)).thenReturn(blockAnnounceController); + + blockAnnounceService.sendBlockAnnounceMessage(host, peerId, message); + + verify(blockAnnounceController).sendHandshake(); + } + + @Test + void sendBlockAnnounceMessageWhenExistingConnection() { + byte[] message = {1, 2, 3, 4}; + when(connectionManager.getPeerInfo(peerId)).thenReturn(peerInfo); + when(peerInfo.getBlockAnnounceStreams()).thenReturn(protocolStreams); + when(protocolStreams.getInitiator()).thenReturn(stream); + + try (MockedConstruction mock = mockConstruction(BlockAnnounceController.class)) { + blockAnnounceService.sendBlockAnnounceMessage(host, peerId, message); + + assertEquals(1, mock.constructed().size()); + BlockAnnounceController controller = mock.constructed().getFirst(); + verify(controller).sendBlockAnnounceMessage(message); + } + } + @Disabled("This is an integration test") @Test void receivesNotifications() { diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/scale/BlockAnnounceMessageScaleTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/scale/BlockAnnounceMessageScaleTest.java new file mode 100644 index 000000000..d1574d90e --- /dev/null +++ b/src/test/java/com/limechain/network/protocol/blockannounce/scale/BlockAnnounceMessageScaleTest.java @@ -0,0 +1,42 @@ +package com.limechain.network.protocol.blockannounce.scale; + +import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; +import com.limechain.network.protocol.warp.dto.BlockHeader; +import com.limechain.network.protocol.warp.dto.HeaderDigest; +import com.limechain.utils.StringUtils; +import com.limechain.utils.scale.ScaleUtils; +import io.emeraldpay.polkaj.types.Hash256; +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class BlockAnnounceMessageScaleTest { + + @Test + void blockAnnounceMessageEncodeAndDecodeTest() { + BlockHeader blockHeader = new BlockHeader(); + blockHeader.setBlockNumber(BigInteger.ONE); + blockHeader.setParentHash(new Hash256(StringUtils.hexToBytes("0x4545454545454545454545454545454545454545454545454545454545454545"))); + blockHeader.setStateRoot(new Hash256(StringUtils.hexToBytes("0xb3266de137d20a5d0ff3a6401eb57127525fd9b2693701f0bf5a8a853fa3ebe0"))); + blockHeader.setExtrinsicsRoot(new Hash256(StringUtils.hexToBytes("0x03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314"))); + blockHeader.setDigest(new HeaderDigest[]{}); + BlockAnnounceMessage blockAnnounceMessage = new BlockAnnounceMessage(); + blockAnnounceMessage.setHeader(blockHeader); + blockAnnounceMessage.setBestBlock(true); + + byte[] encodedBlockAnnounceMessage = ScaleUtils.Encode.encode(new BlockAnnounceMessageScaleWriter(), blockAnnounceMessage); + + BlockAnnounceMessage decodedBlockAnnounceMessage = ScaleUtils.Decode.decode(encodedBlockAnnounceMessage, new BlockAnnounceMessageScaleReader()); + assertEquals(blockAnnounceMessage.isBestBlock(), decodedBlockAnnounceMessage.isBestBlock()); + + BlockHeader decodedHeader = decodedBlockAnnounceMessage.getHeader(); + assertEquals(blockHeader.getBlockNumber(), decodedHeader.getBlockNumber()); + assertEquals(blockHeader.getParentHash(), decodedHeader.getParentHash()); + assertEquals(blockHeader.getStateRoot(), decodedHeader.getStateRoot()); + assertEquals(blockHeader.getExtrinsicsRoot(), decodedHeader.getExtrinsicsRoot()); + assertArrayEquals(blockHeader.getDigest(), decodedHeader.getDigest()); + } +}