Skip to content

Commit

Permalink
Support IObservable input/output (#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgravell authored Apr 13, 2022
1 parent 7e93f20 commit 530cd4c
Show file tree
Hide file tree
Showing 10 changed files with 1,023 additions and 245 deletions.
2 changes: 1 addition & 1 deletion src/protobuf-net.Grpc/Configuration/ServerBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public int Bind(object state, Type serviceType, BinderConfiguration? binderConfi
var bindCtx = new ServiceBindContext(serviceContract, serviceType, state, binderConfiguration.Binder);
foreach (var op in ContractOperation.FindOperations(binderConfiguration, serviceContract, this))
{
if (ServerInvokerLookup.TryGetValue(op.MethodType, op.Context, op.Result, op.Void, out var invoker)
if (ServerInvokerLookup.TryGetValue(op.MethodType, op.Context, op.Arg, op.Result, op.Void, out var invoker)
&& AddMethod(op.From, op.To, op.Name, op.Method, op.MethodType, invoker, bindCtx,
serviceContractSimplifiedExceptions || op.Method.IsDefined(typeof(SimpleRpcExceptionsAttribute))
))
Expand Down
223 changes: 132 additions & 91 deletions src/protobuf-net.Grpc/Internal/ContractOperation.cs

Large diffs are not rendered by default.

16 changes: 11 additions & 5 deletions src/protobuf-net.Grpc/Internal/MetadataContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ internal void SetTrailers(RpcException fault)
}
}

internal void SetTrailers<T>(T call, Func<T, Status> getStatus, Func<T, Metadata> getMetadata)
internal void SetTrailers<T>(T? call, Func<T, Status> getStatus, Func<T, Metadata> getMetadata)
where T : class
{
if (call is null) return;
try
{
_trailers = getMetadata(call) ?? Metadata.Empty;
Expand All @@ -88,10 +89,15 @@ internal void SetTrailers<T>(T call, Func<T, Status> getStatus, Func<T, Metadata
}
}

internal ValueTask SetHeadersAsync(Task<Metadata> headers)
internal ValueTask SetHeadersAsync(Task<Metadata>? headers)
{
if (headers is null) return default;
var tcs = Interlocked.CompareExchange(ref _headersTaskOrSource, headers, null) as TaskCompletionSource<Metadata>;
if (headers.RanToCompletion())
if (tcs is null)
{
return new ValueTask(headers);
}
else if (headers.RanToCompletion())
{
// headers are sync; update TCS if one
tcs?.TrySetResult(headers.Result);
Expand All @@ -103,11 +109,11 @@ internal ValueTask SetHeadersAsync(Task<Metadata> headers)
return Awaited(this, tcs, headers);
}

static async ValueTask Awaited(MetadataContext context, TaskCompletionSource<Metadata>? tcs, Task<Metadata> headers)
static async ValueTask Awaited(MetadataContext context, TaskCompletionSource<Metadata> tcs, Task<Metadata> headers)
{
try
{
tcs?.TrySetResult(await headers.ConfigureAwait(false));
tcs.TrySetResult(await headers.ConfigureAwait(false));
}
catch (RpcException fault)
{
Expand Down
Loading

0 comments on commit 530cd4c

Please sign in to comment.