Skip to content

Commit

Permalink
Fix test client unsubscribing
Browse files Browse the repository at this point in the history
  • Loading branch information
Dragemil committed Sep 12, 2023
1 parent 3841118 commit 6543bbf
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 10 deletions.
33 changes: 33 additions & 0 deletions publisher/src/LeanPipe.TestClient/LeanPipeSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using LeanCode.Contracts;

namespace LeanPipe.TestClient;

public class LeanPipeSubscription
{
private readonly List<object> receivedNotifications = new();

public ITopic Topic { get; private init; }
public Guid? SubscriptionId { get; private set; }
public IReadOnlyList<object> ReceivedNotifications => receivedNotifications;

public LeanPipeSubscription(ITopic topic, Guid? subscriptionId)
{
Topic = topic;
SubscriptionId = subscriptionId;
}

public void Subscribe(Guid subscriptionId)
{
SubscriptionId = subscriptionId;
}

public void Unsubscribe()
{
SubscriptionId = null;
}

public void AddNotification(object notification)
{
receivedNotifications.Add(notification);
}
}
53 changes: 45 additions & 8 deletions publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ namespace LeanPipe.TestClient;

public class LeanPipeTestClient : IAsyncDisposable
{
private readonly ConcurrentDictionary<ITopic, List<object>> receivedNotifications =
private readonly ConcurrentDictionary<ITopic, LeanPipeSubscription> subscriptions =
new(TopicDeepEqualityComparer.Instance);

private readonly HubConnection hubConnection;
private readonly NotificationEnvelopeDeserializer notificationEnvelopeDeserializer;
private readonly JsonSerializerOptions? serializerOptions;
private readonly TimeSpan subscriptionCompletionTimeout;

public IReadOnlyDictionary<ITopic, List<object>> ReceivedNotifications => receivedNotifications;
public IReadOnlyDictionary<ITopic, LeanPipeSubscription> Subscriptions => subscriptions;

public LeanPipeTestClient(
Uri leanPipeUrl,
Expand All @@ -39,6 +39,21 @@ public LeanPipeTestClient(
})
.Build();

hubConnection.Closed += e =>
{
foreach (var subscription in subscriptions.Values)
{
subscription.Unsubscribe();
}
if (e is not null)
{
throw e;
}
return Task.CompletedTask;
};

notificationEnvelopeDeserializer = new(leanPipeTypes, serializerOptions);

this.serializerOptions = serializerOptions;
Expand All @@ -51,7 +66,7 @@ public LeanPipeTestClient(
{
if (notificationEnvelopeDeserializer.Deserialize(n) is var (topic, notification))
{
receivedNotifications.GetValueOrDefault(topic)?.Add(notification);
subscriptions.GetValueOrDefault(topic)?.AddNotification(notification);
}
}
);
Expand All @@ -63,12 +78,23 @@ public LeanPipeTestClient(
)
where TTopic : ITopic
{
SubscriptionResult? result;

if (hubConnection.State != HubConnectionState.Connected)
{
return new(default, SubscriptionStatus.Success, OperationType.Unsubscribe);
result = new(default, SubscriptionStatus.Success, OperationType.Unsubscribe);
}
else
{
result = await ManageSubscriptionCoreAsync(topic, OperationType.Unsubscribe, ct);
}

return await ManageSubscriptionCoreAsync(topic, OperationType.Unsubscribe, ct);
if (result?.Status == SubscriptionStatus.Success)
{
subscriptions.GetValueOrDefault(topic)?.Unsubscribe();
}

return result;
}

public async Task<SubscriptionResult?> SubscribeAsync<TTopic>(
Expand All @@ -86,7 +112,7 @@ public LeanPipeTestClient(

if (result?.Status == SubscriptionStatus.Success)
{
receivedNotifications.TryAdd(topic, new());
subscriptions.TryAdd(topic, new(topic, result.SubscriptionId));
}

return result;
Expand All @@ -111,12 +137,13 @@ public async ValueTask DisposeAsync()
OperationType operationType,
CancellationToken ct
)
where TTopic : ITopic
{
var topicType = typeof(TTopic);

var subscriptionEnvelope = new SubscriptionEnvelope
{
Id = Guid.NewGuid(),
Id = subscriptions.GetValueOrDefault(topic)?.SubscriptionId ?? Guid.NewGuid(),
TopicType = topicType.FullName!,
Topic = JsonSerializer.SerializeToDocument(topic, serializerOptions),
};
Expand All @@ -135,7 +162,17 @@ CancellationToken ct
);

await hubConnection.InvokeAsync(
nameof(LeanPipeSubscriber.Subscribe),
operationType switch
{
OperationType.Subscribe => nameof(LeanPipeSubscriber.Subscribe),
OperationType.Unsubscribe => nameof(LeanPipeSubscriber.Unsubscribe),
_
=> throw new ArgumentOutOfRangeException(
nameof(operationType),
operationType,
"LeanPipe OperationType is out of range."
),
},
subscriptionEnvelope,
ct
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static async Task<Guid> UnsubscribeSuccessAsync<TTopic>(
var subscriptionResult = await client.UnsubscribeAsync(topic, ct);

if (
subscriptionResult?.Type == OperationType.Subscribe
subscriptionResult?.Type == OperationType.Unsubscribe
&& subscriptionResult.Status == SubscriptionStatus.Success
)
{
Expand All @@ -58,6 +58,7 @@ TTopic topic
)
where TTopic : ITopic
{
return client.ReceivedNotifications.GetValueOrDefault(topic, new());
return client.Subscriptions.GetValueOrDefault(topic)?.ReceivedNotifications
?? new List<object>();
}
}

0 comments on commit 6543bbf

Please sign in to comment.