Skip to content

Commit

Permalink
feat: Implement audio relay and IP discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
angelobreuer committed Apr 28, 2024
1 parent 39433c0 commit 73f5b69
Show file tree
Hide file tree
Showing 10 changed files with 364 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Lavalink4NET.Experiments.Receive.Connections.Discovery;

using System.Net;
using System.Net.Sockets;

internal interface IIpDiscoveryService
{
ValueTask<IPEndPoint?> DiscoverExternalAddressAsync(Socket socket, uint ssrc, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
namespace Lavalink4NET.Experiments.Receive.Connections.Discovery;

using System.Buffers;
using System.Buffers.Binary;
using System.Net;
using System.Net.Sockets;
using System.Text;

internal sealed class IpDiscoveryService : IIpDiscoveryService
{
public async ValueTask<IPEndPoint?> DiscoverExternalAddressAsync(Socket socket, uint ssrc, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
ArgumentNullException.ThrowIfNull(socket);

using var periodicTimer = new PeriodicTimer(TimeSpan.FromSeconds(1));

try
{
do
{
// discover external address
var address = await DiscoverExternalAddressSingleAsync(
socket: socket,
ssrc: ssrc,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

if (address is not null)
{
// got response!
return address;
}
}
while (await periodicTimer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false));
}
catch (OperationCanceledException)
{
}

// no attempts left, give up or cancellation requested
return null;
}

private async ValueTask<IPEndPoint?> DiscoverExternalAddressSingleAsync(Socket socket, uint ssrc, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

ArgumentNullException.ThrowIfNull(socket);

// rent a buffer from the shared buffer array pool with a minimum size of 74 bytes (can
// hold the request).
var pooledBuffer = ArrayPool<byte>.Shared.Rent(74);
var buffer = pooledBuffer.AsMemory(0, 74);

try
{
// encode payload data
BinaryPrimitives.WriteUInt16BigEndian(buffer.Span[0..], 0x01); // Request Payload Type
BinaryPrimitives.WriteUInt16BigEndian(buffer.Span[2..], 70); // encoded payload size (always 70)
BinaryPrimitives.WriteUInt32BigEndian(buffer.Span[4..], ssrc); // encode the client's SSRC (big-endian)

// send payload
await socket
.SendAsync(buffer, SocketFlags.None, cancellationToken)
.ConfigureAwait(false);

var startTime = DateTimeOffset.UtcNow;

while (!cancellationToken.IsCancellationRequested)
{
var receiveResult = await socket
.ReceiveFromAsync(buffer, SocketFlags.None, new IPEndPoint(IPAddress.Any, 0), cancellationToken: cancellationToken)
.ConfigureAwait(false);

if (receiveResult.ReceivedBytes is not 74) // Total Length
{
continue;
}

var payloadType = BinaryPrimitives.ReadUInt16BigEndian(buffer.Span[0..]);
var encodedSize = BinaryPrimitives.ReadUInt16BigEndian(buffer.Span[2..]);
var ssrcValue = BinaryPrimitives.ReadUInt32BigEndian(buffer.Span[4..]);

// validate header
if (payloadType is 0x02 && encodedSize is 70 && ssrcValue == ssrc)
{
var addressSpan = buffer[8..64];
var addressTerminatorOffset = addressSpan.Span.IndexOf((byte)0);
var addressLength = addressTerminatorOffset is -1 ? 64 : addressTerminatorOffset;
var address = Encoding.ASCII.GetString(addressSpan.Span[..addressLength]);
var port = BinaryPrimitives.ReadUInt16BigEndian(buffer.Span[72..]);

return new IPEndPoint(IPAddress.Parse(address), port);
}
}
}
finally
{
// return buffer to pool
ArrayPool<byte>.Shared.Return(pooledBuffer);
}

// timeout exceeded
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Lavalink4NET.Experiments.Receive.Connections;

using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Lavalink4NET.Experiments.Receive.Connections.Payloads;

internal interface IVoiceConnectionHandle
{
ValueTask<IPEndPoint> SelectProtocolAsync(SelectProtocolPayload selectProtocolPayload, CancellationToken cancellationToken = default);

ValueTask SetSessionDescriptionAsync(SessionDescriptionPayload sessionDescriptionPayload, CancellationToken cancellationToken = default);

ValueTask<IPEndPoint> SetReadyAsync(ReadyPayload readyPayload, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
namespace Lavalink4NET.Experiments.Receive.Connections;

public interface IVoiceConnectionHandler
internal interface IVoiceConnectionHandler
{
ValueTask ProcessAsync(
VoiceConnectionContext connectionContext,
IVoiceConnectionHandle connectionHandle,
CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ internal sealed record class SpeakingPayload : IVoicePayload
[JsonPropertyName("speaking")]
public required SpeakingFlags Flags { get; set; }

[JsonRequired]
[JsonPropertyName("delay")]
public required int Delay { get; set; }
public int? Delay { get; set; }

[JsonRequired]
[JsonPropertyName("ssrc")]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
namespace Lavalink4NET.Experiments.Receive.Connections;

using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Net;
using System.Net.Sockets;
using System.Text;
using Lavalink4NET.Experiments.Receive.Connections.Discovery;
using Lavalink4NET.Experiments.Receive.Connections.Payloads;

internal sealed class VoiceConnectionHandle : IVoiceConnectionHandle
{
private readonly IIpDiscoveryService _ipDiscoveryService;
private SelectProtocolPayload? _selectProtocolPayload;
private ReadyPayload? _readyPayload;
private SessionDescriptionPayload? _sessionDescriptionPayload;
private Socket? _localSocket;
private Socket? _remoteSocket;

public VoiceConnectionHandle(IIpDiscoveryService ipDiscoveryService)
{
ArgumentNullException.ThrowIfNull(ipDiscoveryService);

_ipDiscoveryService = ipDiscoveryService;
}

public async ValueTask<IPEndPoint> SelectProtocolAsync(SelectProtocolPayload selectProtocolPayload, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
ArgumentNullException.ThrowIfNull(selectProtocolPayload);

_selectProtocolPayload = selectProtocolPayload;

using var discoveryCancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var externalRemoteAddress = await _ipDiscoveryService
.DiscoverExternalAddressAsync(_remoteSocket!, _readyPayload!.Ssrc, discoveryCancellationTokenSource.Token)
.ConfigureAwait(false);

await CompleteAsync(cancellationToken).ConfigureAwait(false);

return externalRemoteAddress;
}

public async ValueTask<IPEndPoint> SetReadyAsync(ReadyPayload readyPayload, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
ArgumentNullException.ThrowIfNull(readyPayload);

if (_readyPayload is not null)
{
throw new InvalidOperationException("Ready payload already received.");
}

_readyPayload = readyPayload;

_localSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
_localSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0));

_remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
_remoteSocket.Bind(new IPEndPoint(IPAddress.Any, _readyPayload.Port));
_remoteSocket.Connect(new IPEndPoint(IPAddress.Parse(_readyPayload.Ip), _readyPayload.Port));

_ = ProxyAsync(_localSocket!, _remoteSocket!, cancellationToken).AsTask();

await CompleteAsync(cancellationToken).ConfigureAwait(false);

return (IPEndPoint)_localSocket.LocalEndPoint!;
}

public async ValueTask SetSessionDescriptionAsync(SessionDescriptionPayload sessionDescriptionPayload, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
ArgumentNullException.ThrowIfNull(sessionDescriptionPayload);

_sessionDescriptionPayload = sessionDescriptionPayload;

await CompleteAsync(cancellationToken).ConfigureAwait(false);
}

private ValueTask<IPEndPoint> CompleteAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

if (_selectProtocolPayload is null || _readyPayload is null || _sessionDescriptionPayload is null)
{
return default;
}

_localSocket!.Connect(new IPEndPoint(IPAddress.Parse(_selectProtocolPayload.Data.Address), _selectProtocolPayload.Data.Port));
_ = ProxyAsync(_remoteSocket!, _localSocket!, cancellationToken).AsTask();

return default;
}

private async ValueTask HandleIpDiscoveryAsync(Socket sourceSocket, EndPoint endPoint, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
ArgumentNullException.ThrowIfNull(sourceSocket);

using var bufferWriter = new PooledBufferWriter<byte>();

var header = bufferWriter.GetMemory(8);

BinaryPrimitives.WriteUInt16BigEndian(header.Span[0..2], 0x02); // Mark as response
BinaryPrimitives.WriteUInt16BigEndian(header.Span[2..4], 70); // Encode (constant) length
BinaryPrimitives.WriteUInt32BigEndian(header.Span[4..8], _readyPayload!.Ssrc); // Encode SSRC

bufferWriter.Advance(8);

var localEndPoint = (IPEndPoint)sourceSocket.LocalEndPoint!;

// Encode IP
var ipContent = bufferWriter.GetMemory(64);
var encodedByteCount = Encoding.UTF8.GetBytes(localEndPoint.Address.ToString(), ipContent.Span);
ipContent.Span[encodedByteCount] = 0;
bufferWriter.Advance(64);

// Encode port
var portContent = bufferWriter.GetMemory(2);
BinaryPrimitives.WriteUInt16BigEndian(portContent.Span, (ushort)localEndPoint.Port);
bufferWriter.Advance(2);

await sourceSocket
.SendToAsync(bufferWriter.WrittenMemory, SocketFlags.None, endPoint, cancellationToken)
.ConfigureAwait(false);
}

private async ValueTask ProxyAsync(Socket sourceSocket, Socket destinationSocket, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

var buffer = ArrayPool<byte>.Shared.Rent(64 * 1024);

try
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();

var result = await sourceSocket
.ReceiveMessageFromAsync(buffer, SocketFlags.None, sourceSocket.LocalEndPoint!, cancellationToken)
.ConfigureAwait(false);

if (result.ReceivedBytes is 0)
{
break;
}

var data = new ReadOnlyMemory<byte>(buffer, 0, result.ReceivedBytes);

if (data.Length is 74 && data.Span[0..2].SequenceEqual(new byte[] { 0x00, 0x01, }))
{
await HandleIpDiscoveryAsync(sourceSocket, result.RemoteEndPoint, cancellationToken).ConfigureAwait(false);
continue;
}

Console.WriteLine(data.Length);

await destinationSocket
.SendAsync(data, SocketFlags.None, cancellationToken)
.ConfigureAwait(false);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
Loading

0 comments on commit 73f5b69

Please sign in to comment.