Skip to content

Commit

Permalink
add memory performance profiler test rig, and run; oops, missed one! (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mgravell authored Oct 14, 2024
1 parent d90ecd4 commit 1416650
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 4 deletions.
6 changes: 5 additions & 1 deletion docs/releasenotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

## unreleased

## 1.2.0
## 1.2.2

- fix missing memory recycling in `Stream` scenario

## 1.2.1

- support `[Value]Task<Stream>` as a return value, rewriting via [`stream BytesValue`](https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/wrappers.proto) - first
step in [#340](https://github.com/protobuf-net/protobuf-net.Grpc/issues/340)
Expand Down
1 change: 1 addition & 0 deletions src/protobuf-net.Grpc/Internal/Reshape.ByteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ async static Task ReadByteValueSequenceToPipeWriter(AsyncServerStreamingCall<Byt
var chunk = source.Current;
var result = await destination.WriteAsync(chunk.Memory, cancellationToken).ConfigureAwait(false);
actualLength += chunk.Length;
chunk.Recycle();

if (result.IsCanceled)
{
Expand Down
15 changes: 14 additions & 1 deletion toys/PlayClient/MyContracts.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using Grpc.Core;
using ProtoBuf;
using ProtoBuf.Grpc;
using ProtoBuf.Grpc.Configuration;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Threading.Tasks;
Expand Down Expand Up @@ -60,5 +61,17 @@ public class BidiStreamingResponse
public interface IBidiStreamingService
{
IAsyncEnumerable<BidiStreamingResponse> TestAsync(IAsyncEnumerable<BidiStreamingRequest> request, CallContext options);

ValueTask<Stream> TestStreamAsync(TestStreamRequest request, CallContext options = default);
}

[ProtoContract]
public class TestStreamRequest
{
[ProtoMember(1)]
public int Seed { get; set; }

[ProtoMember(2)]
public long Length { get; set; }
}
}
63 changes: 61 additions & 2 deletions toys/PlayClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ static class Program
{
static async Task Main()
{
await TestChannel();
while (true)
{
long length = 1000000L;
int seed = 12345;
await TestStreamUnmanagedAsync(length, seed);
#if HTTPCLIENT
await TestHttpClient();
await TestStreamManagedAsync(length, seed);
#endif
}
}

static async Task TestCalculator(ICalculator calculator, [CallerMemberName] string? caller = null)
Expand Down Expand Up @@ -81,6 +86,60 @@ static async IAsyncEnumerable<MultiplyRequest> Rand(int count, TimeSpan delay, [
Console.WriteLine("[client all done sending!]");
}


#if HTTPCLIENT
static async Task TestStreamManagedAsync(long length, int seed)
{
GrpcClientFactory.AllowUnencryptedHttp2 = true;
using var http = Grpc.Net.Client.GrpcChannel.ForAddress("http://localhost:10042");
await TestStreamAsync(http, length, seed);
}
#endif

static async Task TestStreamUnmanagedAsync(long length, int seed)
{
var channel = new Grpc.Core.Channel("localhost", 10042, ChannelCredentials.Insecure);
try
{
await TestStreamAsync(channel, length, seed);
}
finally
{
await channel.ShutdownAsync();
}
}
static async Task TestStreamAsync(ChannelBase channel, long length, int seed)
{
var random = new Random(seed);
Console.WriteLine("Creating proxy...");
var proxy = channel.CreateGrpcService<IBidiStreamingService>();
using var stream = await proxy.TestStreamAsync(new TestStreamRequest { Length = length, Seed = seed });

byte[] buffer = new byte[1024];
long totalRead = 0;
int read;
Console.WriteLine("Initializing...");
while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
totalRead += Check(random, new ReadOnlySpan<byte>(buffer, 0, read));
Console.WriteLine($"Bytes communicated: {totalRead}");
}
if (totalRead != length)
{
Throw();
}

static int Check(Random random, ReadOnlySpan<byte> buffer)
{
foreach (byte b in buffer)
{
if (b != random.Next(256)) Throw();
}
return buffer.Length;
}
static void Throw() => throw new InvalidOperationException("data fail");
}

static async Task TestChannel()
{
var channel = new Channel("localhost", 10042, ChannelCredentials.Insecure);
Expand Down
49 changes: 49 additions & 0 deletions toys/PlayServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using Shared_CS;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -95,4 +97,51 @@ public async IAsyncEnumerable<BidiStreamingResponse> TestAsync(IAsyncEnumerable<
//static bool Always() => true;
yield break;
}

public ValueTask<Stream> TestStreamAsync(TestStreamRequest request, CallContext options = default)
{
Console.WriteLine("Creating pipe...");
var pipe = new Pipe();
_ = Task.Run(async () =>
{
Exception? ex = null;
try
{
Console.WriteLine($"Starting stream of length {request.Length}...");
long remaining = request.Length;
var rand = new Random(request.Seed);
byte[] buffer = new byte[4096];

while (remaining > 0)
{
int chunkLen = (int)Math.Min(remaining, buffer.Length);
var chunk = new Memory<byte>(buffer, 0, chunkLen);
Console.WriteLine($"Sending {chunkLen}...");
Fill(rand, chunk.Span);
await pipe.Writer.WriteAsync(chunk, options.CancellationToken);
remaining -= chunkLen;
}
}
catch (Exception fault)
{
Console.WriteLine("Fault: " + fault.Message);
ex = fault;
}
finally
{
Console.WriteLine("Completing...");
await pipe.Writer.CompleteAsync(ex);
}

});
return new(pipe.Reader.AsStream());

static void Fill(Random rand, Span<byte> buffer)
{
foreach (ref byte b in buffer)
{
b = (byte)rand.Next(0, 256);
}
}
}
}

0 comments on commit 1416650

Please sign in to comment.