Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Announce produced block via block announce #610

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/main/java/com/limechain/babe/BabeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,31 @@
import com.limechain.babe.coordinator.SlotChangeListener;
import com.limechain.babe.predigest.BabePreDigest;
import com.limechain.babe.state.EpochState;
import com.limechain.exception.scale.ScaleEncodingException;
import com.limechain.network.Network;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleWriter;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.rpc.server.AppBean;
import com.limechain.storage.block.BlockState;
import com.limechain.storage.crypto.KeyStore;
import io.emeraldpay.polkaj.scale.ScaleCodecWriter;
import org.apache.commons.collections4.map.HashedMap;
import org.springframework.stereotype.Component;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Map;

@Component
public class BabeService implements SlotChangeListener {

private final BlockState blockState = BlockState.getInstance();
private final EpochState epochState;
private final KeyStore keyStore;
private final Map<BigInteger, BabePreDigest> slotToPreRuntimeDigest = new HashedMap<>();
private final Network network = AppBean.getBean(Network.class);

public BabeService(EpochState epochState, KeyStore keyStore) {
this.epochState = epochState;
Expand All @@ -43,4 +55,21 @@ public void slotChanged(SlotChangeEvent event) {
executeEpochLottery(nextEpochIndex);
}
}

public byte[] createEncodedBlockAnnounceMessage(BlockHeader blockHeader) {
BlockAnnounceMessage blockAnnounceMessage = new BlockAnnounceMessage();
blockAnnounceMessage.setHeader(blockHeader);
blockAnnounceMessage.setBestBlock(blockHeader.getHash().equals(blockState.bestBlockHash()));
ByteArrayOutputStream buf = new ByteArrayOutputStream();
try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) {
writer.write(new BlockAnnounceMessageScaleWriter(), blockAnnounceMessage);
} catch (IOException e) {
throw new ScaleEncodingException(e);
}
return buf.toByteArray();
}

