From ddac8905695f1f9cd00f0e86473559627c3ed3e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emil=20Draga=C5=84czuk?= Date: Fri, 15 Sep 2023 14:15:22 +0200 Subject: [PATCH] Refactor test client subscription --- .../LeanPipeSubscription.cs | 21 +++++++++---------- .../LeanPipe.TestClient/LeanPipeTestClient.cs | 9 +++++++- .../LeanPipeTestClientExtensions.cs | 2 +- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/publisher/src/LeanPipe.TestClient/LeanPipeSubscription.cs b/publisher/src/LeanPipe.TestClient/LeanPipeSubscription.cs index e126684..edf3538 100644 --- a/publisher/src/LeanPipe.TestClient/LeanPipeSubscription.cs +++ b/publisher/src/LeanPipe.TestClient/LeanPipeSubscription.cs @@ -1,28 +1,27 @@ +using System.Collections.Concurrent; using LeanCode.Contracts; namespace LeanPipe.TestClient; public class LeanPipeSubscription { - private readonly List receivedNotifications = new(); + private TaskCompletionSource? nextMessageAwaiter; + private readonly ConcurrentStack receivedNotifications = new(); public ITopic Topic { get; private init; } public Guid? SubscriptionId { get; private set; } - public TaskCompletionSource? NextMessageAwaiter { get; private set; } - public IReadOnlyList ReceivedNotifications => receivedNotifications; + public IReadOnlyCollection ReceivedNotifications => receivedNotifications; public LeanPipeSubscription(ITopic topic, Guid? subscriptionId) { Topic = topic; SubscriptionId = subscriptionId; - NextMessageAwaiter = null; + nextMessageAwaiter = null; } public void Subscribe(Guid subscriptionId) { SubscriptionId = subscriptionId; - - ClearMessageAwaiter(); } public void Unsubscribe() @@ -34,20 +33,20 @@ public void Unsubscribe() public void AddNotification(object notification) { - receivedNotifications.Add(notification); - NextMessageAwaiter?.TrySetResult(notification); + receivedNotifications.Push(notification); + nextMessageAwaiter?.TrySetResult(notification); ClearMessageAwaiter(); } public Task GetNextNotificationTask() { - NextMessageAwaiter = new(); - return NextMessageAwaiter.Task; + nextMessageAwaiter ??= new(); + return nextMessageAwaiter.Task; } private void ClearMessageAwaiter() { - NextMessageAwaiter = null; + nextMessageAwaiter = null; } } diff --git a/publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs b/publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs index 76d77f2..c2eb1cd 100644 --- a/publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs +++ b/publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs @@ -103,6 +103,13 @@ public LeanPipeTestClient( ) where TTopic : ITopic { + var subscription = subscriptions.GetValueOrDefault(topic); + + if (subscription?.SubscriptionId is not null) + { + throw new InvalidOperationException("Already subscribed to topic."); + } + if (hubConnection.State != HubConnectionState.Connected) { await ConnectAsync(ct); @@ -112,7 +119,7 @@ public LeanPipeTestClient( if (result?.Status == SubscriptionStatus.Success) { - if (subscriptions.GetValueOrDefault(topic) is { } subscription) + if (subscription is not null) { subscription.Subscribe(result.SubscriptionId); } diff --git a/publisher/src/LeanPipe.TestClient/LeanPipeTestClientExtensions.cs b/publisher/src/LeanPipe.TestClient/LeanPipeTestClientExtensions.cs index 85411b5..b024b1a 100644 --- a/publisher/src/LeanPipe.TestClient/LeanPipeTestClientExtensions.cs +++ b/publisher/src/LeanPipe.TestClient/LeanPipeTestClientExtensions.cs @@ -74,7 +74,7 @@ public static async Task GetNextNotificationTaskOn( ); } - public static IReadOnlyList NotificationsOn( + public static IReadOnlyCollection NotificationsOn( this LeanPipeTestClient client, TTopic topic )