diff --git a/experiments/Lavalink4NET.Experiments.Receive/Connections/Discovery/IIpDiscoveryService.cs b/experiments/Lavalink4NET.Experiments.Receive/Connections/Discovery/IIpDiscoveryService.cs new file mode 100644 index 00000000..9c076d86 --- /dev/null +++ b/experiments/Lavalink4NET.Experiments.Receive/Connections/Discovery/IIpDiscoveryService.cs @@ -0,0 +1,9 @@ +namespace Lavalink4NET.Experiments.Receive.Connections.Discovery; + +using System.Net; +using System.Net.Sockets; + +internal interface IIpDiscoveryService +{ + ValueTask DiscoverExternalAddressAsync(Socket socket, uint ssrc, CancellationToken cancellationToken = default); +} diff --git a/experiments/Lavalink4NET.Experiments.Receive/Connections/Discovery/IpDiscoveryService.cs b/experiments/Lavalink4NET.Experiments.Receive/Connections/Discovery/IpDiscoveryService.cs new file mode 100644 index 00000000..b7d63de3 --- /dev/null +++ b/experiments/Lavalink4NET.Experiments.Receive/Connections/Discovery/IpDiscoveryService.cs @@ -0,0 +1,107 @@ +namespace Lavalink4NET.Experiments.Receive.Connections.Discovery; + +using System.Buffers; +using System.Buffers.Binary; +using System.Net; +using System.Net.Sockets; +using System.Text; + +internal sealed class IpDiscoveryService : IIpDiscoveryService +{ + public async ValueTask DiscoverExternalAddressAsync(Socket socket, uint ssrc, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + ArgumentNullException.ThrowIfNull(socket); + + using var periodicTimer = new PeriodicTimer(TimeSpan.FromSeconds(1)); + + try + { + do + { + // discover external address + var address = await DiscoverExternalAddressSingleAsync( + socket: socket, + ssrc: ssrc, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + if (address is not null) + { + // got response! + return address; + } + } + while (await periodicTimer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false)); + } + catch (OperationCanceledException) + { + } + + // no attempts left, give up or cancellation requested + return null; + } + + private async ValueTask DiscoverExternalAddressSingleAsync(Socket socket, uint ssrc, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + ArgumentNullException.ThrowIfNull(socket); + + // rent a buffer from the shared buffer array pool with a minimum size of 74 bytes (can + // hold the request). + var pooledBuffer = ArrayPool.Shared.Rent(74); + var buffer = pooledBuffer.AsMemory(0, 74); + + try + { + // encode payload data + BinaryPrimitives.WriteUInt16BigEndian(buffer.Span[0..], 0x01); // Request Payload Type + BinaryPrimitives.WriteUInt16BigEndian(buffer.Span[2..], 70); // encoded payload size (always 70) + BinaryPrimitives.WriteUInt32BigEndian(buffer.Span[4..], ssrc); // encode the client's SSRC (big-endian) + + // send payload + await socket + .SendAsync(buffer, SocketFlags.None, cancellationToken) + .ConfigureAwait(false); + + var startTime = DateTimeOffset.UtcNow; + + while (!cancellationToken.IsCancellationRequested) + { + var receiveResult = await socket + .ReceiveFromAsync(buffer, SocketFlags.None, new IPEndPoint(IPAddress.Any, 0), cancellationToken: cancellationToken) + .ConfigureAwait(false); + + if (receiveResult.ReceivedBytes is not 74) // Total Length + { + continue; + } + + var payloadType = BinaryPrimitives.ReadUInt16BigEndian(buffer.Span[0..]); + var encodedSize = BinaryPrimitives.ReadUInt16BigEndian(buffer.Span[2..]); + var ssrcValue = BinaryPrimitives.ReadUInt32BigEndian(buffer.Span[4..]); + + // validate header + if (payloadType is 0x02 && encodedSize is 70 && ssrcValue == ssrc) + { + var addressSpan = buffer[8..64]; + var addressTerminatorOffset = addressSpan.Span.IndexOf((byte)0); + var addressLength = addressTerminatorOffset is -1 ? 64 : addressTerminatorOffset; + var address = Encoding.ASCII.GetString(addressSpan.Span[..addressLength]); + var port = BinaryPrimitives.ReadUInt16BigEndian(buffer.Span[72..]); + + return new IPEndPoint(IPAddress.Parse(address), port); + } + } + } + finally + { + // return buffer to pool + ArrayPool.Shared.Return(pooledBuffer); + } + + // timeout exceeded + return null; + } +} diff --git a/experiments/Lavalink4NET.Experiments.Receive/Connections/IVoiceConnectionHandle.cs b/experiments/Lavalink4NET.Experiments.Receive/Connections/IVoiceConnectionHandle.cs new file mode 100644 index 00000000..228bc8a8 --- /dev/null +++ b/experiments/Lavalink4NET.Experiments.Receive/Connections/IVoiceConnectionHandle.cs @@ -0,0 +1,15 @@ +namespace Lavalink4NET.Experiments.Receive.Connections; + +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using Lavalink4NET.Experiments.Receive.Connections.Payloads; + +internal interface IVoiceConnectionHandle +{ + ValueTask SelectProtocolAsync(SelectProtocolPayload selectProtocolPayload, CancellationToken cancellationToken = default); + + ValueTask SetSessionDescriptionAsync(SessionDescriptionPayload sessionDescriptionPayload, CancellationToken cancellationToken = default); + + ValueTask SetReadyAsync(ReadyPayload readyPayload, CancellationToken cancellationToken = default); +} diff --git a/experiments/Lavalink4NET.Experiments.Receive/Connections/IVoiceConnectionHandler.cs b/experiments/Lavalink4NET.Experiments.Receive/Connections/IVoiceConnectionHandler.cs index cfb253e8..475e95e5 100644 --- a/experiments/Lavalink4NET.Experiments.Receive/Connections/IVoiceConnectionHandler.cs +++ b/experiments/Lavalink4NET.Experiments.Receive/Connections/IVoiceConnectionHandler.cs @@ -1,8 +1,9 @@ namespace Lavalink4NET.Experiments.Receive.Connections; -public interface IVoiceConnectionHandler +internal interface IVoiceConnectionHandler { ValueTask ProcessAsync( VoiceConnectionContext connectionContext, + IVoiceConnectionHandle connectionHandle, CancellationToken cancellationToken = default); } diff --git a/experiments/Lavalink4NET.Experiments.Receive/Connections/Payloads/SpeakingPayload.cs b/experiments/Lavalink4NET.Experiments.Receive/Connections/Payloads/SpeakingPayload.cs index d3faa769..8bce8a2d 100644 --- a/experiments/Lavalink4NET.Experiments.Receive/Connections/Payloads/SpeakingPayload.cs +++ b/experiments/Lavalink4NET.Experiments.Receive/Connections/Payloads/SpeakingPayload.cs @@ -8,9 +8,8 @@ internal sealed record class SpeakingPayload : IVoicePayload [JsonPropertyName("speaking")] public required SpeakingFlags Flags { get; set; } - [JsonRequired] [JsonPropertyName("delay")] - public required int Delay { get; set; } + public int? Delay { get; set; } [JsonRequired] [JsonPropertyName("ssrc")] diff --git a/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceConnectionHandle.cs b/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceConnectionHandle.cs new file mode 100644 index 00000000..78fa4429 --- /dev/null +++ b/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceConnectionHandle.cs @@ -0,0 +1,171 @@ +namespace Lavalink4NET.Experiments.Receive.Connections; + +using System; +using System.Buffers; +using System.Buffers.Binary; +using System.Net; +using System.Net.Sockets; +using System.Text; +using Lavalink4NET.Experiments.Receive.Connections.Discovery; +using Lavalink4NET.Experiments.Receive.Connections.Payloads; + +internal sealed class VoiceConnectionHandle : IVoiceConnectionHandle +{ + private readonly IIpDiscoveryService _ipDiscoveryService; + private SelectProtocolPayload? _selectProtocolPayload; + private ReadyPayload? _readyPayload; + private SessionDescriptionPayload? _sessionDescriptionPayload; + private Socket? _localSocket; + private Socket? _remoteSocket; + + public VoiceConnectionHandle(IIpDiscoveryService ipDiscoveryService) + { + ArgumentNullException.ThrowIfNull(ipDiscoveryService); + + _ipDiscoveryService = ipDiscoveryService; + } + + public async ValueTask SelectProtocolAsync(SelectProtocolPayload selectProtocolPayload, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + ArgumentNullException.ThrowIfNull(selectProtocolPayload); + + _selectProtocolPayload = selectProtocolPayload; + + using var discoveryCancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var externalRemoteAddress = await _ipDiscoveryService + .DiscoverExternalAddressAsync(_remoteSocket!, _readyPayload!.Ssrc, discoveryCancellationTokenSource.Token) + .ConfigureAwait(false); + + await CompleteAsync(cancellationToken).ConfigureAwait(false); + + return externalRemoteAddress; + } + + public async ValueTask SetReadyAsync(ReadyPayload readyPayload, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + ArgumentNullException.ThrowIfNull(readyPayload); + + if (_readyPayload is not null) + { + throw new InvalidOperationException("Ready payload already received."); + } + + _readyPayload = readyPayload; + + _localSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + _localSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + + _remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + _remoteSocket.Bind(new IPEndPoint(IPAddress.Any, _readyPayload.Port)); + _remoteSocket.Connect(new IPEndPoint(IPAddress.Parse(_readyPayload.Ip), _readyPayload.Port)); + + _ = ProxyAsync(_localSocket!, _remoteSocket!, cancellationToken).AsTask(); + + await CompleteAsync(cancellationToken).ConfigureAwait(false); + + return (IPEndPoint)_localSocket.LocalEndPoint!; + } + + public async ValueTask SetSessionDescriptionAsync(SessionDescriptionPayload sessionDescriptionPayload, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + ArgumentNullException.ThrowIfNull(sessionDescriptionPayload); + + _sessionDescriptionPayload = sessionDescriptionPayload; + + await CompleteAsync(cancellationToken).ConfigureAwait(false); + } + + private ValueTask CompleteAsync(CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + if (_selectProtocolPayload is null || _readyPayload is null || _sessionDescriptionPayload is null) + { + return default; + } + + _localSocket!.Connect(new IPEndPoint(IPAddress.Parse(_selectProtocolPayload.Data.Address), _selectProtocolPayload.Data.Port)); + _ = ProxyAsync(_remoteSocket!, _localSocket!, cancellationToken).AsTask(); + + return default; + } + + private async ValueTask HandleIpDiscoveryAsync(Socket sourceSocket, EndPoint endPoint, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + ArgumentNullException.ThrowIfNull(sourceSocket); + + using var bufferWriter = new PooledBufferWriter(); + + var header = bufferWriter.GetMemory(8); + + BinaryPrimitives.WriteUInt16BigEndian(header.Span[0..2], 0x02); // Mark as response + BinaryPrimitives.WriteUInt16BigEndian(header.Span[2..4], 70); // Encode (constant) length + BinaryPrimitives.WriteUInt32BigEndian(header.Span[4..8], _readyPayload!.Ssrc); // Encode SSRC + + bufferWriter.Advance(8); + + var localEndPoint = (IPEndPoint)sourceSocket.LocalEndPoint!; + + // Encode IP + var ipContent = bufferWriter.GetMemory(64); + var encodedByteCount = Encoding.UTF8.GetBytes(localEndPoint.Address.ToString(), ipContent.Span); + ipContent.Span[encodedByteCount] = 0; + bufferWriter.Advance(64); + + // Encode port + var portContent = bufferWriter.GetMemory(2); + BinaryPrimitives.WriteUInt16BigEndian(portContent.Span, (ushort)localEndPoint.Port); + bufferWriter.Advance(2); + + await sourceSocket + .SendToAsync(bufferWriter.WrittenMemory, SocketFlags.None, endPoint, cancellationToken) + .ConfigureAwait(false); + } + + private async ValueTask ProxyAsync(Socket sourceSocket, Socket destinationSocket, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + var buffer = ArrayPool.Shared.Rent(64 * 1024); + + try + { + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + var result = await sourceSocket + .ReceiveMessageFromAsync(buffer, SocketFlags.None, sourceSocket.LocalEndPoint!, cancellationToken) + .ConfigureAwait(false); + + if (result.ReceivedBytes is 0) + { + break; + } + + var data = new ReadOnlyMemory(buffer, 0, result.ReceivedBytes); + + if (data.Length is 74 && data.Span[0..2].SequenceEqual(new byte[] { 0x00, 0x01, })) + { + await HandleIpDiscoveryAsync(sourceSocket, result.RemoteEndPoint, cancellationToken).ConfigureAwait(false); + continue; + } + + Console.WriteLine(data.Length); + + await destinationSocket + .SendAsync(data, SocketFlags.None, cancellationToken) + .ConfigureAwait(false); + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } +} diff --git a/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceConnectionHandler.cs b/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceConnectionHandler.cs index c853b945..f40d24de 100644 --- a/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceConnectionHandler.cs +++ b/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceConnectionHandler.cs @@ -37,6 +37,7 @@ public VoiceConnectionHandler( public async ValueTask ProcessAsync( VoiceConnectionContext connectionContext, + IVoiceConnectionHandle connectionHandle, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); @@ -110,8 +111,8 @@ await _protocolHandler .WriteAsync(remoteConnectionContext, remoteIdentifyPayload, cancellationToken) .ConfigureAwait(false); - var task1 = ProxyAsync(connectionContext, remoteConnectionContext, cancellationToken).AsTask(); - var task2 = ProxyAsync(remoteConnectionContext, connectionContext, cancellationToken).AsTask(); + var task1 = ProxyAsync(connectionContext, remoteConnectionContext, connectionHandle, isRemote: false, cancellationToken).AsTask(); + var task2 = ProxyAsync(remoteConnectionContext, connectionContext, connectionHandle, isRemote: true, cancellationToken).AsTask(); await Task .WhenAny(task1, task2) @@ -121,6 +122,8 @@ await Task private async ValueTask ProxyAsync( VoiceConnectionContext sourceConnectionContext, VoiceConnectionContext destinationConnectionContext, + IVoiceConnectionHandle connectionHandle, + bool isRemote = false, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); @@ -141,8 +144,53 @@ private async ValueTask ProxyAsync( break; } + var receivedPayload = result.Payload; + + switch (receivedPayload) + { + case ReadyPayload payload when isRemote: + var localEndPoint = await connectionHandle + .SetReadyAsync(payload, cancellationToken) + .ConfigureAwait(false); + + receivedPayload = new ReadyPayload + { + Ssrc = payload.Ssrc, + Ip = localEndPoint.Address.ToString(), + Port = localEndPoint.Port, + Modes = payload.Modes, + }; + + break; + + case SessionDescriptionPayload payload when isRemote: + await connectionHandle + .SetSessionDescriptionAsync(payload, cancellationToken) + .ConfigureAwait(false); + + break; + + case SelectProtocolPayload payload when !isRemote: + var remoteEndPoint = await connectionHandle + .SelectProtocolAsync(payload, cancellationToken) + .ConfigureAwait(false); + + receivedPayload = new SelectProtocolPayload + { + Data = new SelectProtocolData + { + Address = remoteEndPoint.Address.ToString(), + Port = remoteEndPoint.Port, + Mode = payload.Data.Mode, + }, + Protocol = payload.Protocol, + }; + + break; + } + await _protocolHandler - .WriteAsync(destinationConnectionContext, result.Payload, cancellationToken) + .WriteAsync(destinationConnectionContext, receivedPayload, cancellationToken) .ConfigureAwait(false); } } diff --git a/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceProtocolHandler.cs b/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceProtocolHandler.cs index 8208395c..0a561456 100644 --- a/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceProtocolHandler.cs +++ b/experiments/Lavalink4NET.Experiments.Receive/Connections/VoiceProtocolHandler.cs @@ -66,7 +66,7 @@ await connectionContext.WebSocket if (!result.EndOfMessage) { - _logger.LogWarning(label, "[{Label}] Received a partial payload from voice gateway."); + _logger.LogWarning("[{Label}] Received a partial payload from voice gateway.", label); await connectionContext.WebSocket .CloseAsync(WebSocketCloseStatus.MessageTooBig, "Payload is too large.", cancellationToken) diff --git a/experiments/Lavalink4NET.Experiments.Receive/Extensions/ServiceCollectionExtensions.cs b/experiments/Lavalink4NET.Experiments.Receive/Extensions/ServiceCollectionExtensions.cs index 3e4753cb..41d08ed7 100644 --- a/experiments/Lavalink4NET.Experiments.Receive/Extensions/ServiceCollectionExtensions.cs +++ b/experiments/Lavalink4NET.Experiments.Receive/Extensions/ServiceCollectionExtensions.cs @@ -1,6 +1,7 @@ namespace Lavalink4NET.Experiments.Receive.Extensions; using Lavalink4NET.Experiments.Receive.Connections; +using Lavalink4NET.Experiments.Receive.Connections.Discovery; using Lavalink4NET.Experiments.Receive.Server; using Lavalink4NET.Experiments.Receive.Sessions; using Lavalink4NET.Players; @@ -17,6 +18,7 @@ public static IServiceCollection AddLavalinkReceive(this IServiceCollection serv services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.Configure(static _ => { }); diff --git a/experiments/Lavalink4NET.Experiments.Receive/Server/LavalinkVoiceServer.cs b/experiments/Lavalink4NET.Experiments.Receive/Server/LavalinkVoiceServer.cs index 336292db..ef3bc6ce 100644 --- a/experiments/Lavalink4NET.Experiments.Receive/Server/LavalinkVoiceServer.cs +++ b/experiments/Lavalink4NET.Experiments.Receive/Server/LavalinkVoiceServer.cs @@ -4,6 +4,7 @@ using System.Diagnostics.Metrics; using System.Globalization; using Lavalink4NET.Experiments.Receive.Connections; +using Lavalink4NET.Experiments.Receive.Connections.Discovery; using Lavalink4NET.Experiments.Receive.Connections.Features; using Microsoft.AspNetCore.Connections.Features; using Microsoft.AspNetCore.Hosting.Server; @@ -18,18 +19,20 @@ internal sealed class LavalinkVoiceServer : IHttpApplication, ILava { private readonly IServiceProvider _serviceProvider; private readonly IVoiceConnectionHandler _voiceConnectionHandler; + private readonly IIpDiscoveryService _ipDiscoveryService; private readonly IServer _server; private readonly WebSocketMiddleware _webSocketMiddleware; public LavalinkVoiceServer( IVoiceConnectionHandler voiceConnectionHandler, + IIpDiscoveryService ipDiscoveryService, ILoggerFactory loggerFactory, IOptions options) { ArgumentNullException.ThrowIfNull(voiceConnectionHandler); _voiceConnectionHandler = voiceConnectionHandler; - + _ipDiscoveryService = ipDiscoveryService; var services = new ServiceCollection(); // HTTP Kestrel Host @@ -140,7 +143,7 @@ await httpContext.Response connectionContext.Features.Set(new VoiceGatewayVersionFeature(version)); await _voiceConnectionHandler - .ProcessAsync(connectionContext, cancellationToken) + .ProcessAsync(connectionContext, new VoiceConnectionHandle(_ipDiscoveryService), cancellationToken) .ConfigureAwait(false); } }