Skip to content

Commit

Permalink
Merge pull request #27 from leancodepl/publisher/topic-keys-redesign
Browse files Browse the repository at this point in the history
Publisher topic keys redesign
  • Loading branch information
Dragemil authored Sep 22, 2023
2 parents 705a4af + 7f641ed commit f73f98f
Show file tree
Hide file tree
Showing 17 changed files with 456 additions and 92 deletions.
156 changes: 156 additions & 0 deletions publisher/src/LeanCode.Pipe/BasicTopicKeys.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using LeanCode.Contracts;

namespace LeanCode.Pipe;

public abstract class BasicTopicKeys<TT> : ISubscribingKeys<TT>
where TT : ITopic
{
public abstract IEnumerable<string> Get(TT topic);

public ValueTask<IEnumerable<string>> GetForSubscribingAsync(
TT topic,
LeanPipeContext context
) => ValueTask.FromResult(Get(topic));
}

public abstract class BasicTopicKeys<TT, TN1> : IPublishingKeys<TT, TN1>
where TT : ITopic, IProduceNotification<TN1>
where TN1 : notnull
{
public abstract IEnumerable<string> Get(TT topic);

public ValueTask<IEnumerable<string>> GetForSubscribingAsync(
TT topic,
LeanPipeContext context
) => ValueTask.FromResult(Get(topic));

public ValueTask<IEnumerable<string>> GetForPublishingAsync(
TT topic,
TN1 notification,
CancellationToken ct
) => ValueTask.FromResult(Get(topic));
}

public abstract class BasicTopicKeys<TT, TN1, TN2>
: BasicTopicKeys<TT, TN1>,
IPublishingKeys<TT, TN2>
where TT : ITopic, IProduceNotification<TN1>, IProduceNotification<TN2>
where TN1 : notnull
where TN2 : notnull
{
public ValueTask<IEnumerable<string>> GetForPublishingAsync(
TT topic,
TN2 notification,
CancellationToken ct = default
) => ValueTask.FromResult(Get(topic));
}

public abstract class BasicTopicKeys<TT, TN1, TN2, TN3>
: BasicTopicKeys<TT, TN1, TN2>,
IPublishingKeys<TT, TN3>
where TT : ITopic,
IProduceNotification<TN1>,
IProduceNotification<TN2>,
IProduceNotification<TN3>
where TN1 : notnull
where TN2 : notnull
where TN3 : notnull
{
public ValueTask<IEnumerable<string>> GetForPublishingAsync(
TT topic,
TN3 notification,
CancellationToken ct = default
) => ValueTask.FromResult(Get(topic));
}

public abstract class BasicTopicKeys<TT, TN1, TN2, TN3, TN4>
: BasicTopicKeys<TT, TN1, TN2, TN3>,
IPublishingKeys<TT, TN4>
where TT : ITopic,
IProduceNotification<TN1>,
IProduceNotification<TN2>,
IProduceNotification<TN3>,
IProduceNotification<TN4>
where TN1 : notnull
where TN2 : notnull
where TN3 : notnull
where TN4 : notnull
{
public ValueTask<IEnumerable<string>> GetForPublishingAsync(
TT topic,
TN4 notification,
CancellationToken ct = default
) => ValueTask.FromResult(Get(topic));
}

public abstract class BasicTopicKeys<TT, TN1, TN2, TN3, TN4, TN5>
: BasicTopicKeys<TT, TN1, TN2, TN3, TN4>,
IPublishingKeys<TT, TN5>
where TT : ITopic,
IProduceNotification<TN1>,
IProduceNotification<TN2>,
IProduceNotification<TN3>,
IProduceNotification<TN4>,
IProduceNotification<TN5>
where TN1 : notnull
where TN2 : notnull
where TN3 : notnull
where TN4 : notnull
where TN5 : notnull
{
public ValueTask<IEnumerable<string>> GetForPublishingAsync(
TT topic,
TN5 notification,
CancellationToken ct = default
) => ValueTask.FromResult(Get(topic));
}

public abstract class BasicTopicKeys<TT, TN1, TN2, TN3, TN4, TN5, TN6>
: BasicTopicKeys<TT, TN1, TN2, TN3, TN4, TN5>,
IPublishingKeys<TT, TN6>
where TT : ITopic,
IProduceNotification<TN1>,
IProduceNotification<TN2>,
IProduceNotification<TN3>,
IProduceNotification<TN4>,
IProduceNotification<TN5>,
IProduceNotification<TN6>
where TN1 : notnull
where TN2 : notnull
where TN3 : notnull
where TN4 : notnull
where TN5 : notnull
where TN6 : notnull
{
public ValueTask<IEnumerable<string>> GetForPublishingAsync(
TT topic,
TN6 notification,
CancellationToken ct = default
) => ValueTask.FromResult(Get(topic));
}

