Skip to content

Commit

Permalink
Refactor test client subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
Dragemil committed Sep 15, 2023
1 parent 11572c2 commit ddac890
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
21 changes: 10 additions & 11 deletions publisher/src/LeanPipe.TestClient/LeanPipeSubscription.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
using System.Collections.Concurrent;
using LeanCode.Contracts;

namespace LeanPipe.TestClient;

public class LeanPipeSubscription
{
private readonly List<object> receivedNotifications = new();
private TaskCompletionSource<object>? nextMessageAwaiter;
private readonly ConcurrentStack<object> receivedNotifications = new();

public ITopic Topic { get; private init; }
public Guid? SubscriptionId { get; private set; }
public TaskCompletionSource<object>? NextMessageAwaiter { get; private set; }
public IReadOnlyList<object> ReceivedNotifications => receivedNotifications;
public IReadOnlyCollection<object> ReceivedNotifications => receivedNotifications;

public LeanPipeSubscription(ITopic topic, Guid? subscriptionId)
{
Topic = topic;
SubscriptionId = subscriptionId;
NextMessageAwaiter = null;
nextMessageAwaiter = null;
}

public void Subscribe(Guid subscriptionId)
{
SubscriptionId = subscriptionId;

ClearMessageAwaiter();
}

public void Unsubscribe()
Expand All @@ -34,20 +33,20 @@ public void Unsubscribe()

public void AddNotification(object notification)
{
receivedNotifications.Add(notification);
NextMessageAwaiter?.TrySetResult(notification);
receivedNotifications.Push(notification);
nextMessageAwaiter?.TrySetResult(notification);

ClearMessageAwaiter();
}

public Task<object> GetNextNotificationTask()
{
NextMessageAwaiter = new();
return NextMessageAwaiter.Task;
nextMessageAwaiter ??= new();
return nextMessageAwaiter.Task;
}

private void ClearMessageAwaiter()
{
NextMessageAwaiter = null;
nextMessageAwaiter = null;
}
}
9 changes: 8 additions & 1 deletion publisher/src/LeanPipe.TestClient/LeanPipeTestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ public LeanPipeTestClient(
)
where TTopic : ITopic
{
var subscription = subscriptions.GetValueOrDefault(topic);

if (subscription?.SubscriptionId is not null)
{
throw new InvalidOperationException("Already subscribed to topic.");
}

if (hubConnection.State != HubConnectionState.Connected)
{
await ConnectAsync(ct);
Expand All @@ -112,7 +119,7 @@ public LeanPipeTestClient(

if (result?.Status == SubscriptionStatus.Success)
{
if (subscriptions.GetValueOrDefault(topic) is { } subscription)
if (subscription is not null)
{
subscription.Subscribe(result.SubscriptionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static async Task<object> GetNextNotificationTaskOn<TTopic>(
);
}

public static IReadOnlyList<object> NotificationsOn<TTopic>(
public static IReadOnlyCollection<object> NotificationsOn<TTopic>(
this LeanPipeTestClient client,
TTopic topic
)
Expand Down

0 comments on commit ddac890

Please sign in to comment.