Skip to content

Commit

Permalink
Add .NET client for pub/sub support - streaming subscriptions (#1381)
Browse files Browse the repository at this point in the history
* Building out Dapr.Messaging and test project for streaming pubsub subscriptions

Signed-off-by: Whit Waldo <[email protected]>

* Added copyright notices

Signed-off-by: Whit Waldo <[email protected]>

* Minor stylistic updates

Signed-off-by: Whit Waldo <[email protected]>

* Added generic client builder to support publish/subscribe client builder

Signed-off-by: Whit Waldo <[email protected]>

* Tweaked XML comment

Signed-off-by: Whit Waldo <[email protected]>

* Added several unit tests for the generic client builder

Signed-off-by: Whit Waldo <[email protected]>

* Updated to include latest review changes:
- Added lock so that while we guarantee the method is called only once, it should be thread-safe now
- Marked PublishSubscribeReceiver as internal so its members aren't part of the public API
- Updated TopicMessage to use IReadOnlyDictionary

Signed-off-by: Whit Waldo <[email protected]>

* Switched to interlock exchange instead of lock to slightly simplify code

Signed-off-by: Whit Waldo <[email protected]>

* Added sample project

Signed-off-by: Whit Waldo <[email protected]>

* Minor changes to unit test

Signed-off-by: Whit Waldo <[email protected]>

* Deleted protos folder

Signed-off-by: Whit Waldo <[email protected]>

* Using lowercase protos dir name

Signed-off-by: Whit Waldo <[email protected]>

* Added registration extension methods

Signed-off-by: Whit Waldo <[email protected]>

* Updated example to use DI registration

Signed-off-by: Whit Waldo <[email protected]>

* Added default cancellation token

Signed-off-by: Whit Waldo <[email protected]>

* Passing stream into method instead of creating it twice

Signed-off-by: Whit Waldo <[email protected]>

---------

Signed-off-by: Whit Waldo <[email protected]>
  • Loading branch information
WhitWaldo authored Nov 5, 2024
1 parent 682df6f commit 91ee78a
Show file tree
Hide file tree
Showing 19 changed files with 1,011 additions and 49 deletions.
98 changes: 49 additions & 49 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,51 +1,51 @@
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<CentralPackageTransitivePinningEnabled>true</CentralPackageTransitivePinningEnabled>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="BenchmarkDotNet" Version="0.14.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
<PackageVersion Include="coverlet.msbuild" Version="6.0.2" />
<PackageVersion Include="FluentAssertions" Version="5.9.0" />
<PackageVersion Include="GitHubActionsTestLogger" Version="1.1.2" />
<PackageVersion Include="Google.Api.CommonProtos" Version="2.2.0" />
<PackageVersion Include="Google.Protobuf" Version="3.28.2" />
<PackageVersion Include="Grpc.AspNetCore" Version="2.66.0" />
<PackageVersion Include="Grpc.Core.Testing" Version="2.46.6" />
<PackageVersion Include="Grpc.Net.Client" Version="2.66.0" />
<PackageVersion Include="Grpc.Net.ClientFactory" Version="2.66.0" />
<PackageVersion Include="Grpc.Tools" Version="2.67.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.35" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="6.0.35" />
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="3.3.4" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing" Version="1.1.2" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing.XUnit" Version="1.1.2" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.8.0" />
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.3.0" />
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.3.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Http" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" />
<PackageVersion Include="MinVer" Version="2.3.0" />
<PackageVersion Include="Moq" Version="4.20.72" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="protobuf-net.Grpc.AspNetCore" Version="1.2.2" />
<PackageVersion Include="Serilog.AspNetCore" Version="6.1.0" />
<PackageVersion Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageVersion Include="Serilog.Sinks.File" Version="5.0.0" />
<PackageVersion Include="System.Formats.Asn1" Version="6.0.1" />
<PackageVersion Include="System.Text.Json" Version="8.0.5" />
<PackageVersion Include="xunit" Version="2.9.2" />
<PackageVersion Include="xunit.extensibility.core" Version="2.9.2" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
</ItemGroup>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<CentralPackageTransitivePinningEnabled>true</CentralPackageTransitivePinningEnabled>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="BenchmarkDotNet" Version="0.14.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
<PackageVersion Include="coverlet.msbuild" Version="6.0.2" />
<PackageVersion Include="FluentAssertions" Version="5.9.0" />
<PackageVersion Include="GitHubActionsTestLogger" Version="1.1.2" />
<PackageVersion Include="Google.Api.CommonProtos" Version="2.2.0" />
<PackageVersion Include="Google.Protobuf" Version="3.28.2" />
<PackageVersion Include="Grpc.AspNetCore" Version="2.66.0" />
<PackageVersion Include="Grpc.Core.Testing" Version="2.46.6" />
<PackageVersion Include="Grpc.Net.Client" Version="2.66.0" />
<PackageVersion Include="Grpc.Net.ClientFactory" Version="2.66.0" />
<PackageVersion Include="Grpc.Tools" Version="2.67.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.35" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="6.0.35" />
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="3.3.4" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing" Version="1.1.2" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing.XUnit" Version="1.1.2" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.8.0" />
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.3.0" />
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.3.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.4" />
<PackageVersion Include="Microsoft.Extensions.Http" Version="6.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" />
<PackageVersion Include="MinVer" Version="2.3.0" />
<PackageVersion Include="Moq" Version="4.20.72" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="protobuf-net.Grpc.AspNetCore" Version="1.2.2" />
<PackageVersion Include="Serilog.AspNetCore" Version="6.1.0" />
<PackageVersion Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageVersion Include="Serilog.Sinks.File" Version="5.0.0" />
<PackageVersion Include="System.Formats.Asn1" Version="6.0.1" />
<PackageVersion Include="System.Text.Json" Version="6.0.10" />
<PackageVersion Include="xunit" Version="2.9.2" />
<PackageVersion Include="xunit.extensibility.core" Version="2.9.2" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
</ItemGroup>
</Project>
20 changes: 20 additions & 0 deletions all.sln
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common", "src\Dapr.Com
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common.Test", "test\Dapr.Common.Test\Dapr.Common.Test.csproj", "{CDB47863-BEBD-4841-A807-46D868962521}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging.Test", "test\Dapr.Messaging.Test\Dapr.Messaging.Test.csproj", "{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingSubscriptionExample", "examples\Client\PublishSubscribe\StreamingSubscriptionExample\StreamingSubscriptionExample.csproj", "{290D1278-F613-4DF3-9DF5-F37E38CDC363}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs", "src\Dapr.Jobs\Dapr.Jobs.csproj", "{C8BB6A85-A7EA-40C0-893D-F36F317829B3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs.Test", "test\Dapr.Jobs.Test\Dapr.Jobs.Test.csproj", "{BF9828E9-5597-4D42-AA6E-6E6C12214204}"
Expand Down Expand Up @@ -311,6 +316,18 @@ Global
{CDB47863-BEBD-4841-A807-46D868962521}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.Build.0 = Release|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.Build.0 = Release|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.Build.0 = Release|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.Build.0 = Debug|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.ActiveCfg = Release|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.Build.0 = Release|Any CPU
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -379,6 +396,9 @@ Global
{DFBABB04-50E9-42F6-B470-310E1B545638} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{B445B19C-A925-4873-8CB7-8317898B6970} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{CDB47863-BEBD-4841-A807-46D868962521} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{290D1278-F613-4DF3-9DF5-F37E38CDC363} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}
{C8BB6A85-A7EA-40C0-893D-F36F317829B3} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{BF9828E9-5597-4D42-AA6E-6E6C12214204} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{D9697361-232F-465D-A136-4561E0E88488} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Text;
using Dapr.Messaging.PublishSubscribe;
using Dapr.Messaging.PublishSubscribe.Extensions;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient();
var app = builder.Build();

//Process each message returned from the subscription
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
try
{
//Do something with the message
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
return Task.FromResult(TopicResponseAction.Success);
}
catch
{
return Task.FromResult(TopicResponseAction.Retry);
}
}

var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();

//Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic",
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
HandleMessageAsync, cancellationTokenSource.Token);

