Skip to content

Commit

Permalink
Merge branch 'dev' into feat/querybuilder-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Quin Lynch committed Apr 19, 2024
2 parents 08258d2 + e20c36d commit 16c204c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 33 deletions.
49 changes: 39 additions & 10 deletions src/EdgeDB.Net.Driver/Binary/Duplexers/StreamDuplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Extensions.Logging;
using System.Buffers;
using System.Diagnostics;
using System.Net.Sockets;
using System.Runtime.CompilerServices;

namespace EdgeDB.Binary;
Expand Down Expand Up @@ -111,7 +112,7 @@ public async ValueTask DisconnectAsync(CancellationToken token = default)

if (packetFactory is null)
{
// unknow/unsupported packet
// unknown/unsupported packet
_client.Logger.UnknownPacket($"{header.Type}{{0x{(byte)header.Type}}}:{header.Length}");

await DisconnectInternalAsync();
Expand All @@ -121,16 +122,27 @@ public async ValueTask DisconnectAsync(CancellationToken token = default)
var packet = PacketSerializer.DeserializePacket(in packetFactory, in buffer);

// check for idle timeout
if (packet is IProtocolError err && err.ErrorCode == ServerErrorCodes.IdleSessionTimeoutError)
{
// all connection state needs to be reset for the client here.
_client.Logger.IdleDisconnect();
if (packet is not IProtocolError {ErrorCode: ServerErrorCodes.IdleSessionTimeoutError} err) return packet;

await DisconnectInternalAsync();
throw new EdgeDBErrorException(err);
}
// all connection state needs to be reset for the client here.
_client.Logger.IdleDisconnect();

return packet;
await DisconnectInternalAsync();
throw new EdgeDBErrorException(err);
}
catch (IOException ioException) when (ioException.InnerException is SocketException socketException)
{
switch (socketException.SocketErrorCode)
{
case SocketError.ConnectionRefused
or SocketError.ConnectionAborted
or SocketError.ConnectionReset
or SocketError.HostNotFound
or SocketError.NotInitialized:
throw new ConnectionFailedTemporarilyException(socketException.SocketErrorCode);
default:
throw;
}
}
catch (EndOfStreamException)
{
Expand Down Expand Up @@ -168,7 +180,24 @@ public async ValueTask SendAsync(CancellationToken token = default, params Senda

using var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(token, _disconnectTokenSource.Token);

await _stream.WriteAsync(BinaryUtils.BuildPackets(packets), linkedToken.Token).ConfigureAwait(false);
try
{
await _stream.WriteAsync(BinaryUtils.BuildPackets(packets), linkedToken.Token).ConfigureAwait(false);
}
catch (IOException ioException) when (ioException.InnerException is SocketException socketException)
{
switch (socketException.SocketErrorCode)
{
case SocketError.ConnectionRefused
or SocketError.ConnectionAborted
or SocketError.ConnectionReset
or SocketError.HostNotFound
or SocketError.NotInitialized:
throw new ConnectionFailedTemporarilyException(socketException.SocketErrorCode);
default:
throw;
}
}

// only perform second iteration if debug log enabled.
if (_client.Logger.IsEnabled(LogLevel.Debug))
Expand Down
2 changes: 2 additions & 0 deletions src/EdgeDB.Net.Driver/Binary/Protocol/IProtocolProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ internal interface IProtocolProvider

int? SuggestedPoolConcurrency { get; }

void Reset();

public static IProtocolProvider GetDefaultProvider(EdgeDBBinaryClient client)
=> (_defaultProvider ??= Providers[ProtocolVersion.EdgeDBBinaryDefaultVersion].Factory)(client);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ private ILogger Logger

public virtual ProtocolVersion Version { get; } = (1, 0);

public void Reset()
{
Phase = ProtocolPhase.Connection;
_rawServerConfig.Clear();
_serverKey = Array.Empty<byte>();
}

public virtual PacketReadFactory? GetPacketFactory(ServerMessageType type) =>
type switch
{
Expand Down Expand Up @@ -491,6 +498,7 @@ public virtual ValueTask ProcessAsync<T>(in T message) where T : IReceiveable
if (authStatus.AuthStatus != AuthStatus.AuthenticationOK)
throw new UnexpectedMessageException("Expected AuthenticationRequiredSASLMessage, got " +
authStatus.AuthStatus);
Logger.LogDebug("Got authentication OK");
break;
case ServerKeyData keyData:
_serverKey = keyData.KeyBuffer;
Expand Down
53 changes: 30 additions & 23 deletions src/EdgeDB.Net.Driver/Clients/EdgeDBBinaryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,20 @@ await Task.WhenAll(
}
catch (EdgeDBException x) when (x.ShouldReconnect && !isRetry)
{
Logger.LogDebug("Execute threw an exception which allows reconnects, reconnecting...");

await ReconnectAsync(token).ConfigureAwait(false);
_semaphore.Release();
released = true;

Logger.LogDebug("Preforming retry after a reconnect exception");
return await ExecuteInternalAsync(query, args, cardinality, capabilities, format, true, implicitTypeName,
preheat, token).ConfigureAwait(false);
}
catch (EdgeDBException x) when (x.ShouldRetry && !isRetry)
{
Logger.LogDebug("Execute threw an exception which allows retries, retrying...");

_semaphore.Release();
released = true;

Expand Down Expand Up @@ -527,35 +532,35 @@ public override async ValueTask ConnectAsync(CancellationToken token = default)

try
{
Logger.ConnectionMessageProcessing(message.Type);
await _protocolProvider.ProcessAsync(in message);
}
catch (EdgeDBErrorException x) when (x.ShouldReconnect)
{
if (ClientConfig.RetryMode is ConnectionRetryMode.AlwaysRetry)
if (ClientConfig.RetryMode is not ConnectionRetryMode.AlwaysRetry) throw;

if (_currentRetries < ClientConfig.MaxConnectionRetries)
{
if (_currentRetries < ClientConfig.MaxConnectionRetries)
{
_currentRetries++;

Logger.AttemptToReconnect(_currentRetries, ClientConfig.MaxConnectionRetries, x);

// do not forward the linked token in this method to the new
// reconnection, only supply the external token. We also don't
// want to call 'ReconnectAsync' since we queue up a disconnect
// and connect request, if this method was called externally
// while we handle the error, it would be next in line to attempt
// to connect, if that external call completes we would then disconnect
// and connect after a successful connection attempt which wouldn't be ideal.
await DisconnectAsync(token);

_connectSemaphone.Release();

await ConnectAsync(token);
return;
}
else
Logger.MaxConnectionRetries(ClientConfig.MaxConnectionRetries, x);
_currentRetries++;

Logger.AttemptToReconnect(_currentRetries, ClientConfig.MaxConnectionRetries, x);

// do not forward the linked token in this method to the new
// reconnection, only supply the external token. We also don't
// want to call 'ReconnectAsync' since we queue up a disconnect
// and connect request, if this method was called externally
// while we handle the error, it would be next in line to attempt
// to connect, if that external call completes we would then disconnect
// and connect after a successful connection attempt which wouldn't be ideal.
await DisconnectAsync(token);

_connectSemaphone.Release();

await ConnectAsync(token);
return;
}
else
Logger.MaxConnectionRetries(ClientConfig.MaxConnectionRetries, x);

throw;
}
Expand All @@ -564,6 +569,7 @@ public override async ValueTask ConnectAsync(CancellationToken token = default)
{
// reset connection attempts
_currentRetries = 0;
Logger.ConnectionPhaseComplete(_protocolProvider.Phase);
break;
}
}
Expand All @@ -589,6 +595,7 @@ private async Task ConnectInternalAsync(int attempts = 0, CancellationToken toke

_readyCancelTokenSource = new CancellationTokenSource();
_readySource = new TaskCompletionSource();
_protocolProvider.Reset();

Duplexer.Reset();

Expand Down
11 changes: 11 additions & 0 deletions src/EdgeDB.Net.Driver/Log.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using EdgeDB.Binary;
using EdgeDB.Binary.Codecs;
using EdgeDB.Binary.Protocol;
using Microsoft.Extensions.Logging;

namespace EdgeDB;
Expand Down Expand Up @@ -228,4 +229,14 @@ public static partial void BeginProtocolNegotiation(this ILogger logget, Protoco
LogLevel.Trace,
"Codec tree information:\n{CodecTree}")]
public static partial void CodecTree(this ILogger logger, string codecTree);

[LoggerMessage(34,
LogLevel.Debug,
"Processing {Type} in connection step")]
public static partial void ConnectionMessageProcessing(this ILogger logger, ServerMessageType type);

[LoggerMessage(35,
LogLevel.Debug,
"Protocol phase is {Phase}. Ending connection task")]
public static partial void ConnectionPhaseComplete(this ILogger logger, ProtocolPhase phase);
}

0 comments on commit 16c204c

Please sign in to comment.