Skip to content

Commit

Permalink
Merge pull request #63 from rmadhwal/master
Browse files Browse the repository at this point in the history
EVA Protocol improvements
  • Loading branch information
InvictusRMC authored Mar 13, 2023
2 parents 9412cb6 + 78465f2 commit ac31dab
Show file tree
Hide file tree
Showing 2 changed files with 285 additions and 14 deletions.
67 changes: 54 additions & 13 deletions ipv8/src/main/java/nl/tudelft/ipv8/messaging/eva/EVAProtocol.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ private val logger = KotlinLogging.logger {}

open class EVAProtocol(
private var community: Community,
scope: CoroutineScope,
private val scope: CoroutineScope,
var blockSize: Int = BLOCK_SIZE,
var windowSize: Int = WINDOW_SIZE,
var binarySizeLimit: Int = BINARY_SIZE_LIMIT,
Expand All @@ -27,6 +27,7 @@ open class EVAProtocol(
var timeoutInterval: Long = TIMEOUT_INTERVAL,
var reduceWindowAfterTimeout: Int = REDUCE_WINDOW_AFTER_TIMEOUT,
var loggingEnabled: Boolean = true,
private val maxSimultaneousTransfers: Int = 10
) {
private var scheduled: MutableMap<Key, Queue<ScheduledTransfer>> = mutableMapOf()
private var incoming: MutableMap<Key, Transfer> = mutableMapOf()
Expand Down Expand Up @@ -239,7 +240,7 @@ open class EVAProtocol(

if (loggingEnabled) logger.debug { "EVAPROTOCOL: Sending binary with id '$id', nonce '$nonceValue' for info '$info'." }

if (outgoing.containsKey(peer.key) || incoming.containsKey(peer.key) || getConnectedPeer(peer.key) == null) {
if (outgoing.containsKey(peer.key) || incoming.containsKey(peer.key) || getConnectedPeer(peer.key) == null || isSimultaneouslyServedTransfersLimitExceeded()) {
if (!isScheduled(peer.key, id)) {
val windowSize = getWindowSize(peer.key, id)
scheduled.addValue(
Expand All @@ -262,6 +263,16 @@ open class EVAProtocol(
startOutgoingTransfer(peer, info, id, data, nonceValue)
}

/**
* This asserts an upper limit of simultaneously served peers.
* The reason for introducing this parameter is to have a tool for
* limiting socket load which could lead to packet loss.
*/
private fun isSimultaneouslyServedTransfersLimitExceeded(): Boolean {
val transfersCount = incoming.size + outgoing.size
return transfersCount >= maxSimultaneousTransfers
}

/**
* Start an outgoing transfer of data
*
Expand Down Expand Up @@ -328,11 +339,38 @@ open class EVAProtocol(
outgoing[peer.key] = transfer
scheduleTerminate(outgoing, peer, transfer)

sendWriteRequest(peer, transfer)
retryWriteRequestIfNeeded(peer, transfer)
}

private fun retryWriteRequestIfNeeded(
peer: Peer,
transfer: Transfer
) {
scope.launch {
if (retransmitEnabled) {
for (attempt in 1..retransmitAttemptCount) {
delay(retransmitInterval)
if (transfer.released || transfer.ackedWindow != 0)
return@launch

val currentAttempt = "$attempt/$retransmitAttemptCount"
if (loggingEnabled) logger.debug { "EVAPROTOCOL: Retrying Write Request. Attempt $currentAttempt for peer: $peer" }
sendWriteRequest(peer, transfer)
}
}
}
}

private fun sendWriteRequest(
peer: Peer,
transfer: Transfer
) {
val writeRequestPacket = community.createEVAWriteRequest(
peer,
info,
id,
nonce.toULong(),
transfer.info,
transfer.id,
transfer.nonce,
transfer.dataSize,
transfer.blockCount.toUInt(),
transfer.blockSize.toUInt(),
Expand Down Expand Up @@ -364,10 +402,10 @@ open class EVAProtocol(
)
val transfer = Transfer(TransferType.INCOMING, scheduledTransfer)

if (loggingEnabled) logger.debug { "EVAPROTOCOL: $transfer"}
if (loggingEnabled) logger.debug { "EVAPROTOCOL: $transfer" }

when {
payload.dataSize <= 0u -> {
payload.dataSize <= 0u -> {
incomingError(
peer,
transfer,
Expand Down Expand Up @@ -429,15 +467,15 @@ open class EVAProtocol(
if (loggingEnabled) logger.debug { "EVAPROTOCOL: On acknowledgement for window '${payload.ackWindow}'." }

val transfer = outgoing[peer.key] ?: return
if (loggingEnabled) logger.debug { "EVAPROTOCOL: Transfer: $transfer"}
if (loggingEnabled) logger.debug { "EVAPROTOCOL: Transfer: $transfer" }

if (isStopped(peer.key, transfer.id)) return

if (transfer.nonce != payload.nonce) return
transfer.updated = Date().time

if (payload.ackWindow.toInt() > 0) {
if (loggingEnabled) logger.debug { "EVAPROTOCOL: UNACKED ${payload.unAckedBlocks.toString(Charsets.UTF_8)}"}
if (loggingEnabled) logger.debug { "EVAPROTOCOL: UNACKED ${payload.unAckedBlocks.toString(Charsets.UTF_8)}" }
transfer.addUnreceivedBlocks(payload.unAckedBlocks)
}

Expand All @@ -462,7 +500,7 @@ open class EVAProtocol(

if (data.isEmpty()) return

if (loggingEnabled) logger.debug { "EVAPROTOCOL: Transmit($blockNumber/${transfer.blockCount-1})" }
if (loggingEnabled) logger.debug { "EVAPROTOCOL: Transmit($blockNumber/${transfer.blockCount - 1})" }

val dataPacket = community.createEVAData(peer, blockNumber.toUInt(), transfer.nonce, data)
send(peer, dataPacket)
Expand Down Expand Up @@ -556,7 +594,7 @@ open class EVAProtocol(
transfer.getUnreceivedBlocksUntil()
} else byteArrayOf()

if (loggingEnabled) logger.debug { "EVAPROTOCOL: Acknowledgement for window '${transfer.ackedWindow}' sent to ${peer.mid} with windowSize (${transfer.windowSize}) and nonce ${transfer.nonce}"}
if (loggingEnabled) logger.debug { "EVAPROTOCOL: Acknowledgement for window '${transfer.ackedWindow}' sent to ${peer.mid} with windowSize (${transfer.windowSize}) and nonce ${transfer.nonce}" }

val ackPacket = community.createEVAAcknowledgement(
peer,
Expand Down Expand Up @@ -620,6 +658,9 @@ open class EVAProtocol(
* Send the next scheduled transfer
*/
private fun sendScheduled() {
if (isSimultaneouslyServedTransfersLimitExceeded())
return

val idlePeerKeys = scheduled
.filter { !outgoing.contains(it.key) }
.map { it.key }
Expand Down Expand Up @@ -825,12 +866,12 @@ open class EVAProtocol(

companion object {
const val MAX_NONCE = Integer.MAX_VALUE.toLong() * 2
const val BLOCK_SIZE = 1200
const val BLOCK_SIZE = 600
const val WINDOW_SIZE = 80
const val BINARY_SIZE_LIMIT = 1024 * 1024 * 250
const val RETRANSMIT_INTERVAL = 2000L
const val RETRANSMIT_ATTEMPT_COUNT = 3
const val TIMEOUT_INTERVAL = 12000L
const val TIMEOUT_INTERVAL = 20000L
const val REDUCE_WINDOW_AFTER_TIMEOUT = 16
const val MIN_WINDOW_SIZE = 16
const val SCHEDULED_SEND_INTERVAL = 5000L
Expand Down
Loading

0 comments on commit ac31dab

Please sign in to comment.