Skip to content

Commit

Permalink
Merge pull request #32 from leancodepl/publisher/extend-notifications…
Browse files Browse the repository at this point in the history
…-await-test-client-api

Extend notification awaiting API in the test client
  • Loading branch information
Dragemil authored Oct 3, 2023
2 parents 8fe7115 + 2bf4f5a commit 8d6a009
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 53 deletions.
29 changes: 26 additions & 3 deletions publisher/src/LeanCode.Pipe.TestClient/LeanPipeSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,34 @@ public void AddNotification(object notification)
}
}

public Task<object> WaitForNextNotification()
public Task<object> WaitForNextNotification(CancellationToken ct = default)
{
lock (notificationMutex)
var nextNotificationTask = GetNextNotificationTask();

if (ct.CanBeCanceled)
{
if (ct.IsCancellationRequested)
{
return Task.FromException<object>(new TaskCanceledException());
}

var tcs = new TaskCompletionSource<object>(
TaskCreationOptions.RunContinuationsAsynchronously
);

return Task.WhenAny(tcs.Task, nextNotificationTask).Unwrap();
}
else
{
return nextNotificationTask;
}

Task<object> GetNextNotificationTask()
{
return nextMessageAwaiter.Task;
lock (notificationMutex)
{
return nextMessageAwaiter.Task;
}
}
}
}
40 changes: 15 additions & 25 deletions publisher/src/LeanCode.Pipe.TestClient/LeanPipeTestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public LeanPipeTestClient(
);
}

