Skip to content

Commit

Permalink
Ensure the draft is actually working
Browse files Browse the repository at this point in the history
  • Loading branch information
Dragemil committed Oct 5, 2023
1 parent e98c51b commit 3697a43
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 31 deletions.
2 changes: 2 additions & 0 deletions publisher/Directory.Build.targets
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
<PackageReference Update="LeanCode.Components" Version="$(CoreLibVersion)" />
<PackageReference Update="LeanCode.CQRS.Security" Version="$(CoreLibVersion)" />
<PackageReference Update="LeanCode.IntegrationTestHelpers" Version="$(CoreLibVersion)" />
<PackageReference Update="LeanCode.Logging" Version="$(CoreLibVersion)" />

<PackageReference Update="MassTransit.RabbitMQ" Version="8.1.1" />
<PackageReference Update="MassTransit.SignalR" Version="8.1.1" />

<PackageReference Update="Microsoft.AspNetCore.SignalR.Client" Version="8.0.0-rc.1.23421.29" />
Expand Down
42 changes: 42 additions & 0 deletions publisher/src/LeanCode.Pipe/Funnel/ClaimsPrincipalSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System.Security.Claims;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace LeanCode.Pipe.Funnel;

public class ClaimsPrincipalJsonConverter : JsonConverter<ClaimsPrincipal>
{
public static ClaimsPrincipalJsonConverter Instance = new();

public override ClaimsPrincipal? Read(
ref Utf8JsonReader reader,
Type typeToConvert,
JsonSerializerOptions options
)
{
var base64 = reader.GetString();

if (base64 is null)
{
return null;
}

using var ms = new MemoryStream(Convert.FromBase64String(base64));
using var binaryReader = new BinaryReader(ms);

return new(binaryReader);
}

public override void Write(
Utf8JsonWriter writer,
ClaimsPrincipal value,
JsonSerializerOptions options
)
{
using var ms = new MemoryStream();
using var binaryWriter = new BinaryWriter(ms);

value.WriteTo(binaryWriter);
writer.WriteStringValue(Convert.ToBase64String(ms.ToArray()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using LeanCode.Contracts;

namespace LeanCode.Pipe.Funnel;

public record ExecuteTopicsSubscriptionPipeline(
SubscriptionEnvelope Envelope,
OperationType OperationType,
LeanPipeContext Context
);
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ TypesCatalog handlers
)
{
services.AddTransient<LeanPipeSecurity>();
services.AddTransient(typeof(ISubscriptionExecutor), typeof(SubscriptionExecutor));
services.AddTransient<SubscriptionExecutor>();
services.AddTransient(typeof(ISubscriptionHandler<>), typeof(KeyedSubscriptionHandler<>));
services.AddTransient(typeof(ILeanPipePublisher<>), typeof(FunnelledLeanPipePublisher<>));
services.AddTransient<SubscriptionHandlerResolver>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,6 @@

namespace LeanCode.Pipe.Funnel.FunnelledService;

public record ExecuteTopicsSubscriptionPipeline(
SubscriptionEnvelope Envelope,
OperationType OperationType,
LeanPipeContext Context
);

public record SubscriptionPipelineResult(SubscriptionStatus Status, List<string> GroupKeys);

public class FunnelledSubscriber<TTopic> : IConsumer<ExecuteTopicsSubscriptionPipeline>
where TTopic : ITopic
{
Expand All @@ -35,26 +27,17 @@ public async Task Consume(ConsumeContext<ExecuteTopicsSubscriptionPipeline> cont
context.CancellationToken
);

var groupKeys = subscribeContext.GroupKeys!;

await context.RespondAsync<SubscriptionPipelineResult>(
new(subscriptionStatus, groupKeys.ToList())
new(subscriptionStatus, subscribeContext.GroupKeys?.ToList() ?? new())
);
}
}

public class FunnelledSubscriberDefinition<TTopic> : ConsumerDefinition<FunnelledSubscriber<TTopic>>
where TTopic : ITopic
{
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<FunnelledSubscriber<TTopic>> consumerConfigurator,
IRegistrationContext context
)
public FunnelledSubscriberDefinition()
{
Endpoint(cfg =>
{
cfg.Name = FunnelledSubscriberEndpointNameProvider.GetName<TTopic>();
});
EndpointName = FunnelledSubscriberEndpointNameProvider.GetName<TTopic>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public Task RemoveFromGroupsAsync(IEnumerable<string> groupKeys, CancellationTok

private void PopulateGroupKeys(IEnumerable<string> groupKeys)
{
if (GroupKeys is not null)
if (GroupKeys is null)
{
GroupKeys = groupKeys;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using LeanCode.Contracts;
using LeanCode.Pipe.Funnel.FunnelledService;
using MassTransit;

namespace LeanCode.Pipe.Funnel.Instance;

public class FunnelSubscriptionExecutor : ISubscriptionExecutor
{
private readonly Serilog.ILogger logger = Serilog.Log.ForContext<FunnelSubscriptionExecutor>();

private readonly IEndpointNameFormatter endpointNameFormatter;
private readonly IBus bus;

Expand All @@ -23,12 +24,14 @@ public async Task<SubscriptionStatus> ExecuteAsync(
CancellationToken ct
)
{
var endpointUri = new Uri(
endpointNameFormatter.SanitizeName(
FunnelledSubscriberEndpointNameProvider.GetName(envelope.TopicType)
)
var endpoint = endpointNameFormatter.SanitizeName(
FunnelledSubscriberEndpointNameProvider.GetName(envelope.TopicType)
);

var endpointPrefix = bus.Topology is IRabbitMqBusTopology ? "exchange" : "topic";

var endpointUri = new Uri($"{endpointPrefix}:{endpoint}");

var subscriberRequestClient = bus.CreateRequestClient<ExecuteTopicsSubscriptionPipeline>(
endpointUri
);
Expand All @@ -37,6 +40,16 @@ CancellationToken ct
{
var response = await subscriberRequestClient.GetResponse<SubscriptionPipelineResult>(
new(envelope, type, context),
rpc =>
{
rpc.UseExecute(ctx =>
{
if (ctx is RabbitMqSendContext rctx)
{
rctx.Mandatory = true;
}
});
},
ct
);

Expand All @@ -58,7 +71,7 @@ CancellationToken ct
}
catch (Exception e)
{
// TODO: Log error
logger.Error(e, "LeanPipe subscription executor failed");
return SubscriptionStatus.InternalServerError;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using LeanCode.Contracts;

namespace LeanCode.Pipe.Funnel;

public record SubscriptionPipelineResult(SubscriptionStatus Status, List<string> GroupKeys);
4 changes: 3 additions & 1 deletion publisher/src/LeanCode.Pipe/LeanCode.Pipe.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

<ItemGroup>
<ProjectReference Include="../../../../corelibrary/src/CQRS/LeanCode.CQRS.Security/LeanCode.CQRS.Security.csproj" />
<!-- <PackageReference Include="LeanCode.CQRS.Security" />-->
<PackageReference Include="LeanCode.Components" />
<PackageReference Include="LeanCode.Contracts" />
<!-- <PackageReference Include="LeanCode.CQRS.Security" />-->
<PackageReference Include="LeanCode.Logging" />

<PackageReference Include="MassTransit.RabbitMQ" />
<PackageReference Include="MassTransit.SignalR" />
</ItemGroup>

Expand Down
7 changes: 5 additions & 2 deletions publisher/src/LeanCode.Pipe/LeanPipeSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,17 @@ private async Task ExecuteAsync(SubscriptionEnvelope envelope, OperationType typ
await NotifyResultAsync(new(envelope.Id, subscriptionStatus, type));
}

public Task AddToGroupsAsync(IEnumerable<string> groupKeys, CancellationToken ct)
Task ISubscribeContext.AddToGroupsAsync(IEnumerable<string> groupKeys, CancellationToken ct)
{
var tasks = groupKeys.Select(key => Groups.AddToGroupAsync(Context.ConnectionId, key, ct));

return Task.WhenAll(tasks);
}

public Task RemoveFromGroupsAsync(IEnumerable<string> groupKeys, CancellationToken ct)
Task ISubscribeContext.RemoveFromGroupsAsync(
IEnumerable<string> groupKeys,
CancellationToken ct
)
{
var tasks = groupKeys.Select(
key => Groups.RemoveFromGroupAsync(Context.ConnectionId, key, ct)
Expand Down

0 comments on commit 3697a43

Please sign in to comment.