From 506406f560a00e3758d778cb776ba4d5fc6d84b8 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Mon, 23 Oct 2023 04:31:32 -0500 Subject: [PATCH 1/6] Adds overload to BulkStateItem and GetBulkStateAsync to perform SDK-based deserialization of returned values instead of strictly returning serialized strings. Signed-off-by: Whit Waldo --- src/Dapr.Client/BulkStateItem.cs | 37 ++++++++++++++++++++ src/Dapr.Client/DaprClient.cs | 11 ++++++ src/Dapr.Client/DaprClientGrpc.cs | 49 +++++++++++++++++++++++++++ test/Dapr.Client.Test/StateApiTest.cs | 24 +++++++++++++ 4 files changed, 121 insertions(+) diff --git a/src/Dapr.Client/BulkStateItem.cs b/src/Dapr.Client/BulkStateItem.cs index fb717e1a8..6e7bf7923 100644 --- a/src/Dapr.Client/BulkStateItem.cs +++ b/src/Dapr.Client/BulkStateItem.cs @@ -49,4 +49,41 @@ public BulkStateItem(string key, string value, string etag) /// public string ETag { get; } } + + /// + /// Represents a state object returned from a bulk get state operation. + /// + public readonly struct BulkStateItem + { + /// + /// Initializes a new instance of the class. + /// + /// The state key. + /// The value. + /// The ETag. + /// + /// Application code should not need to create instances of . + /// + public BulkStateItem(string key, TValue value, string etag) + { + this.Key = key; + this.Value = value; + this.ETag = etag; + } + + /// + /// Gets the state key. + /// + public string Key { get; } + + /// + /// Gets the value. + /// + public TValue Value { get; } + + /// + /// Get the ETag. + /// + public string ETag { get; } + } } diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index 2a7299c78..fbe43db72 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -714,6 +714,17 @@ public abstract Task InvokeMethodGrpcAsync( /// A that will return the list of values when the operation has completed. public abstract Task> GetBulkStateAsync(string storeName, IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default); + /// + /// Gets a list of deserialized values associated with the from the Dapr state store. + /// + /// The name of state store to read from. + /// The list of keys to get values for. + /// The number of concurrent get operations the Dapr runtime will issue to the state store. a value equal to or smaller than 0 means max parallelism. + /// A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used. + /// A that can be used to cancel the operation. + /// A that will return the list of deserialized values when the operation has completed. + public abstract Task>> GetBulkStateAsync(string storeName, IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default); + /// /// Saves a list of to the Dapr state store. /// diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index 75df09323..c370981a9 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -640,6 +640,55 @@ public override async Task> GetBulkStateAsync(strin return bulkResponse; } + /// + public override async Task>> GetBulkStateAsync(string storeName, + IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); + if (keys.Count == 0) + throw new ArgumentException("keys do not contain any elements"); + + var envelope = new Autogenerated.GetBulkStateRequest() + { + StoreName = storeName, Parallelism = parallelism ?? default + }; + + if (metadata != null) + { + foreach (var kvp in metadata) + { + envelope.Metadata.Add(kvp.Key, kvp.Value); + } + } + + envelope.Keys.AddRange(keys); + + var options = CreateCallOptions(headers: null, cancellationToken); + Autogenerated.GetBulkStateResponse response; + + try + { + response = await client.GetBulkStateAsync(envelope, options); + } + catch (RpcException ex) + { + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", + ex); + } + + var bulkResponse = new List>(); + foreach (var item in response.Items) + { + var deserializedValue = + TypeConverters.FromJsonByteString(item.Data, this.JsonSerializerOptions); + bulkResponse.Add(new BulkStateItem(item.Key, deserializedValue, item.Etag)); + } + + return bulkResponse; + } + /// public override async Task GetStateAsync( string storeName, diff --git a/test/Dapr.Client.Test/StateApiTest.cs b/test/Dapr.Client.Test/StateApiTest.cs index 90c06e6b1..12eee9a66 100644 --- a/test/Dapr.Client.Test/StateApiTest.cs +++ b/test/Dapr.Client.Test/StateApiTest.cs @@ -75,6 +75,30 @@ public async Task GetBulkStateAsync_CanReadState() state.Should().HaveCount(1); } + [Fact] + public async Task GetBulkStateAsync_CanReadDeserializedState() + { + await using var client = TestClient.CreateForDaprClient(); + + var key = "test"; + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + return await daprClient.GetBulkStateAsync("testStore", new List() {key}, null); + }); + + // Create Response & Respond + const string size = "small"; + const string color = "yellow"; + var data = new Widget() {Size = size, Color = color}; + var envelope = MakeGetBulkStateResponse(key, data); + var state = await request.CompleteWithMessageAsync(envelope); + + // Get response and validate + state.Should().HaveCount(1); + state[0].Value.Size.Should().Match(size); + state[0].Value.Color.Should().Match(color); + } + [Fact] public async Task GetBulkStateAsync_WrapsRpcException() { From 213aa7fca1ce7455cfe913f61cc2ec194ab8904a Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Mon, 23 Oct 2023 16:02:15 -0500 Subject: [PATCH 2/6] Updated method summary to better direct user towards one method or the other (typed or not) Signed-off-by: Whit Waldo --- src/Dapr.Client/DaprClient.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index fbe43db72..6fe225e40 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -715,7 +715,10 @@ public abstract Task InvokeMethodGrpcAsync( public abstract Task> GetBulkStateAsync(string storeName, IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default); /// - /// Gets a list of deserialized values associated with the from the Dapr state store. + /// Gets a list of deserialized values associated with the from the Dapr state store. This overload should be used + /// if you expect the values of all the retrieved items to match the shape of the indicated . If you expect that + /// the values may differ in type from one another, do not specify the type parameter and instead use the original method + /// so the serialized string values will be returned instead. /// /// The name of state store to read from. /// The list of keys to get values for. From ce6ce113a22361b281617d70c101924180759455 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Mon, 23 Oct 2023 16:04:04 -0500 Subject: [PATCH 3/6] Added comments to the typed BulkStateItem to better reflect the deserialized nature of the value. Signed-off-by: Whit Waldo --- src/Dapr.Client/BulkStateItem.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Dapr.Client/BulkStateItem.cs b/src/Dapr.Client/BulkStateItem.cs index 6e7bf7923..503e6877a 100644 --- a/src/Dapr.Client/BulkStateItem.cs +++ b/src/Dapr.Client/BulkStateItem.cs @@ -51,7 +51,8 @@ public BulkStateItem(string key, string value, string etag) } /// - /// Represents a state object returned from a bulk get state operation. + /// Represents a state object returned from a bulk get state operation where the value has + /// been deserialized to the specified type. /// public readonly struct BulkStateItem { @@ -59,7 +60,7 @@ public readonly struct BulkStateItem /// Initializes a new instance of the class. /// /// The state key. - /// The value. + /// The typed value. /// The ETag. /// /// Application code should not need to create instances of . @@ -77,7 +78,7 @@ public BulkStateItem(string key, TValue value, string etag) public string Key { get; } /// - /// Gets the value. + /// Gets the deserialized value of the indicated type. /// public TValue Value { get; } From 2898481465933c9d402c8a66255edacc571dfa1e Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Mon, 23 Oct 2023 16:24:08 -0500 Subject: [PATCH 4/6] Refactored GetBulkStateAsync method to a shared private method so both the non-generic and generic public methods can deserialize the data once as necessary. Signed-off-by: Whit Waldo --- src/Dapr.Client/DaprClientGrpc.cs | 63 +++++++++++++------------------ 1 file changed, 27 insertions(+), 36 deletions(-) diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index c370981a9..d86a0605e 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -597,51 +597,44 @@ public override async Task InvokeMethodGrpcAsync #region State Apis + /// public override async Task> GetBulkStateAsync(string storeName, IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { - ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); - if (keys.Count == 0) - throw new ArgumentException("keys do not contain any elements"); + var rawBulkState = await GetBulkStateRawAsync(storeName, keys, parallelism, metadata, cancellationToken); - var envelope = new Autogenerated.GetBulkStateRequest() - { - StoreName = storeName, - Parallelism = parallelism ?? default - }; - - if (metadata != null) + var bulkResponse = new List(); + foreach (var item in rawBulkState) { - foreach (var kvp in metadata) - { - envelope.Metadata.Add(kvp.Key, kvp.Value); - } + bulkResponse.Add(new BulkStateItem(item.Key, item.Value.ToStringUtf8(), item.Etag)); } - envelope.Keys.AddRange(keys); - - var options = CreateCallOptions(headers: null, cancellationToken); - Autogenerated.GetBulkStateResponse response; - - try - { - response = await client.GetBulkStateAsync(envelope, options); - } - catch (RpcException ex) - { - throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); - } + return bulkResponse; + } + + /// + public override async Task>> GetBulkStateAsync(string storeName, + IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) + { + var rawBulkState = await GetBulkStateRawAsync(storeName, keys, parallelism, metadata, cancellationToken); - var bulkResponse = new List(); - foreach (var item in response.Items) + var bulkResponse = new List>(); + foreach (var item in rawBulkState) { - bulkResponse.Add(new BulkStateItem(item.Key, item.Data.ToStringUtf8(), item.Etag)); + var deserializedValue = + TypeConverters.FromJsonByteString(item.Value, this.JsonSerializerOptions); + bulkResponse.Add(new BulkStateItem(item.Key, deserializedValue, item.Etag)); } return bulkResponse; } - /// - public override async Task>> GetBulkStateAsync(string storeName, + /// + /// Retrieves the bulk state data, but rather than deserializing the values, leaves the specific handling + /// to the public callers of this method to avoid duplicate deserialization. + /// + private async Task> GetBulkStateRawAsync( + string storeName, IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { @@ -678,12 +671,10 @@ public override async Task>> GetBulkStateAsy ex); } - var bulkResponse = new List>(); + var bulkResponse = new List<(string Key, string Etag, ByteString Value)>(); foreach (var item in response.Items) { - var deserializedValue = - TypeConverters.FromJsonByteString(item.Data, this.JsonSerializerOptions); - bulkResponse.Add(new BulkStateItem(item.Key, deserializedValue, item.Etag)); + bulkResponse.Add((item.Key, item.Etag, item.Data)); } return bulkResponse; From ce003269130e9001cfde12de4fa3f533376cf88b Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Mon, 23 Oct 2023 16:26:10 -0500 Subject: [PATCH 5/6] Removed excessive space in comment. Signed-off-by: Whit Waldo --- src/Dapr.Client/BulkStateItem.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Dapr.Client/BulkStateItem.cs b/src/Dapr.Client/BulkStateItem.cs index 503e6877a..5b30ddf21 100644 --- a/src/Dapr.Client/BulkStateItem.cs +++ b/src/Dapr.Client/BulkStateItem.cs @@ -78,7 +78,7 @@ public BulkStateItem(string key, TValue value, string etag) public string Key { get; } /// - /// Gets the deserialized value of the indicated type. + /// Gets the deserialized value of the indicated type. /// public TValue Value { get; } From 003c0ca11457affb7e9f081ccffcf969d0348743 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Wed, 10 Jan 2024 16:00:50 -0600 Subject: [PATCH 6/6] Formatting: If we're separating parameters to separate lines, convention requires each have their own line. Signed-off-by: Whit Waldo --- src/Dapr.Client/DaprClientGrpc.cs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index d86a0605e..4b720485a 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -612,8 +612,11 @@ public override async Task> GetBulkStateAsync(strin } /// - public override async Task>> GetBulkStateAsync(string storeName, - IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, + public override async Task>> GetBulkStateAsync( + string storeName, + IReadOnlyList keys, + int? parallelism, + IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { var rawBulkState = await GetBulkStateRawAsync(storeName, keys, parallelism, metadata, cancellationToken); @@ -621,8 +624,7 @@ public override async Task>> GetBulkStateAsy var bulkResponse = new List>(); foreach (var item in rawBulkState) { - var deserializedValue = - TypeConverters.FromJsonByteString(item.Value, this.JsonSerializerOptions); + var deserializedValue = TypeConverters.FromJsonByteString(item.Value, this.JsonSerializerOptions); bulkResponse.Add(new BulkStateItem(item.Key, deserializedValue, item.Etag)); } @@ -635,7 +637,9 @@ public override async Task>> GetBulkStateAsy /// private async Task> GetBulkStateRawAsync( string storeName, - IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, + IReadOnlyList keys, + int? parallelism, + IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); @@ -644,7 +648,8 @@ public override async Task>> GetBulkStateAsy var envelope = new Autogenerated.GetBulkStateRequest() { - StoreName = storeName, Parallelism = parallelism ?? default + StoreName = storeName, + Parallelism = parallelism ?? default }; if (metadata != null)