/// <returns>Unsubscription result or null if the request times out.</returns>
public async Task<SubscriptionResult?> UnsubscribeAsync<TTopic>(
public async Task<SubscriptionResult> UnsubscribeAsync<TTopic>(
TTopic topic,
CancellationToken ct = default
)
Expand All @@ -83,7 +82,7 @@ public LeanPipeTestClient(
result = await ManageSubscriptionCoreAsync(topic, OperationType.Unsubscribe, ct);
}

if (result?.Status == SubscriptionStatus.Success)
if (result.Status == SubscriptionStatus.Success)
{
subscriptions.GetValueOrDefault(topic)?.Unsubscribe();
}
Expand All @@ -92,8 +91,8 @@ public LeanPipeTestClient(
}

/// <remarks>Connects if there is no active connection.</remarks>
/// <returns>Subscription result or null if the request times out.</returns>
public async Task<SubscriptionResult?> SubscribeAsync<TTopic>(
/// <exception cref="InvalidOperationException">Already subscribed to the topic instance.</exception>
public async Task<SubscriptionResult> SubscribeAsync<TTopic>(
TTopic topic,
CancellationToken ct = default
)
Expand All @@ -103,7 +102,7 @@ public LeanPipeTestClient(

if (subscription?.SubscriptionId is not null)
{
throw new InvalidOperationException("Already subscribed to topic.");
throw new InvalidOperationException("Already subscribed to topic instance.");
}

if (hubConnection.State != HubConnectionState.Connected)
Expand All @@ -113,7 +112,7 @@ public LeanPipeTestClient(

var result = await ManageSubscriptionCoreAsync(topic, OperationType.Subscribe, ct);

if (result?.Status == SubscriptionStatus.Success)
if (result.Status == SubscriptionStatus.Success)
{
if (subscription is not null)
{
Expand Down Expand Up @@ -148,7 +147,7 @@ public async ValueTask DisposeAsync()
GC.SuppressFinalize(this);
}

private async Task<SubscriptionResult?> ManageSubscriptionCoreAsync<TTopic>(
private async Task<SubscriptionResult> ManageSubscriptionCoreAsync<TTopic>(
TTopic topic,
OperationType operationType,
CancellationToken ct
Expand All @@ -166,6 +165,13 @@ CancellationToken ct

var subscriptionCompletionSource = new TaskCompletionSource<SubscriptionResult>();

using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(subscriptionCompletionTimeout);

await using var ctRegistration = cts.Token.Register(
() => subscriptionCompletionSource.TrySetCanceled()
);

using var subscriptionResponseCallback = hubConnection.On<SubscriptionResult>(
"subscriptionResult",
r =>
Expand Down Expand Up @@ -193,22 +199,6 @@ await hubConnection.InvokeAsync(
ct
);

return await AwaitWithTimeout(
subscriptionCompletionSource.Task,
subscriptionCompletionTimeout,
ct
);
}

internal static async Task<TResult?> AwaitWithTimeout<TResult>(
Task<TResult> task,
TimeSpan timeout,
CancellationToken ct
)
where TResult : class
{
return await Task.WhenAny(task, Task.Delay(timeout, ct)) == task
? task.GetAwaiter().GetResult()
: null;
return await subscriptionCompletionSource.Task;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static async Task<Guid> SubscribeSuccessAsync<TTopic>(
var subscriptionResult = await client.SubscribeAsync(topic, ct);

if (
subscriptionResult?.Type == OperationType.Subscribe
subscriptionResult.Type == OperationType.Subscribe
&& subscriptionResult.Status == SubscriptionStatus.Success
)
{
Expand Down Expand Up @@ -52,7 +52,7 @@ public static async Task<Guid> UnsubscribeSuccessAsync<TTopic>(
var subscriptionResult = await client.UnsubscribeAsync(topic, ct);

if (
subscriptionResult?.Type == OperationType.Unsubscribe
subscriptionResult.Type == OperationType.Unsubscribe
&& subscriptionResult.Status == SubscriptionStatus.Success
)
{
Expand All @@ -67,35 +67,70 @@ public static async Task<Guid> UnsubscribeSuccessAsync<TTopic>(
}

/// <summary>
/// Returns a task, which completes when the next notification on the topic is received.
/// Returns a task, which completes when subscription on the topic receives next notification satisfying the predicate.
/// </summary>
/// <remarks>
/// The task should be collected before the action that triggers the notification to be published
/// and it should be awaited after the triggering event. Otherwise there is a possibility that
/// the awaited notification is a notification subsequent to the expected one.
/// </remarks>
/// <param name="topic">Topic instance, on which notification is to be awaited.</param>
/// <param name="notificationPredicate">Specifies notification to wait for.</param>
/// <param name="timeout">Timeout, after which the notification is assumed to be not delivered.</param>
/// <returns>Task containing the received notification.</returns>
/// <exception cref="InvalidOperationException">The topic instance received no notifications before the timeout.</exception>
public static async Task<object> WaitForNextNotificationOn<TTopic>(
this LeanPipeTestClient client,
TTopic topic,
Func<object, bool>? notificationPredicate = null,
TimeSpan? timeout = null,
CancellationToken ct = default
)
where TTopic : ITopic
{
var notificationTask = client.Subscriptions[topic].WaitForNextNotification();
notificationPredicate ??= _ => true;

return await LeanPipeTestClient.AwaitWithTimeout(
notificationTask,
timeout ?? DefaultNotificationAwaitTimeout,
ct
object notification;

using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(timeout ?? DefaultNotificationAwaitTimeout);

while (
!notificationPredicate(
notification = await client.Subscriptions[topic].WaitForNextNotification(cts.Token)
)
?? throw new InvalidOperationException(
"LeanPipe test client did not receive any notification on topic."
) { }

return notification;
}

/// <inheritdoc cref="WaitForNextNotificationOn{TTopic}"/>
/// <summary>
/// Returns a task, which completes when subscription on the topic receives next notification of the specified type,
/// satisfying the predicate.
/// </summary>
public static async Task<TNotification> WaitForNextNotificationOn<TTopic, TNotification>(
this LeanPipeTestClient client,
TTopic topic,
Func<TNotification, bool>? notificationPredicate = null,
TimeSpan? timeout = null,
CancellationToken ct = default
)
where TTopic : ITopic
where TNotification : notnull
{
notificationPredicate ??= _ => true;

return (TNotification)
await WaitForNextNotificationOn(
client,
topic,
NotificationAndTypePredicate,
timeout,
ct
);

bool NotificationAndTypePredicate(object n) =>
n is TNotification tn && notificationPredicate(tn);
}

/// <returns>A FIFO collection of received notifications on topic instance.</returns>
Expand Down
2 changes: 2 additions & 0 deletions publisher/src/LeanCode.Pipe/LeanPipePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public static class LeanPipePublisherExtensions
/// Publishes to topic instance using provided SignalR groups keys.
/// Ignores publishing keys implemented in <see cref="IPublishingKeys{TTopic,TNotification}"/>.
/// </summary>
/// <remarks>Does not wait for a response from the receivers.</remarks>
public static async Task PublishAsync<TTopic, TNotification>(
this LeanPipePublisher<TTopic> publisher,
IEnumerable<string> keys,
Expand All @@ -58,6 +59,7 @@ public static async Task PublishAsync<TTopic, TNotification>(
/// Publishes to topic instance using SignalR groups keys generated via implementation of
/// <see cref="IPublishingKeys{TTopic,TNotification}"/>.
/// </summary>
/// <remarks>Does not wait for a response from the receivers.</remarks>
public static async Task PublishAsync<TTopic, TNotification>(
this LeanPipePublisher<TTopic> publisher,
TTopic topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ namespace LeanCode.Pipe.IntegrationTests;

public static class PublishingExtensions
{
public const string SimpleTopicPublishEndpoint = "/publish_simple";
public const string DynamicTopicPublishEndpoint = "/publish_dynamic";
public const string AuthorizedTopicPublishEndpoint = "/publish_authorized";

public static Task PublishToSimpleTopicAndAwaitNotificationAsync<TNotification>(
this HttpClient client,
NotificationDataDTO notificationData,
LeanPipeTestClient leanPipeClient,
SimpleTopic topic,
TNotification expectedNotification
)
where TNotification : notnull
{
return PostToPublishAndCheckNotificationAsync(
client,
"/publish_simple",
SimpleTopicPublishEndpoint,
notificationData,
leanPipeClient,
topic,
Expand All @@ -33,7 +38,7 @@ NotificationDataDTO notificationData
{
return PostToPublishAndAwaitNoNotificationsAsync(
client,
"/publish_simple",
SimpleTopicPublishEndpoint,
notificationData
);
}
Expand All @@ -45,10 +50,11 @@ public static Task PublishToDynamicTopicAndAwaitNotificationAsync<TNotification>
MyFavouriteProjectsTopic topic,
TNotification expectedNotification
)
where TNotification : notnull
{
return PostToPublishAndCheckNotificationAsync(
client,
"/publish_dynamic",
DynamicTopicPublishEndpoint,
notificationData,
leanPipeClient,
topic,
Expand All @@ -63,7 +69,7 @@ ProjectNotificationDataDTO notificationData
{
return PostToPublishAndAwaitNoNotificationsAsync(
client,
"/publish_dynamic",
DynamicTopicPublishEndpoint,
notificationData
);
}
Expand All @@ -75,10 +81,11 @@ public static Task PublishToAuthorizedTopicAndAwaitNotificationAsync<TNotificati
AuthorizedTopic topic,
TNotification expectedNotification
)
where TNotification : notnull
{
return PostToPublishAndCheckNotificationAsync(
client,
"/publish_authorized",
AuthorizedTopicPublishEndpoint,
notificationData,
leanPipeClient,
topic,
Expand All @@ -93,7 +100,7 @@ NotificationDataDTO notificationData
{
return PostToPublishAndAwaitNoNotificationsAsync(
client,
"/publish_authorized",
AuthorizedTopicPublishEndpoint,
notificationData
);
}
Expand All @@ -111,15 +118,15 @@ private static async Task PostToPublishAndCheckNotificationAsync<
TNotification expectedNotification
)
where TTopic : ITopic
where TNotification : notnull
{
var notificationTask = leanPipeClient.WaitForNextNotificationOn(topic);
var notificationTask = leanPipeClient.WaitForNextNotificationOn<TTopic, TNotification>(
topic
);

await PostAsync(client, uri, payload);
await PostAndEnsureSuccessAsync(client, uri, payload);

(await notificationTask)
.Should()
.BeOfType<TNotification>()
.And.BeEquivalentTo(expectedNotification);
(await notificationTask).Should().BeEquivalentTo(expectedNotification);
}

private static async Task PostToPublishAndAwaitNoNotificationsAsync<TPayload>(
Expand All @@ -129,12 +136,16 @@ private static async Task PostToPublishAndAwaitNoNotificationsAsync<TPayload>(
TimeSpan? awaitTime = null
)
{
await PostAsync(client, uri, payload);
await PostAndEnsureSuccessAsync(client, uri, payload);

await Task.Delay(awaitTime ?? TimeSpan.FromSeconds(1));
}

private static async Task PostAsync<TPayload>(HttpClient client, string uri, TPayload payload)
public static async Task PostAndEnsureSuccessAsync<TPayload>(
this HttpClient client,
string uri,
TPayload payload
)
{
using var response = await client.PostAsJsonAsync(uri, payload);

Expand Down
Loading

0 comments on commit 8d6a009

Please sign in to comment.