Skip to content

Commit

Permalink
Try aggressively resending
Browse files Browse the repository at this point in the history
  • Loading branch information
Forest committed Feb 29, 2024
1 parent d954b75 commit 8a139a0
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 49 deletions.
13 changes: 12 additions & 1 deletion Hazel.UnitTests/Reliability/PacketDropTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Hazel.Udp.FewerThreads;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Net;
using System.Threading;

Expand All @@ -10,6 +11,7 @@ namespace Hazel.UnitTests.Reliability
[TestClass]
public class PacketDropTests
{
// This test fails because even at 10% packet drop and 10ms
[TestMethod]
public void SustainedPacketLossShouldBeFine()
{
Expand All @@ -21,8 +23,17 @@ public void SustainedPacketLossShouldBeFine()
using (SocketCapture capture = new UnreliableSocketCapture(clientEp, serverEp, logger))
using (ThreadLimitedUdpConnectionListener server = new ThreadLimitedUdpConnectionListener(4, serverEp, logger))
using (UnityUdpClientConnection client = new UnityUdpClientConnection(logger, clientEp))
using (Timer timer = new Timer(_ => client.FixedUpdate(), null, 10, 10))
using (Timer timer = new Timer(_ =>
{
var up = Stopwatch.StartNew();
var cnt = client.FixedUpdate();
if (cnt != 0)
{
logger.WriteInfo($"Took {up.ElapsedMilliseconds}ms to resend {cnt} pkts");
}
}, null, 100, 100))
{
server.ReliableResendPollRateMs = 10;
UdpConnection serverClient = null;
server.NewConnection += (evt) => serverClient = (UdpConnection)evt.Connection;

Expand Down
7 changes: 6 additions & 1 deletion Hazel/FewerThreads/ThreadLimitedUdpConnectionListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ private struct ReceiveMessageInfo

private const int SendReceiveBufferSize = 1024 * 1024;

/// <summary>
/// How frequently sent reliable messages are checked for needing resend.
/// </summary>
public int ReliableResendPollRateMs = 100;

private Socket socket;
protected ILogger Logger;

Expand Down Expand Up @@ -154,7 +159,7 @@ private void ManageReliablePackets()
sock.ManageReliablePackets();
}

Thread.Sleep(100);
Thread.Sleep(this.ReliableResendPollRateMs);
}
}

Expand Down
130 changes: 85 additions & 45 deletions Hazel/Udp/UdpConnection.Reliable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,20 @@ namespace Hazel.Udp
{
partial class UdpConnection
{
private const int MinResendDelayMs = 50;
private const int MaxInitialResendDelayMs = 300;
private const int MaxAdditionalResendDelayMs = 1000;
/// <summary>
/// The minimum delay to resend a packet for the first time. Even if <see cref="AveragePingMs"/> times <see cref="ResendPingMultiplier"/> is less.
/// </summary>
public int MinResendDelayMs = 50;

/// <summary>
/// The maximum delay to resend a packet for the first time. Even if <see cref="AveragePingMs"/> times <see cref="ResendPingMultiplier"/> is more.
/// </summary>
public int MaxInitialResendDelayMs = 300;

/// <summary>
/// The maximum delay to resend a packet after the first resend.
/// </summary>
public int MaxAdditionalResendDelayMs = 1000;

public readonly ObjectPool<Packet> PacketPool;

Expand All @@ -19,12 +30,12 @@ partial class UdpConnection
/// </summary>
/// <remarks>
/// <para>
/// For reliable delivery data is resent at specified intervals unless an acknowledgement is received from the
/// Reliable messages are resent at specified intervals unless an acknowledgement is received from the
/// receiving device. The ResendTimeout specifies the interval between the packets being resent, each time a packet
/// is resent the interval is increased for that packet until the duration exceeds the <see cref="DisconnectTimeoutMs"/> value.
/// </para>
/// <para>
/// Setting this to its default of 0 will mean the timeout is 2 times the value of the average ping, usually
/// Setting this to its default of 0 will mean the timeout is <see cref="ResendPingMultiplier"/> times the value of the average ping, usually
/// resulting in a more dynamic resend that responds to endpoints on slower or faster connections.
/// </para>
/// </remarks>
Expand Down Expand Up @@ -95,7 +106,7 @@ public class Packet : IRecyclable
private readonly UdpConnection Connection;
private int Length;

public int NextTimeoutMs;
private int NextTimeoutMs;
private volatile bool Acknowledged;

public Action AckCallback;
Expand Down Expand Up @@ -126,53 +137,68 @@ internal void Set(ushort id, ILogger logger, SmartBuffer data, int length, int t
}

// Packets resent
public int Resend()
public int Resend(bool force = false)
{
if (this.Acknowledged)
{
return 0;
}

var connection = this.Connection;
if (!this.Acknowledged && connection != null)
int lifetimeMs = (int)this.Stopwatch.ElapsedMilliseconds;
if (lifetimeMs >= connection.DisconnectTimeoutMs)
{
long lifetimeMs = this.Stopwatch.ElapsedMilliseconds;
if (lifetimeMs >= connection.DisconnectTimeoutMs)
if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self))
{
if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self))
{
connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {lifetimeMs}ms ({self.Retransmissions} resends)");
connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {lifetimeMs}ms ({self.Retransmissions} resends)");

self.Recycle();
}
self.Recycle();
}

return 0;
}