public abstract class BasicTopicKeys<TT, TN1, TN2, TN3, TN4, TN5, TN6, TN7>
: BasicTopicKeys<TT, TN1, TN2, TN3, TN4, TN5, TN6>,
IPublishingKeys<TT, TN7>
where TT : ITopic,
IProduceNotification<TN1>,
IProduceNotification<TN2>,
IProduceNotification<TN3>,
IProduceNotification<TN4>,
IProduceNotification<TN5>,
IProduceNotification<TN6>,
IProduceNotification<TN7>
where TN1 : notnull
where TN2 : notnull
where TN3 : notnull
where TN4 : notnull
where TN5 : notnull
where TN6 : notnull
where TN7 : notnull
{
public ValueTask<IEnumerable<string>> GetForPublishingAsync(
TT topic,
TN7 notification,
CancellationToken ct = default
) => ValueTask.FromResult(Get(topic));
}
24 changes: 2 additions & 22 deletions publisher/src/LeanCode.Pipe/ITopicKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@

namespace LeanCode.Pipe;

public interface ITopicKeys<in TTopic>
public interface ISubscribingKeys<in TTopic>
where TTopic : ITopic
{
ValueTask<IEnumerable<string>> GetForSubscribingAsync(TTopic topic, LeanPipeContext context);
ValueTask<IEnumerable<string>> GetForPublishingAsync(
TTopic topic,
CancellationToken ct = default
);
}

public interface INotificationKeys<in TTopic, TNotification> : ITopicKeys<TTopic>
public interface IPublishingKeys<in TTopic, TNotification> : ISubscribingKeys<TTopic>
where TTopic : ITopic, IProduceNotification<TNotification>
where TNotification : notnull
{
Expand All @@ -22,19 +18,3 @@ ValueTask<IEnumerable<string>> GetForPublishingAsync(
CancellationToken ct = default
);
}

public abstract class BasicTopicKeys<TTopic> : ITopicKeys<TTopic>
where TTopic : ITopic
{
public abstract IEnumerable<string> Get(TTopic topic);

public ValueTask<IEnumerable<string>> GetForSubscribingAsync(
TTopic topic,
LeanPipeContext context
) => ValueTask.FromResult(Get(topic));

public ValueTask<IEnumerable<string>> GetForPublishingAsync(
TTopic topic,
CancellationToken ct = default
) => ValueTask.FromResult(Get(topic));
}
18 changes: 1 addition & 17 deletions publisher/src/LeanCode.Pipe/LeanPipePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,12 @@ public static async Task PublishAsync<TTopic, TNotification>(
where TNotification : notnull
{
var notificationKeys = publisher.ServiceProvider.GetRequiredService<
INotificationKeys<TTopic, TNotification>
IPublishingKeys<TTopic, TNotification>
>();

var keys = await notificationKeys.GetForPublishingAsync(topic, notification, ct);
var payload = NotificationEnvelope.Create(topic, notification);

await publisher.PublishAsync(keys, payload, ct);
}

public static async Task PublishToTopicAsync<TTopic, TNotification>(
this LeanPipePublisher<TTopic> publisher,
TTopic topic,
TNotification notification,
CancellationToken ct = default
)
where TTopic : ITopic, IProduceNotification<TNotification>
where TNotification : notnull
{
var topicKeys = publisher.ServiceProvider.GetRequiredService<ITopicKeys<TTopic>>();
var keys = await topicKeys.GetForPublishingAsync(topic, ct);
var payload = NotificationEnvelope.Create(topic, notification);

await publisher.PublishAsync(keys, payload, ct);
}
}
24 changes: 14 additions & 10 deletions publisher/src/LeanCode.Pipe/LeanPipeServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,17 @@ public LeanPipeServicesBuilder AddHandlers(TypesCatalog newHandlers)
typeof(ISubscriptionHandler<>),
ServiceLifetime.Transient
);
Services.RegisterGenericTypes(newHandlers, typeof(ITopicKeys<>), ServiceLifetime.Transient);
Services.RegisterGenericTypes(
newHandlers,
typeof(INotificationKeys<,>),
typeof(ISubscribingKeys<>),
ServiceLifetime.Transient
);
VerifyNotificationKeysImplementations();
Services.RegisterGenericTypes(
newHandlers,
typeof(IPublishingKeys<,>),
ServiceLifetime.Transient
);
VerifyPublishingKeysImplementations();
return this;
}

Expand All @@ -99,9 +103,9 @@ private void ReplaceDefaultEnvelopeDeserializer()
}
}

private void VerifyNotificationKeysImplementations()
private void VerifyPublishingKeysImplementations()
{
var notificationKeysType = typeof(INotificationKeys<,>);
var notificationKeysType = typeof(IPublishingKeys<,>);
var typesToCheck = new HashSet<(Type, Type)>();

foreach (var service in Services)
Expand All @@ -120,18 +124,18 @@ private void VerifyNotificationKeysImplementations()

foreach (var (topicType, keysType) in typesToCheck)
{
VerifyNotificationKeys(topicType, keysType);
VerifyPublishingKeys(topicType, keysType);
}
}