public void broadcastBlock(BlockHeader blockHeader) {
network.sendBlockAnnounceMessage(createEncodedBlockAnnounceMessage(blockHeader));
}
}
7 changes: 7 additions & 0 deletions src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,4 +313,11 @@ public void sendMessagesToPeers() {
public void sendNeighbourMessage(PeerId peerId) {
grandpaService.sendNeighbourMessage(this.host, peerId);
}

public void sendBlockAnnounceMessage(byte[] encodedBlockAnnounceMessage) {
kademliaService.getBootNodePeerIds()
.stream()
.distinct()
.forEach(p -> new Thread(() -> blockAnnounceService.sendBlockAnnounceMessage(this.host, p, encodedBlockAnnounceMessage)).start());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,11 @@ public BlockAnnounceController(Stream stream) {
public void sendHandshake() {
engine.writeHandshakeToStream(stream, stream.remotePeerId());
}

/**
* Sends a block announce message over the controller stream.
*/
public void sendBlockAnnounceMessage(byte[] encodedBlockAnnounceMessage) {
engine.writeBlockAnnounceMessage(stream, stream.remotePeerId(), encodedBlockAnnounceMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,9 @@ public void writeHandshakeToStream(Stream stream, PeerId peerId) {
log.log(Level.INFO, "Sending handshake to " + peerId);
stream.writeAndFlush(buf.toByteArray());
}

public void writeBlockAnnounceMessage(Stream stream, PeerId peerId, byte[] encodedBlockAnnounceMessage) {
log.log(Level.FINE, "Sending Block Announce message to peer " + peerId);
stream.writeAndFlush(encodedBlockAnnounceMessage);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package com.limechain.network.protocol.blockannounce;

import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.NetworkService;
import io.libp2p.core.Host;
import io.libp2p.core.PeerId;
import lombok.extern.java.Log;

import java.util.Optional;

@Log
public class BlockAnnounceService extends NetworkService<BlockAnnounce> {
ConnectionManager connectionManager = ConnectionManager.getInstance();
georg-getz marked this conversation as resolved.
Show resolved Hide resolved

public BlockAnnounceService(String protocolId) {
this.protocol = new BlockAnnounce(protocolId, new BlockAnnounceProtocol());
}
Expand All @@ -19,4 +24,20 @@ public void sendHandshake(Host us, PeerId peer) {
log.warning("Error sending handshake request to peer " + peer);
}
}

/**
* Sends a Block Announce message to a peer. If there is no initiator stream opened with the peer,
* sends a handshake instead.
*
* @param us our host object
* @param peerId message receiver
*/
public void sendBlockAnnounceMessage(Host us, PeerId peerId, byte[] encodedBlockAnnounceMessage) {
Optional.ofNullable(connectionManager.getPeerInfo(peerId))
.map(p -> p.getBlockAnnounceStreams().getInitiator())
.ifPresentOrElse(
stream -> new BlockAnnounceController(stream).sendBlockAnnounceMessage(encodedBlockAnnounceMessage),
() -> sendHandshake(us, peerId)
);
georg-getz marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.limechain.network.protocol.blockannounce.scale;

import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
import io.emeraldpay.polkaj.scale.ScaleCodecWriter;
import io.emeraldpay.polkaj.scale.ScaleWriter;
import io.emeraldpay.polkaj.scale.writer.BoolWriter;

import java.io.IOException;

public class BlockAnnounceMessageScaleWriter implements ScaleWriter<BlockAnnounceMessage> {
@Override
public void write(ScaleCodecWriter writer, BlockAnnounceMessage message) throws IOException {
BlockHeaderScaleWriter.getInstance().write(writer, message.getHeader());
new BoolWriter().write(writer, message.isBestBlock());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,11 @@ void sendHanshake() {
verify(engine).writeHandshakeToStream(stream, peerId);
}

@Test
void sendBlockAnnounceMessage() {
byte[] message = {1, 2, 3, 4};
when(stream.remotePeerId()).thenReturn(peerId);
blockAnnounceController.sendBlockAnnounceMessage(message);
verify(engine).writeBlockAnnounceMessage(stream, peerId, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,13 @@ void receiveBlockAnnounceWhenConnectedShouldSyncMessage() {
verify(warpSyncState).syncBlockAnnounce(blockAnnounceMessage);
}
}

@Test
void writeBlockAnnounceMessage() {
byte[] message = {1, 2, 3, 4};

blockAnnounceEngine.writeBlockAnnounceMessage(stream, peerId, message);

verify(stream).writeAndFlush(message);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.limechain.network.protocol.blockannounce;

import com.limechain.network.ConnectionManager;
import com.limechain.network.dto.PeerInfo;
import com.limechain.network.dto.ProtocolStreams;
import com.limechain.network.kad.KademliaService;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake;
import com.limechain.utils.RandomGenerationUtils;
Expand All @@ -9,22 +12,25 @@
import io.libp2p.core.AddressBook;
import io.libp2p.core.Host;
import io.libp2p.core.PeerId;
import io.libp2p.core.Stream;
import io.libp2p.core.multiformats.Multiaddr;
import io.libp2p.protocol.Ping;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.junit.jupiter.MockitoExtension;
import org.peergos.HostBuilder;

import java.lang.reflect.Field;
import java.math.BigInteger;
import java.util.List;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
class BlockAnnounceServiceTest {
Expand All @@ -38,24 +44,60 @@ class BlockAnnounceServiceTest {
private AddressBook addressBook;
@Mock
private BlockAnnounceController blockAnnounceController;
@Mock
private PeerInfo peerInfo;
@Mock
private Stream stream;
@Mock
private ProtocolStreams protocolStreams;
@Mock
private ConnectionManager connectionManager;

@InjectMocks
private final BlockAnnounceService blockAnnounceService = new BlockAnnounceService("pid");

@BeforeEach
public void setupEach() throws NoSuchFieldException, IllegalAccessException {
when(host.getAddressBook()).thenReturn(addressBook);
setPrivateFieldOfSuperclass(blockAnnounceService, "protocol", protocol);
}

@Test
void sendHandshake() {
when(host.getAddressBook()).thenReturn(addressBook);
when(protocol.dialPeer(host, peerId, addressBook)).thenReturn(blockAnnounceController);

blockAnnounceService.sendHandshake(host, peerId);

verify(blockAnnounceController).sendHandshake();
}

@Test
void sendBlockAnnounceMessageeWhenNotConnectionShouldSendHandshake() {
byte[] message = {1, 2, 3, 4};
when(host.getAddressBook()).thenReturn(addressBook);
when(protocol.dialPeer(host, peerId, addressBook)).thenReturn(blockAnnounceController);

blockAnnounceService.sendBlockAnnounceMessage(host, peerId, message);

verify(blockAnnounceController).sendHandshake();
}

@Test
void sendBlockAnnounceMessageWhenExistingConnection() {
byte[] message = {1, 2, 3, 4};
when(connectionManager.getPeerInfo(peerId)).thenReturn(peerInfo);
when(peerInfo.getBlockAnnounceStreams()).thenReturn(protocolStreams);
when(protocolStreams.getInitiator()).thenReturn(stream);

try (MockedConstruction<BlockAnnounceController> mock = mockConstruction(BlockAnnounceController.class)) {
blockAnnounceService.sendBlockAnnounceMessage(host, peerId, message);

assertEquals(1, mock.constructed().size());
BlockAnnounceController controller = mock.constructed().getFirst();
verify(controller).sendBlockAnnounceMessage(message);
}
}

@Disabled("This is an integration test")
@Test
void receivesNotifications() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.limechain.network.protocol.blockannounce.scale;

import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.network.protocol.warp.dto.HeaderDigest;
import com.limechain.utils.StringUtils;
import com.limechain.utils.scale.ScaleUtils;
import io.emeraldpay.polkaj.types.Hash256;
import org.junit.jupiter.api.Test;

import java.math.BigInteger;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

class BlockAnnounceMessageScaleTest {

@Test
void blockAnnounceMessageEncodeAndDecodeTest() {
BlockHeader blockHeader = new BlockHeader();
blockHeader.setBlockNumber(BigInteger.ONE);
blockHeader.setParentHash(new Hash256(StringUtils.hexToBytes("0x4545454545454545454545454545454545454545454545454545454545454545")));
blockHeader.setStateRoot(new Hash256(StringUtils.hexToBytes("0xb3266de137d20a5d0ff3a6401eb57127525fd9b2693701f0bf5a8a853fa3ebe0")));
blockHeader.setExtrinsicsRoot(new Hash256(StringUtils.hexToBytes("0x03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314")));
blockHeader.setDigest(new HeaderDigest[]{});
BlockAnnounceMessage blockAnnounceMessage = new BlockAnnounceMessage();
blockAnnounceMessage.setHeader(blockHeader);
blockAnnounceMessage.setBestBlock(true);

byte[] encodedBlockAnnounceMessage = ScaleUtils.Encode.encode(new BlockAnnounceMessageScaleWriter(), blockAnnounceMessage);

BlockAnnounceMessage decodedBlockAnnounceMessage = ScaleUtils.Decode.decode(encodedBlockAnnounceMessage, new BlockAnnounceMessageScaleReader());
assertEquals(blockAnnounceMessage.isBestBlock(), decodedBlockAnnounceMessage.isBestBlock());

BlockHeader decodedHeader = decodedBlockAnnounceMessage.getHeader();
assertEquals(blockHeader.getBlockNumber(), decodedHeader.getBlockNumber());
assertEquals(blockHeader.getParentHash(), decodedHeader.getParentHash());
assertEquals(blockHeader.getStateRoot(), decodedHeader.getStateRoot());
assertEquals(blockHeader.getExtrinsicsRoot(), decodedHeader.getExtrinsicsRoot());
assertArrayEquals(blockHeader.getDigest(), decodedHeader.getDigest());
}
}
Loading