await Task.Delay(TimeSpan.FromMinutes(1));

//When you're done with the subscription, simply dispose of it
await subscription.DisposeAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Dapr.Messaging\Dapr.Messaging.csproj" />
</ItemGroup>

</Project>
22 changes: 22 additions & 0 deletions src/Dapr.Messaging/Dapr.Messaging.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>This package contains the reference assemblies for developing messaging services using Dapr.</Description>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PackageId>Dapr.Messaging</PackageId>
<Title>Dapr Messaging SDK</Title>
<Description>Dapr Messaging SDK for building applications that utilize messaging components.</Description>
<VersionSuffix>alpha</VersionSuffix>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Dapr.Common\Dapr.Common.csproj" />
<ProjectReference Include="..\Dapr.Protos\Dapr.Protos.csproj" />
</ItemGroup>

</Project>
31 changes: 31 additions & 0 deletions src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// The base implementation of a Dapr pub/sub client.
/// </summary>
public abstract class DaprPublishSubscribeClient
{
/// <summary>
/// Dynamically subscribes to a Publish/Subscribe component and topic.
/// </summary>
/// <param name="pubSubName">The name of the Publish/Subscribe component.</param>
/// <param name="topicName">The name of the topic to subscribe to.</param>
/// <param name="options">Configuration options.</param>
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns></returns>
public abstract Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Common;
using Microsoft.Extensions.Configuration;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;

namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// Builds a <see cref="DaprPublishSubscribeClient"/>.
/// </summary>
public sealed class DaprPublishSubscribeClientBuilder : DaprGenericClientBuilder<DaprPublishSubscribeClient>
{
/// <summary>
/// Used to initialize a new instance of the <see cref="DaprPublishSubscribeClientBuilder"/>.
/// </summary>
/// <param name="configuration">An optional instance of <see cref="IConfiguration"/>.</param>
public DaprPublishSubscribeClientBuilder(IConfiguration? configuration = null) : base(configuration)
{
}

/// <summary>
/// Builds the client instance from the properties of the builder.
/// </summary>
/// <returns>The Dapr client instance.</returns>
/// <summary>
/// Builds the client instance from the properties of the builder.
/// </summary>
public override DaprPublishSubscribeClient Build()
{
var daprClientDependencies = BuildDaprClientDependencies();
var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel);

return new DaprPublishSubscribeGrpcClient(client);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using P = Dapr.Client.Autogen.Grpc.v1.Dapr;

namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// A client for interacting with the Dapr endpoints.
/// </summary>
internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient
{
private readonly P.DaprClient daprClient;

/// <summary>
/// Creates a new instance of a <see cref="DaprPublishSubscribeGrpcClient"/>
/// </summary>
public DaprPublishSubscribeGrpcClient(P.DaprClient client)
{
daprClient = client;
}

/// <summary>
/// Dynamically subscribes to a Publish/Subscribe component and topic.
/// </summary>
/// <param name="pubSubName">The name of the Publish/Subscribe component.</param>
/// <param name="topicName">The name of the topic to subscribe to.</param>
/// <param name="options">Configuration options.</param>
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns></returns>
public override async Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default)
{
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient);
await receiver.SubscribeAsync(cancellationToken);
return receiver;
}
}

Loading

0 comments on commit 91ee78a

Please sign in to comment.