From 6799255e0bd95e0b39f93bb7ad0406c6343ef58d Mon Sep 17 00:00:00 2001 From: rmadhwal Date: Mon, 18 Apr 2022 15:47:48 +0200 Subject: [PATCH 1/4] change params and add upper limit for simultaneously served peers --- .../tudelft/ipv8/messaging/eva/EVAProtocol.kt | 32 +++++-- .../ipv8/messaging/eva/EVAProtocolTest.kt | 91 ++++++++++++++++++- 2 files changed, 113 insertions(+), 10 deletions(-) diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt index 48476506..bff89a34 100644 --- a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt @@ -27,6 +27,7 @@ open class EVAProtocol( var timeoutInterval: Long = TIMEOUT_INTERVAL, var reduceWindowAfterTimeout: Int = REDUCE_WINDOW_AFTER_TIMEOUT, var loggingEnabled: Boolean = true, + private val maxSimultaneousTransfers: Int = 10 ) { private var scheduled: MutableMap> = mutableMapOf() private var incoming: MutableMap = mutableMapOf() @@ -239,7 +240,7 @@ open class EVAProtocol( if (loggingEnabled) logger.debug { "EVAPROTOCOL: Sending binary with id '$id', nonce '$nonceValue' for info '$info'." } - if (outgoing.containsKey(peer.key) || incoming.containsKey(peer.key) || getConnectedPeer(peer.key) == null) { + if (outgoing.containsKey(peer.key) || incoming.containsKey(peer.key) || getConnectedPeer(peer.key) == null || isSimultaneouslyServedTransfersLimitExceeded()) { if (!isScheduled(peer.key, id)) { val windowSize = getWindowSize(peer.key, id) scheduled.addValue( @@ -262,6 +263,16 @@ open class EVAProtocol( startOutgoingTransfer(peer, info, id, data, nonceValue) } + /** + * This asserts an upper limit of simultaneously served peers. + * The reason for introducing this parameter is to have a tool for + * limiting socket load which could lead to packet loss. + */ + private fun isSimultaneouslyServedTransfersLimitExceeded(): Boolean { + val transfersCount = incoming.size + outgoing.size + return transfersCount >= maxSimultaneousTransfers + } + /** * Start an outgoing transfer of data * @@ -364,10 +375,10 @@ open class EVAProtocol( ) val transfer = Transfer(TransferType.INCOMING, scheduledTransfer) - if (loggingEnabled) logger.debug { "EVAPROTOCOL: $transfer"} + if (loggingEnabled) logger.debug { "EVAPROTOCOL: $transfer" } when { - payload.dataSize <= 0u -> { + payload.dataSize <= 0u -> { incomingError( peer, transfer, @@ -429,7 +440,7 @@ open class EVAProtocol( if (loggingEnabled) logger.debug { "EVAPROTOCOL: On acknowledgement for window '${payload.ackWindow}'." } val transfer = outgoing[peer.key] ?: return - if (loggingEnabled) logger.debug { "EVAPROTOCOL: Transfer: $transfer"} + if (loggingEnabled) logger.debug { "EVAPROTOCOL: Transfer: $transfer" } if (isStopped(peer.key, transfer.id)) return @@ -437,7 +448,7 @@ open class EVAProtocol( transfer.updated = Date().time if (payload.ackWindow.toInt() > 0) { - if (loggingEnabled) logger.debug { "EVAPROTOCOL: UNACKED ${payload.unAckedBlocks.toString(Charsets.UTF_8)}"} + if (loggingEnabled) logger.debug { "EVAPROTOCOL: UNACKED ${payload.unAckedBlocks.toString(Charsets.UTF_8)}" } transfer.addUnreceivedBlocks(payload.unAckedBlocks) } @@ -462,7 +473,7 @@ open class EVAProtocol( if (data.isEmpty()) return - if (loggingEnabled) logger.debug { "EVAPROTOCOL: Transmit($blockNumber/${transfer.blockCount-1})" } + if (loggingEnabled) logger.debug { "EVAPROTOCOL: Transmit($blockNumber/${transfer.blockCount - 1})" } val dataPacket = community.createEVAData(peer, blockNumber.toUInt(), transfer.nonce, data) send(peer, dataPacket) @@ -556,7 +567,7 @@ open class EVAProtocol( transfer.getUnreceivedBlocksUntil() } else byteArrayOf() - if (loggingEnabled) logger.debug { "EVAPROTOCOL: Acknowledgement for window '${transfer.ackedWindow}' sent to ${peer.mid} with windowSize (${transfer.windowSize}) and nonce ${transfer.nonce}"} + if (loggingEnabled) logger.debug { "EVAPROTOCOL: Acknowledgement for window '${transfer.ackedWindow}' sent to ${peer.mid} with windowSize (${transfer.windowSize}) and nonce ${transfer.nonce}" } val ackPacket = community.createEVAAcknowledgement( peer, @@ -620,6 +631,9 @@ open class EVAProtocol( * Send the next scheduled transfer */ private fun sendScheduled() { + if (isSimultaneouslyServedTransfersLimitExceeded()) + return + val idlePeerKeys = scheduled .filter { !outgoing.contains(it.key) } .map { it.key } @@ -825,12 +839,12 @@ open class EVAProtocol( companion object { const val MAX_NONCE = Integer.MAX_VALUE.toLong() * 2 - const val BLOCK_SIZE = 1200 + const val BLOCK_SIZE = 600 const val WINDOW_SIZE = 80 const val BINARY_SIZE_LIMIT = 1024 * 1024 * 250 const val RETRANSMIT_INTERVAL = 2000L const val RETRANSMIT_ATTEMPT_COUNT = 3 - const val TIMEOUT_INTERVAL = 12000L + const val TIMEOUT_INTERVAL = 20000L const val REDUCE_WINDOW_AFTER_TIMEOUT = 16 const val MIN_WINDOW_SIZE = 16 const val SCHEDULED_SEND_INTERVAL = 5000L diff --git a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt index 7a45d910..1ccaf5a2 100644 --- a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt +++ b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt @@ -1,10 +1,12 @@ package nl.tudelft.ipv8.messaging.eva +import io.mockk.every import io.mockk.mockk +import io.mockk.spyk import kotlinx.coroutines.* -import kotlinx.coroutines.test.runBlockingTest import nl.tudelft.ipv8.* import nl.tudelft.ipv8.keyvault.Key +import nl.tudelft.ipv8.keyvault.PublicKey import nl.tudelft.ipv8.keyvault.defaultCryptoProvider import nl.tudelft.ipv8.peerdiscovery.Network import org.junit.Assert.* @@ -48,6 +50,17 @@ class EVAProtocolTest : BaseCommunityTest() { return ScheduledTransfer(info, data, nonce, id, blockCount, data.size.toULong(), blockSize, windowSize) } + private fun addRandomTransfersToOutgoing(community: TestCommunity) { + val outgoingMap: MutableMap = mutableMapOf() + List(10) { + Pair(mockk(), Transfer(TransferType.OUTGOING, createScheduledTransfer())) + }.forEach { + outgoingMap[it.first] = it.second + } + val outgoing = community.setOutgoing(outgoingMap) + assertEquals(10, outgoing.size) + } + @Suppress("UNCHECKED_CAST") fun Community.getIncoming(): MutableMap { return evaProtocol?.getPrivateProperty("incoming") as MutableMap @@ -218,6 +231,44 @@ class EVAProtocolTest : BaseCommunityTest() { community.unload() } + @Test + fun sendBinaryAddsToScheduledWhenMaxTransfersExceeded() { + val community = spyk(getCommunity()) + val mockPeer = mockk(relaxed = true) + val mockPublicKey = mockk() + + every { mockPublicKey.encrypt(any()) } returns byteArrayOf() + every { mockPeer.publicKey } returns mockPublicKey + every { mockPeer.key } returns mockk() + every { community.getPeers() } returns listOf(mockPeer) + community.load() + + val scheduled = community.getScheduled() + assertEquals(0, scheduled.size) + + community.evaProtocol?.sendBinary( + mockPeer, + Community.EVAId.EVA_PEERCHAT_ATTACHMENT, + "012345678", + byteArrayOf(0, 1, 2, 3, 4, 5) + ) + + assertEquals(0, scheduled.size) + + addRandomTransfersToOutgoing(community) + + community.evaProtocol?.sendBinary( + mockPeer, + Community.EVAId.EVA_PEERCHAT_ATTACHMENT, + "012345678", + byteArrayOf(0, 1, 2, 3, 4, 5) + ) + + assertEquals(1, scheduled.size) + + community.unload() + } + @Test fun startOutgoingTransfer_scheduled() { val community = getCommunity() @@ -889,6 +940,44 @@ class EVAProtocolTest : BaseCommunityTest() { community.unload() } + @Test + fun sendScheduledDoesntSendWhenMaxTransfersExceeded() { + val community = getCommunity() + community.load() + + assertEquals(0, community.getPeers().size) + + val address = IPv4Address("1.2.3.4", 1234) + val peer = Peer(defaultCryptoProvider.generateKey(), address) + community.network.addVerifiedPeer(peer) + community.network.discoverServices(peer, listOf(community.serviceId)) + + assertEquals(1, community.getPeers().size) + + val scheduledTransfer = createScheduledTransfer() + + community.evaProtocol?.let { evaProtocol -> + val scheduled = community.getScheduled() + assertEquals(0, scheduled.size) + scheduled.addValue(peer.key, scheduledTransfer) + assertEquals(1, community.getScheduled().size) + + evaProtocol.javaClass.declaredMethods.first { + it.name == "sendScheduled" + }.let { method -> + method.isAccessible = true + addRandomTransfersToOutgoing(community) + val outgoing = community.getOutgoing() + assertEquals(10, outgoing.size) + method.invoke(evaProtocol) + assertEquals(1, scheduled.size) + assertEquals(10, outgoing.size) + } + } + + community.unload() + } + @Test fun isScheduled_true() { val community = getCommunity() From f3b47f741c7daabc07d9f1147d8ad2f3393be033 Mon Sep 17 00:00:00 2001 From: rmadhwal Date: Mon, 18 Apr 2022 22:23:16 +0200 Subject: [PATCH 2/4] add retries to write request sending --- .../tudelft/ipv8/messaging/eva/EVAProtocol.kt | 32 +++- .../ipv8/messaging/eva/EVAProtocolTest.kt | 147 +++++++++++++++++- 2 files changed, 175 insertions(+), 4 deletions(-) diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt index bff89a34..9d32b9d3 100644 --- a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt @@ -14,7 +14,7 @@ private val logger = KotlinLogging.logger {} open class EVAProtocol( private var community: Community, - scope: CoroutineScope, + private val scope: CoroutineScope, var blockSize: Int = BLOCK_SIZE, var windowSize: Int = WINDOW_SIZE, var binarySizeLimit: Int = BINARY_SIZE_LIMIT, @@ -339,6 +339,36 @@ open class EVAProtocol( outgoing[peer.key] = transfer scheduleTerminate(outgoing, peer, transfer) + sendWriteRequest(peer, info, id, nonce, transfer) + retryWriteRequestIfNeeded(transfer, peer) + } + + private fun retryWriteRequestIfNeeded( + transfer: Transfer, + peer: Peer, + ) { + scope.launch { + if (retransmitEnabled) { + for (attempt in 1..retransmitAttemptCount) { + delay(retransmitInterval) + if (transfer.released || transfer.ackedWindow != 0) + return@launch + + val currentAttempt = "$attempt/$retransmitAttemptCount" + if (loggingEnabled) logger.debug { "EVAPROTOCOL: Retrying Write Request. Attempt $currentAttempt for peer: $peer" } + sendWriteRequest(peer, transfer.info, transfer.id, transfer.nonce.toLong(), transfer) + } + } + } + } + + private fun sendWriteRequest( + peer: Peer, + info: String, + id: String, + nonce: Long, + transfer: Transfer + ) { val writeRequestPacket = community.createEVAWriteRequest( peer, info, diff --git a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt index 1ccaf5a2..9ca4f9a4 100644 --- a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt +++ b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt @@ -1,13 +1,16 @@ package nl.tudelft.ipv8.messaging.eva -import io.mockk.every -import io.mockk.mockk -import io.mockk.spyk +import io.mockk.* import kotlinx.coroutines.* +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import nl.tudelft.ipv8.* import nl.tudelft.ipv8.keyvault.Key +import nl.tudelft.ipv8.keyvault.PrivateKey import nl.tudelft.ipv8.keyvault.PublicKey import nl.tudelft.ipv8.keyvault.defaultCryptoProvider +import nl.tudelft.ipv8.messaging.EndpointAggregator +import nl.tudelft.ipv8.messaging.Packet import nl.tudelft.ipv8.peerdiscovery.Network import org.junit.Assert.* import org.junit.Test @@ -349,6 +352,144 @@ class EVAProtocolTest : BaseCommunityTest() { community.unload() } + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun startOutgoingTransferRetriesWriteRequestOnFailure() = runBlockingTest { + val community = spyk(getCommunity()) + community.load() + + assertEquals(0, community.getPeers().size) + + val address = IPv4Address("1.2.3.4", 1234) + val peer = Peer(defaultCryptoProvider.generateKey(), address) + + val mockAggregator = mockk(relaxed = true) + every { community.endpoint } returns mockAggregator + + community.network.addVerifiedPeer(peer) + community.network.discoverServices(peer, listOf(community.serviceId)) + + assertEquals(1, community.getPeers().size) + + val scope = TestCoroutineScope() + community.evaProtocol = EVAProtocol(community, scope, timeoutInterval = 30000) + + community.evaProtocol?.let { evaProtocol -> + evaProtocol.javaClass.declaredMethods.first { + it.name == "startOutgoingTransfer" + }.let { method -> + method.isAccessible = true + + val info = Community.EVAId.EVA_PEERCHAT_ATTACHMENT + val id = "012345678" + val nonce = 1234 + val data = byteArrayOf(0, 1, 2, 3, 4, 5) + + val writeRequestSlots = mutableListOf() + method.invoke( + evaProtocol, + peer, + info, + id, + data, + nonce + ) + + for (i in 1..evaProtocol.retransmitAttemptCount) { + scope.advanceTimeBy(evaProtocol.retransmitInterval) + delay(1000) + } + + verify( + exactly = evaProtocol.retransmitAttemptCount + 1, + timeout = (evaProtocol.retransmitAttemptCount + 1) * evaProtocol.retransmitInterval + ) { + mockAggregator.send(peer, capture(writeRequestSlots)) + } + + val payloads = writeRequestSlots.map { + val packet = Packet(peer.address, it) + val (_, payload) = packet.getDecryptedAuthPayload( + EVAWriteRequestPayload.Deserializer, peer.key as PrivateKey + ) + payload + } + + val payloadSet = setOf(payloads) + assertEquals(1, payloadSet.size) + + assertEquals(id, payloads[0].id) + assertEquals(info, payloads[0].info) + assertEquals(data.size.toULong(), payloads[0].dataSize) + assertEquals(nonce.toULong(), payloads[0].nonce) + + } + } + + community.unload() + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun doesNotRetryWriteRequestIfNotNeeded() = runBlockingTest { + val community = spyk(getCommunity()) + community.load() + + assertEquals(0, community.getPeers().size) + + val address = IPv4Address("1.2.3.4", 1234) + val peer = Peer(defaultCryptoProvider.generateKey(), address) + + val mockAggregator = mockk(relaxed = true) + every { community.endpoint } returns mockAggregator + + community.network.addVerifiedPeer(peer) + community.network.discoverServices(peer, listOf(community.serviceId)) + + assertEquals(1, community.getPeers().size) + + val scope = TestCoroutineScope() + + community.evaProtocol = EVAProtocol(community, scope, timeoutInterval = 30000) + + community.evaProtocol?.let { evaProtocol -> + evaProtocol.javaClass.declaredMethods.first { + it.name == "retryWriteRequestIfNeeded" + }.let { method -> + method.isAccessible = true + + val info = Community.EVAId.EVA_PEERCHAT_ATTACHMENT + val id = "012345678" + val nonce = 1234 + val data = byteArrayOf(0, 1, 2, 3, 4, 5) + + val scheduledTransfer = ScheduledTransfer(info, data, nonce.toULong(), id, 0, data.size.toULong(), 0, 0) + val transfer = Transfer(TransferType.OUTGOING, scheduledTransfer) + transfer.release() + + method.invoke( + evaProtocol, + transfer, + peer + ) + + for (i in 1..evaProtocol.retransmitAttemptCount) { + scope.advanceTimeBy(evaProtocol.retransmitInterval) + delay(1000) + } + + verify( + exactly = 0, + timeout = (evaProtocol.retransmitAttemptCount + 1) * evaProtocol.retransmitInterval + ) { + mockAggregator.send(peer, any()) + } + } + } + + community.unload() + } + @Test fun startOutgoingTransfer_datasize_over_limit() = runBlocking { val community = getCommunity() From e0615697f24ef33d09f2f6d72e0ca13ad3ef3a15 Mon Sep 17 00:00:00 2001 From: rmadhwal Date: Mon, 18 Apr 2022 23:44:13 +0200 Subject: [PATCH 3/4] remove superfluous params --- .../nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt index 9d32b9d3..fe58a623 100644 --- a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt @@ -339,7 +339,7 @@ open class EVAProtocol( outgoing[peer.key] = transfer scheduleTerminate(outgoing, peer, transfer) - sendWriteRequest(peer, info, id, nonce, transfer) + sendWriteRequest(peer, transfer) retryWriteRequestIfNeeded(transfer, peer) } @@ -356,7 +356,7 @@ open class EVAProtocol( val currentAttempt = "$attempt/$retransmitAttemptCount" if (loggingEnabled) logger.debug { "EVAPROTOCOL: Retrying Write Request. Attempt $currentAttempt for peer: $peer" } - sendWriteRequest(peer, transfer.info, transfer.id, transfer.nonce.toLong(), transfer) + sendWriteRequest(peer, transfer) } } } @@ -364,16 +364,13 @@ open class EVAProtocol( private fun sendWriteRequest( peer: Peer, - info: String, - id: String, - nonce: Long, transfer: Transfer ) { val writeRequestPacket = community.createEVAWriteRequest( peer, - info, - id, - nonce.toULong(), + transfer.info, + transfer.id, + transfer.nonce, transfer.dataSize, transfer.blockCount.toUInt(), transfer.blockSize.toUInt(), From 78465f252cd3244851c3deced87c57afced3e563 Mon Sep 17 00:00:00 2001 From: rmadhwal Date: Mon, 18 Apr 2022 23:51:42 +0200 Subject: [PATCH 4/4] formatting --- .../main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt | 4 ++-- .../java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt index fe58a623..dd39d8bc 100644 --- a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt @@ -340,12 +340,12 @@ open class EVAProtocol( scheduleTerminate(outgoing, peer, transfer) sendWriteRequest(peer, transfer) - retryWriteRequestIfNeeded(transfer, peer) + retryWriteRequestIfNeeded(peer, transfer) } private fun retryWriteRequestIfNeeded( - transfer: Transfer, peer: Peer, + transfer: Transfer ) { scope.launch { if (retransmitEnabled) { diff --git a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt index 9ca4f9a4..3702517f 100644 --- a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt +++ b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/eva/EVAProtocolTest.kt @@ -469,8 +469,8 @@ class EVAProtocolTest : BaseCommunityTest() { method.invoke( evaProtocol, - transfer, - peer + peer, + transfer ) for (i in 1..evaProtocol.retransmitAttemptCount) {