private static void VerifyNotificationKeys(Type topicType, Type keysType)
private static void VerifyPublishingKeys(Type topicType, Type keysType)
{
var producedNotifications = topicType
.FindInterfaces(Filter, typeof(IProduceNotification<>))
.Select(t => t.GenericTypeArguments[0])
.ToHashSet();
var implementedKeys = keysType
.FindInterfaces(Filter, typeof(INotificationKeys<,>))
.FindInterfaces(Filter, typeof(IPublishingKeys<,>))
.Select(t => t.GenericTypeArguments[1])
.ToHashSet();

Expand All @@ -140,9 +144,9 @@ private static void VerifyNotificationKeys(Type topicType, Type keysType)
if (implementedKeys.Count > 0 && missing.Any())
{
var msg = $"""
If topic keys implements `INotificationKeys`, it needs to be implemented for all notification types.
Topic must have implemented `IPublishingKeys` for all notification types.
The class `{keysType.FullName}` is missing following implementations:
{string.Join(", ", missing.Select(t => $" - INotificationKeys<{topicType.Name}, {t.Name}>"))}
{string.Join(", ", missing.Select(t => $" - IPublishingKeys<{topicType.Name}, {t.Name}>"))}
""";
throw new InvalidOperationException(msg);
}
Expand Down
10 changes: 5 additions & 5 deletions publisher/src/LeanCode.Pipe/SubscriptionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ LeanPipeContext context
public class KeyedSubscriptionHandler<TTopic> : ISubscriptionHandler<TTopic>
where TTopic : ITopic
{
private readonly ITopicKeys<TTopic> topicKeys;
private readonly ISubscribingKeys<TTopic> subscribingKeys;

public KeyedSubscriptionHandler(ITopicKeys<TTopic> topicKeys)
public KeyedSubscriptionHandler(ISubscribingKeys<TTopic> subscribingKeys)
{
this.topicKeys = topicKeys;
this.subscribingKeys = subscribingKeys;
}

public async ValueTask<bool> OnSubscribedAsync(
Expand All @@ -36,7 +36,7 @@ public async ValueTask<bool> OnSubscribedAsync(
LeanPipeContext context
)
{
var keys = await topicKeys.GetForSubscribingAsync(topic, context);
var keys = await subscribingKeys.GetForSubscribingAsync(topic, context);

var tasks = keys.Select(
key =>
Expand All @@ -61,7 +61,7 @@ LeanPipeContext context
// With this implementation there is a problem of "higher level" groups:
// if we subscribe to topic.something and topic.something.specific,
// then we do not know when to unsubscribe from topic.something
var keys = await topicKeys.GetForSubscribingAsync(topic, context);
var keys = await subscribingKeys.GetForSubscribingAsync(topic, context);

var tasks = keys.Select(
key =>
Expand Down
42 changes: 40 additions & 2 deletions publisher/test/LeanCode.Pipe.IntegrationTests/App/ApiHandlers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ CancellationToken ct
switch (notificationData.Kind)
{
case NotificationKindDTO.Greeting:
await publisher.PublishToTopicAsync(topic, CreateGreeting(notificationData), ct);
await publisher.PublishAsync(topic, CreateGreeting(notificationData), ct);
break;
case NotificationKindDTO.Farewell:
await publisher.PublishToTopicAsync(topic, CreateFarewell(notificationData), ct);
await publisher.PublishAsync(topic, CreateFarewell(notificationData), ct);
break;
default:
throw new InvalidOperationException(
Expand All @@ -29,9 +29,47 @@ CancellationToken ct
}
}

public static async Task PublishProjectUpdatedOrDeletedAsync(
LeanPipePublisher<MyFavouriteProjectsTopic> publisher,
MyFavouriteProjectsTopic topic,
ProjectNotificationDataDTO projectNotificationData,
CancellationToken ct
)
{
switch (projectNotificationData.Kind)
{
case ProjectNotificationKindDTO.Updated:
await publisher.PublishAsync(
topic,
CreateProjectUpdated(projectNotificationData),
ct
);
break;
case ProjectNotificationKindDTO.Deleted:
await publisher.PublishAsync(
topic,
CreateProjectDeleted(projectNotificationData),
ct
);
break;
default:
throw new InvalidOperationException(
$"Invalid project notification kind {projectNotificationData.Kind}"
);
}
}

private static GreetingNotificationDTO CreateGreeting(NotificationDataDTO notificationData) =>
new() { Greeting = $"Hello {notificationData.Name}" };

private static FarewellNotificationDTO CreateFarewell(NotificationDataDTO notificationData) =>
new() { Farewell = $"Goodbye {notificationData.Name}" };

private static ProjectUpdatedNotificationDTO CreateProjectUpdated(
ProjectNotificationDataDTO notificationData
) => new() { ProjectId = notificationData.ProjectId };

private static ProjectDeletedNotificationDTO CreateProjectDeleted(
ProjectNotificationDataDTO notificationData
) => new() { ProjectId = notificationData.ProjectId };
}
Loading

0 comments on commit f73f98f

Please sign in to comment.