if (force || lifetimeMs >= this.NextTimeoutMs)
{
// Enforce 10 ms min resend delay
if (this.NextTimeoutMs > lifetimeMs + 10)
{
return 0;
}

if (lifetimeMs >= this.NextTimeoutMs)
++this.Retransmissions;
if (connection.ResendLimit != 0
&& this.Retransmissions > connection.ResendLimit)
{
++this.Retransmissions;
if (connection.ResendLimit != 0
&& this.Retransmissions > connection.ResendLimit)
if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self))
{
if (connection.reliableDataPacketsSent.TryRemove(this.Id, out Packet self))
{
connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {self.Retransmissions} resends ({lifetimeMs}ms)");
connection.DisconnectInternal(HazelInternalErrors.ReliablePacketWithoutResponse, $"Reliable packet {self.Id} (size={this.Length}) was not ack'd after {self.Retransmissions} resends ({lifetimeMs}ms)");

self.Recycle();
}

return 0;
self.Recycle();
}

this.NextTimeoutMs += (int)Math.Min(this.NextTimeoutMs * connection.ResendPingMultiplier, MaxAdditionalResendDelayMs);
try
{
this.logger.WriteVerbose($"Resent message id {this.Data[1] >> 8 | this.Data[2]} after {lifetimeMs}ms");
return 0;
}

connection.WriteBytesToConnection(this.Data, this.Length);
connection.Statistics.LogMessageResent();
return 1;
}
catch (InvalidOperationException)
{
connection.DisconnectInternal(HazelInternalErrors.ConnectionDisconnected, "Could not resend data as connection is no longer connected");
}
#if DEBUG
this.logger.WriteVerbose($"Resent message id {this.Data[1] >> 8 | this.Data[2]} after {lifetimeMs}ms {this.NextTimeoutMs - lifetimeMs}ms delta (Forced: {force})");
#endif

if (force)
{
this.NextTimeoutMs = lifetimeMs;
}

this.NextTimeoutMs = connection.CalculateNextResendDelayMs(this.NextTimeoutMs);
try
{
connection.WriteBytesToConnection(this.Data, this.Length);
connection.Statistics.LogMessageResent();
return 1;
}
catch (InvalidOperationException)
{
connection.DisconnectInternal(HazelInternalErrors.ConnectionDisconnected, "Could not resend data as connection is no longer connected");
}
}

Expand Down Expand Up @@ -226,7 +252,7 @@ protected void AttachReliableID(SmartBuffer buffer, int offset, int length, Acti
int resendDelayMs = this.ResendTimeoutMs;
if (resendDelayMs <= 0)
{
resendDelayMs = (_pingMs * this.ResendPingMultiplier).ClampToInt(MinResendDelayMs, MaxInitialResendDelayMs);
resendDelayMs = (_pingMs * this.ResendPingMultiplier).ClampToInt(this.MinResendDelayMs, this.MaxInitialResendDelayMs);
}

Packet packet = this.PacketPool.GetObject();
Expand All @@ -244,11 +270,9 @@ protected void AttachReliableID(SmartBuffer buffer, int offset, int length, Acti
}
}

public static int ClampToInt(float value, int min, int max)
public int CalculateNextResendDelayMs(int lastDelayMs)
{
if (value < min) return min;
if (value > max) return max;
return (int)value;
return lastDelayMs + (int)Math.Min(lastDelayMs * this.ResendPingMultiplier, this.MaxAdditionalResendDelayMs);
}

/// <summary>
Expand Down Expand Up @@ -406,6 +430,10 @@ private void AcknowledgementMessageReceive(byte[] bytes, int bytesReceived)
{
AcknowledgeMessageId((ushort)(id - i));
}
else
{
ForceResendMessageId((ushort)(id - i));
}

recentPackets >>= 1;
}
Expand All @@ -414,6 +442,14 @@ private void AcknowledgementMessageReceive(byte[] bytes, int bytesReceived)
Statistics.LogAcknowledgementReceive(bytesReceived);
}

private void ForceResendMessageId(ushort id)
{
if (this.reliableDataPacketsSent.TryGetValue(id, out Packet pkt))
{
pkt.Resend(force: true);
}
}

private void AcknowledgeMessageId(ushort id)
{
// Dispose of timer and remove from dictionary
Expand All @@ -430,7 +466,9 @@ private void AcknowledgeMessageId(ushort id)
this._pingMs = this._pingMs * .7f + rt * .3f;
}

this.logger.WriteVerbose($"Packet {id} RT: {rt}ms Ping:{this._pingMs} Active: {reliableDataPacketsSent.Count}/{activePingPackets.Count}");
#if DEBUG
this.logger.WriteVerbose($"Packet {id} RTT: {rt}ms Ping:{this._pingMs} Active: {reliableDataPacketsSent.Count}/{activePingPackets.Count}");
#endif
}
else if (this.activePingPackets.TryRemove(id, out PingPacket pingPkt))
{
Expand All @@ -444,7 +482,9 @@ private void AcknowledgeMessageId(ushort id)
this._pingMs = this._pingMs * .7f + rt * .3f;
}

this.logger.WriteVerbose($"Ping {id} RT: {rt}ms Ping:{this._pingMs} Active: {reliableDataPacketsSent.Count}/{activePingPackets.Count}");
#if DEBUG
this.logger.WriteVerbose($"Ping {id} RTT: {rt}ms Ping:{this._pingMs} Active: {reliableDataPacketsSent.Count}/{activePingPackets.Count}");
#endif
}
}

Expand Down
6 changes: 4 additions & 2 deletions Hazel/Udp/UnityUdpClientConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public UnityUdpClientConnection(ILogger logger, IPEndPoint remoteEndPoint, IPMod
this.Dispose(false);
}

public void FixedUpdate()
public int FixedUpdate()
{
try
{
Expand All @@ -54,12 +54,14 @@ public void FixedUpdate()

try
{
ManageReliablePackets();
return ManageReliablePackets();
}
catch (Exception e)
{
this.logger.WriteError("FixedUpdate: " + e);
}

return 0;
}

protected virtual void RestartConnection()
Expand Down

0 comments on commit 8a139a0

Please sign in to comment.