diff --git a/publisher/src/LeanPipe.TestClient/LeanPipeSubscription.cs b/publisher/src/LeanPipe.TestClient/LeanPipeSubscription.cs new file mode 100644 index 0000000..b5f9411 --- /dev/null +++ b/publisher/src/LeanPipe.TestClient/LeanPipeSubscription.cs @@ -0,0 +1,33 @@ +using LeanCode.Contracts; + +namespace LeanPipe.TestClient; + +public class LeanPipeSubscription +{ + private readonly List receivedNotifications = new(); + + public ITopic Topic { get; private init; } + public Guid? SubscriptionId { get; private set; } + public IReadOnlyList ReceivedNotifications => receivedNotifications; + + public LeanPipeSubscription(ITopic topic, Guid? subscriptionId) + { + Topic = topic; + SubscriptionId = subscriptionId; + } + + public void Subscribe(Guid subscriptionId) + { + SubscriptionId = subscriptionId; + } + + public void Unsubscribe() + { + SubscriptionId = null; + } + + public void AddNotification(object notification) + { + receivedNotifications.Add(notification); + } +} diff --git a/publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs b/publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs index da40424..ed4b16d 100644 --- a/publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs +++ b/publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs @@ -13,7 +13,7 @@ namespace LeanPipe.TestClient; public class LeanPipeTestClient : IAsyncDisposable { - private readonly ConcurrentDictionary> receivedNotifications = + private readonly ConcurrentDictionary subscriptions = new(TopicDeepEqualityComparer.Instance); private readonly HubConnection hubConnection; @@ -21,7 +21,7 @@ public class LeanPipeTestClient : IAsyncDisposable private readonly JsonSerializerOptions? serializerOptions; private readonly TimeSpan subscriptionCompletionTimeout; - public IReadOnlyDictionary> ReceivedNotifications => receivedNotifications; + public IReadOnlyDictionary Subscriptions => subscriptions; public LeanPipeTestClient( Uri leanPipeUrl, @@ -39,6 +39,21 @@ public LeanPipeTestClient( }) .Build(); + hubConnection.Closed += e => + { + foreach (var subscription in subscriptions.Values) + { + subscription.Unsubscribe(); + } + + if (e is not null) + { + throw e; + } + + return Task.CompletedTask; + }; + notificationEnvelopeDeserializer = new(leanPipeTypes, serializerOptions); this.serializerOptions = serializerOptions; @@ -51,7 +66,7 @@ public LeanPipeTestClient( { if (notificationEnvelopeDeserializer.Deserialize(n) is var (topic, notification)) { - receivedNotifications.GetValueOrDefault(topic)?.Add(notification); + subscriptions.GetValueOrDefault(topic)?.AddNotification(notification); } } ); @@ -63,12 +78,23 @@ public LeanPipeTestClient( ) where TTopic : ITopic { + SubscriptionResult? result; + if (hubConnection.State != HubConnectionState.Connected) { - return new(default, SubscriptionStatus.Success, OperationType.Unsubscribe); + result = new(default, SubscriptionStatus.Success, OperationType.Unsubscribe); + } + else + { + result = await ManageSubscriptionCoreAsync(topic, OperationType.Unsubscribe, ct); } - return await ManageSubscriptionCoreAsync(topic, OperationType.Unsubscribe, ct); + if (result?.Status == SubscriptionStatus.Success) + { + subscriptions.GetValueOrDefault(topic)?.Unsubscribe(); + } + + return result; } public async Task SubscribeAsync( @@ -86,7 +112,7 @@ public LeanPipeTestClient( if (result?.Status == SubscriptionStatus.Success) { - receivedNotifications.TryAdd(topic, new()); + subscriptions.TryAdd(topic, new(topic, result.SubscriptionId)); } return result; @@ -111,12 +137,13 @@ public async ValueTask DisposeAsync() OperationType operationType, CancellationToken ct ) + where TTopic : ITopic { var topicType = typeof(TTopic); var subscriptionEnvelope = new SubscriptionEnvelope { - Id = Guid.NewGuid(), + Id = subscriptions.GetValueOrDefault(topic)?.SubscriptionId ?? Guid.NewGuid(), TopicType = topicType.FullName!, Topic = JsonSerializer.SerializeToDocument(topic, serializerOptions), }; @@ -135,7 +162,17 @@ CancellationToken ct ); await hubConnection.InvokeAsync( - nameof(LeanPipeSubscriber.Subscribe), + operationType switch + { + OperationType.Subscribe => nameof(LeanPipeSubscriber.Subscribe), + OperationType.Unsubscribe => nameof(LeanPipeSubscriber.Unsubscribe), + _ + => throw new ArgumentOutOfRangeException( + nameof(operationType), + operationType, + "LeanPipe OperationType is out of range." + ), + }, subscriptionEnvelope, ct ); diff --git a/publisher/src/LeanPipe.TestClient/LeanPipeTestClientExtensions.cs b/publisher/src/LeanPipe.TestClient/LeanPipeTestClientExtensions.cs index dd8c535..e00fed7 100644 --- a/publisher/src/LeanPipe.TestClient/LeanPipeTestClientExtensions.cs +++ b/publisher/src/LeanPipe.TestClient/LeanPipeTestClientExtensions.cs @@ -38,7 +38,7 @@ public static async Task UnsubscribeSuccessAsync( var subscriptionResult = await client.UnsubscribeAsync(topic, ct); if ( - subscriptionResult?.Type == OperationType.Subscribe + subscriptionResult?.Type == OperationType.Unsubscribe && subscriptionResult.Status == SubscriptionStatus.Success ) { @@ -58,6 +58,7 @@ TTopic topic ) where TTopic : ITopic { - return client.ReceivedNotifications.GetValueOrDefault(topic, new()); + return client.Subscriptions.GetValueOrDefault(topic)?.ReceivedNotifications + ?? new List(); } }