Skip to content

Commit

Permalink
Implement Stream api support (#341)
Browse files Browse the repository at this point in the history
* client-to-server spike logic; not tested

* server bindings (and binding tests, no integration test yet)

* most basic of basic integration tests

* optimize TryFastParse; needs tests

* verify behaviour in all expected scenarios

* for compat: don't demand the trailer

Signed-off-by: Marc Gravell <[email protected]>

* nit

* marshaller validation

* release notes

* fix StreamRewriteBasicTest (timing brittleness)

---------

Signed-off-by: Marc Gravell <[email protected]>
  • Loading branch information
mgravell authored Oct 14, 2024
1 parent bd09eed commit d90ecd4
Show file tree
Hide file tree
Showing 29 changed files with 1,749 additions and 120 deletions.
7 changes: 5 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
<PackageVersion Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<PackageVersion Include="Nerdbank.GitVersioning" Version="3.6.143" />
<PackageVersion Include="System.IO.Pipelines" Version="8.0.0" />
<PackageVersion Include="System.Memory" Version="4.5.5" />
<PackageVersion Include="System.Reactive" Version="6.0.1" />
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
<PackageVersion Include="System.ServiceModel.Primitives" Version="8.0.0" />
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.0.0" />
<PackageVersion Include="System.ServiceModel.Primitives" Version="8.0.0" />
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
<PackageVersion Include="System.ValueTuple" Version="4.5.0" />
<PackageVersion Include="xunit" Version="2.9.2" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
<PackageVersion Include="TaskBuilder.fs" Version="2.1.0" />
Expand Down
6 changes: 6 additions & 0 deletions docs/releasenotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

## unreleased

## 1.2.0

- support `[Value]Task<Stream>` as a return value, rewriting via [`stream BytesValue`](https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/wrappers.proto) - first
step in [#340](https://github.com/protobuf-net/protobuf-net.Grpc/issues/340)
- update library references and TFMs
- improve handling of `IAsyncDisposable`
- improve error message when binding methods ([#331](https://github.com/protobuf-net/protobuf-net.Grpc/pull/331) via BasConijn)

## 1.1.1

Expand Down
4 changes: 2 additions & 2 deletions examples/pb-net/JustProtos/SomeType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ static void Foo()
{
// the point here being: these types *exist*, despite
// not appearing as local .cs files
Type[] types = {
Type[] types = [
typeof(DescriptorProto),
typeof(TimeResult),
typeof(MultiplyRequest),
};
];
_ = types;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/protobuf-net.Grpc.Reflection/SchemaGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public string GetSchema<TService>()
/// this method need to remain for backward compatibility for client which will get this updated version, without recompilation.
/// Thus, this method mustn't be deleted.</remarks>
public string GetSchema(Type contractType)
=> GetSchema(new [] {contractType});
=> GetSchema([contractType]);

/// <summary>
/// Get the .proto schema associated with multiple service contracts
Expand Down
2 changes: 1 addition & 1 deletion src/protobuf-net.Grpc/Configuration/BinderConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace ProtoBuf.Grpc.Configuration
public sealed class BinderConfiguration
{
// this *must* stay above Default - .cctor order is file order
static readonly MarshallerFactory[] s_defaultFactories = new MarshallerFactory[] { ProtoBufMarshallerFactory.Default, ProtoBufMarshallerFactory.GoogleProtobuf };
static readonly MarshallerFactory[] s_defaultFactories = [ProtoBufMarshallerFactory.Default, ProtoBufMarshallerFactory.GoogleProtobuf];

/// <summary>
/// Use the default MarshallerFactory and ServiceBinder
Expand Down
13 changes: 4 additions & 9 deletions src/protobuf-net.Grpc/Configuration/ClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,17 @@ public virtual GrpcClient CreateClient(CallInvoker channel, Type contractType)
=> new GrpcClient(channel, contractType, BinderConfiguration);


private sealed class ConfiguredClientFactory : ClientFactory
private sealed class ConfiguredClientFactory(BinderConfiguration? binderConfiguration) : ClientFactory
{
protected override BinderConfiguration BinderConfiguration { get; }

public ConfiguredClientFactory(BinderConfiguration? binderConfiguration)
{
BinderConfiguration = binderConfiguration ?? BinderConfiguration.Default;
}
protected override BinderConfiguration BinderConfiguration { get; } = binderConfiguration ?? BinderConfiguration.Default;

private readonly ConcurrentDictionary<Type, object> _proxyCache = new ConcurrentDictionary<Type, object>();

[MethodImpl(MethodImplOptions.NoInlining)]
private TService SlowCreateClient<TService>(CallInvoker channel)
where TService : class
{
var factory = ProxyEmitter.CreateFactory<TService>(BinderConfiguration);
var factory = ProxyEmitter.CreateFactory<TService>(BinderConfiguration, null);
var key = typeof(TService);

if (!_proxyCache.TryAdd(key, factory)) factory = (Func<CallInvoker, TService>)_proxyCache[key];
Expand All @@ -78,7 +73,7 @@ public override TService CreateClient<TService>(CallInvoker channel)

internal static class DefaultProxyCache<TService> where TService : class
{
internal static readonly Func<CallInvoker, TService> Create = ProxyEmitter.CreateFactory<TService>(BinderConfiguration.Default);
internal static readonly Func<CallInvoker, TService> Create = ProxyEmitter.CreateFactory<TService>(BinderConfiguration.Default, null);
}

private sealed class DefaultClientFactory : ClientFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ protected internal override Marshaller<T> CreateMarshaller<T>()
parser.ParseFrom(context.PayloadAsReadOnlySequence()
*/
var context = Expression.Parameter(typeof(global::Grpc.Core.DeserializationContext), "context");
var parseFrom = parser.PropertyType.GetMethod("ParseFrom", new Type[] { typeof(ReadOnlySequence<byte>) })!;
var parseFrom = parser.PropertyType.GetMethod("ParseFrom", [typeof(ReadOnlySequence<byte>)])!;
Expression body = Expression.Call(Expression.Constant(parser.GetValue(null), parser.PropertyType),
parseFrom, Expression.Call(context, nameof(DeserializationContext.PayloadAsReadOnlySequence), Type.EmptyTypes));
deserializer = Expression.Lambda<Func<DeserializationContext, T>>(body, context).Compile();

var message = Expression.Parameter(typeof(T), "message");
context = Expression.Parameter(typeof(global::Grpc.Core.SerializationContext), "context");
var setPayloadLength = typeof(global::Grpc.Core.SerializationContext).GetMethod(nameof(global::Grpc.Core.SerializationContext.SetPayloadLength), new Type[] { typeof(int) })!;
var setPayloadLength = typeof(global::Grpc.Core.SerializationContext).GetMethod(nameof(global::Grpc.Core.SerializationContext.SetPayloadLength), [typeof(int)])!;
var calculateSize = iMessage.GetMethod("CalculateSize", Type.EmptyTypes)!;
var writeTo = me.GetMethod("WriteTo", new Type[] { iMessage, typeof(IBufferWriter<byte>) })!;
var writeTo = me.GetMethod("WriteTo", [iMessage, typeof(IBufferWriter<byte>)])!;
body = Expression.Block(
Expression.Call(context, setPayloadLength, Expression.Call(message, calculateSize)),
Expression.Call(writeTo, message, Expression.Call(context, "GetBufferWriter", Type.EmptyTypes)),
Expand All @@ -92,16 +92,16 @@ protected internal override Marshaller<T> CreateMarshaller<T>()
*/

var context = Expression.Parameter(typeof(global::Grpc.Core.DeserializationContext), "context");
var parseFrom = parser.PropertyType.GetMethod("ParseFrom", new Type[] { typeof(byte[]) })!;
var parseFrom = parser.PropertyType.GetMethod("ParseFrom", [typeof(byte[])])!;
Expression body = Expression.Call(Expression.Constant(parser.GetValue(null), parser.PropertyType),
parseFrom, Expression.Call(context, nameof(DeserializationContext.PayloadAsNewBuffer), Type.EmptyTypes));
deserializer = Expression.Lambda<Func<DeserializationContext, T>>(body, context).Compile();

var message = Expression.Parameter(typeof(T), "message");
context = Expression.Parameter(typeof(global::Grpc.Core.SerializationContext), "context");
var toByteArray = me.GetMethod("ToByteArray", new Type[] { iMessage })!;
var toByteArray = me.GetMethod("ToByteArray", [iMessage])!;
var complete = typeof(global::Grpc.Core.SerializationContext).GetMethod(
nameof(global::Grpc.Core.SerializationContext.Complete), new Type[] { typeof(byte[]) })!;
nameof(global::Grpc.Core.SerializationContext.Complete), [typeof(byte[])])!;
body = Expression.Call(context, complete, Expression.Call(toByteArray, message));
serializer = Expression.Lambda<Action<T, global::Grpc.Core.SerializationContext>>(body, message, context).Compile();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ public enum Options
// note: these are the same *object*, but pre-checked for optional API support, for efficiency
// (the minimum .NET object size means that the extra fields don't cost anything)
private readonly IMeasuredProtoOutput<IBufferWriter<byte>>? _measuredWriterModel;
#pragma warning disable CA1859 // change type of field for performance - but actually this is a speculative test
private readonly IProtoInput<ReadOnlySequence<byte>>? _squenceReaderModel;
#pragma warning restore CA1859

/// <summary>
/// Create a new factory using a specific protobuf-net model
Expand Down
4 changes: 2 additions & 2 deletions src/protobuf-net.Grpc/Configuration/ServerBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public int Bind(object state, Type serviceType, BinderConfiguration? binderConfi
{
int totalCount = 0;
object?[]? argsBuffer = null;
Type[] typesBuffer = Array.Empty<Type>();
Type[] typesBuffer = [];
binderConfiguration ??= BinderConfiguration.Default;
var potentialServiceContracts = typeof(IGrpcService).IsAssignableFrom(serviceType)
? new HashSet<Type> {serviceType}
Expand Down Expand Up @@ -92,7 +92,7 @@ bool AddMethod(string? serviceName, Type @in, Type @out, string on, MethodInfo m
{
if (typesBuffer.Length == 0)
{
typesBuffer = new Type[] {serviceType, typeof(void), typeof(void)};
typesBuffer = [serviceType, typeof(void), typeof(void)];
}

typesBuffer[1] = @in;
Expand Down
Loading

0 comments on commit d90ecd4

Please sign in to comment.