From 14166502e6a2d9ae65ff8424034fc8f823ae7fd8 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Mon, 14 Oct 2024 14:56:52 +0100 Subject: [PATCH] add memory performance profiler test rig, and run; oops, missed one! (#342) --- docs/releasenotes.md | 6 +- .../Internal/Reshape.ByteStream.cs | 1 + toys/PlayClient/MyContracts.cs | 15 ++++- toys/PlayClient/Program.cs | 63 ++++++++++++++++++- toys/PlayServer/Program.cs | 49 +++++++++++++++ 5 files changed, 130 insertions(+), 4 deletions(-) diff --git a/docs/releasenotes.md b/docs/releasenotes.md index eb1ba07b..c261e5e4 100644 --- a/docs/releasenotes.md +++ b/docs/releasenotes.md @@ -2,7 +2,11 @@ ## unreleased -## 1.2.0 +## 1.2.2 + +- fix missing memory recycling in `Stream` scenario + +## 1.2.1 - support `[Value]Task` as a return value, rewriting via [`stream BytesValue`](https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/wrappers.proto) - first step in [#340](https://github.com/protobuf-net/protobuf-net.Grpc/issues/340) diff --git a/src/protobuf-net.Grpc/Internal/Reshape.ByteStream.cs b/src/protobuf-net.Grpc/Internal/Reshape.ByteStream.cs index 36ea33fd..0b6772ce 100644 --- a/src/protobuf-net.Grpc/Internal/Reshape.ByteStream.cs +++ b/src/protobuf-net.Grpc/Internal/Reshape.ByteStream.cs @@ -105,6 +105,7 @@ async static Task ReadByteValueSequenceToPipeWriter(AsyncServerStreamingCall TestAsync(IAsyncEnumerable request, CallContext options); + + ValueTask TestStreamAsync(TestStreamRequest request, CallContext options = default); + } + + [ProtoContract] + public class TestStreamRequest + { + [ProtoMember(1)] + public int Seed { get; set; } + + [ProtoMember(2)] + public long Length { get; set; } } } diff --git a/toys/PlayClient/Program.cs b/toys/PlayClient/Program.cs index 783b8c38..22ff86ea 100644 --- a/toys/PlayClient/Program.cs +++ b/toys/PlayClient/Program.cs @@ -16,10 +16,15 @@ static class Program { static async Task Main() { - await TestChannel(); + while (true) + { + long length = 1000000L; + int seed = 12345; + await TestStreamUnmanagedAsync(length, seed); #if HTTPCLIENT - await TestHttpClient(); + await TestStreamManagedAsync(length, seed); #endif + } } static async Task TestCalculator(ICalculator calculator, [CallerMemberName] string? caller = null) @@ -81,6 +86,60 @@ static async IAsyncEnumerable Rand(int count, TimeSpan delay, [ Console.WriteLine("[client all done sending!]"); } + +#if HTTPCLIENT + static async Task TestStreamManagedAsync(long length, int seed) + { + GrpcClientFactory.AllowUnencryptedHttp2 = true; + using var http = Grpc.Net.Client.GrpcChannel.ForAddress("http://localhost:10042"); + await TestStreamAsync(http, length, seed); + } +#endif + + static async Task TestStreamUnmanagedAsync(long length, int seed) + { + var channel = new Grpc.Core.Channel("localhost", 10042, ChannelCredentials.Insecure); + try + { + await TestStreamAsync(channel, length, seed); + } + finally + { + await channel.ShutdownAsync(); + } + } + static async Task TestStreamAsync(ChannelBase channel, long length, int seed) + { + var random = new Random(seed); + Console.WriteLine("Creating proxy..."); + var proxy = channel.CreateGrpcService(); + using var stream = await proxy.TestStreamAsync(new TestStreamRequest { Length = length, Seed = seed }); + + byte[] buffer = new byte[1024]; + long totalRead = 0; + int read; + Console.WriteLine("Initializing..."); + while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0) + { + totalRead += Check(random, new ReadOnlySpan(buffer, 0, read)); + Console.WriteLine($"Bytes communicated: {totalRead}"); + } + if (totalRead != length) + { + Throw(); + } + + static int Check(Random random, ReadOnlySpan buffer) + { + foreach (byte b in buffer) + { + if (b != random.Next(256)) Throw(); + } + return buffer.Length; + } + static void Throw() => throw new InvalidOperationException("data fail"); + } + static async Task TestChannel() { var channel = new Channel("localhost", 10042, ChannelCredentials.Insecure); diff --git a/toys/PlayServer/Program.cs b/toys/PlayServer/Program.cs index 89511981..fc2c05e2 100644 --- a/toys/PlayServer/Program.cs +++ b/toys/PlayServer/Program.cs @@ -4,6 +4,8 @@ using Shared_CS; using System; using System.Collections.Generic; +using System.IO; +using System.IO.Pipelines; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -95,4 +97,51 @@ public async IAsyncEnumerable TestAsync(IAsyncEnumerable< //static bool Always() => true; yield break; } + + public ValueTask TestStreamAsync(TestStreamRequest request, CallContext options = default) + { + Console.WriteLine("Creating pipe..."); + var pipe = new Pipe(); + _ = Task.Run(async () => + { + Exception? ex = null; + try + { + Console.WriteLine($"Starting stream of length {request.Length}..."); + long remaining = request.Length; + var rand = new Random(request.Seed); + byte[] buffer = new byte[4096]; + + while (remaining > 0) + { + int chunkLen = (int)Math.Min(remaining, buffer.Length); + var chunk = new Memory(buffer, 0, chunkLen); + Console.WriteLine($"Sending {chunkLen}..."); + Fill(rand, chunk.Span); + await pipe.Writer.WriteAsync(chunk, options.CancellationToken); + remaining -= chunkLen; + } + } + catch (Exception fault) + { + Console.WriteLine("Fault: " + fault.Message); + ex = fault; + } + finally + { + Console.WriteLine("Completing..."); + await pipe.Writer.CompleteAsync(ex); + } + + }); + return new(pipe.Reader.AsStream()); + + static void Fill(Random rand, Span buffer) + { + foreach (ref byte b in buffer) + { + b = (byte)rand.Next(0, 256); + } + } + } } \ No newline at end of file