Skip to content

Commit

Permalink
fix(tsc): fix record body bug in tracing (#538)
Browse files Browse the repository at this point in the history
* fix: fix large stream async read error ;
     add trace body max size , if body size large than num , will write body to log

* chore: update

* chore: update param name

* chore: update

* chore: update

* fix: add white space repalce, update code

* chore: update

* chore: update

* chore: update

* chore: update

* chore: update
  • Loading branch information
Qinyouzeng authored Apr 7, 2023
1 parent a3873a9 commit ad165fb
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,58 @@
// Licensed under the Apache License. See LICENSE.txt in the project root for license information.

// ReSharper disable once CheckNamespace

namespace System.IO;

internal static class StreamExtensions
{
private static readonly Encoding _defaultEncoding = Encoding.UTF8;

public static async Task<string?> ReadAsStringAsync(this Stream stream, Encoding? encoding = null,int bufferSize=1024)
public static async Task<(long, string?)> ReadAsStringAsync(this Stream stream, Encoding? encoding = null, int bufferSize = 4096)
{
if (stream == null)
return null;

if (!stream.CanRead)
return "cann't read";
if (stream == null || !stream.CanRead || !stream.CanSeek)
return (-1, null);

if (!stream.CanSeek)
return "cann't seek";
var start = stream.Position;

var start = (int)stream.Position;
List<byte> data = new();
var buffer = new byte[bufferSize];
do
try
{
var count = await stream.ReadAsync(buffer, 0, buffer.Length);
if (count <= 0)
break;
if (buffer.Length - count == 0)
List<byte> data = new();
var buffer = new byte[bufferSize];
stream.Seek(0, SeekOrigin.Begin);

do
{
var count = await stream.ReadAsync(buffer.AsMemory(0, bufferSize));
if (count <= 0)
break;

if (bufferSize - count > 0)
{
data.AddRange(buffer[0..count]);
break;
}

data.AddRange(buffer);
}
else
} while (true);

if (data.Count > 0)
{
data.AddRange(buffer[0..count]);
break;
}
} while (true);
if (data.Count - OpenTelemetryInstrumentationOptions.MaxBodySize > 0)
return (data.Count, Convert.ToBase64String(data.ToArray()));

if (data.Count > 0)
return (data.Count, (encoding ?? _defaultEncoding).GetString(data.ToArray()));
}
}
catch (Exception ex)
{
OpenTelemetryInstrumentationOptions.Logger?.LogError(ex, "ReadAsStringAsync Error");
}
finally
{
stream.Seek(start, SeekOrigin.Begin);
return (encoding ?? _defaultEncoding).GetString(data.ToArray());
}

return null;
return (-1, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace System.Diagnostics;

public static class ActivityExtension
{
public static async Task<Activity> AddMasaSupplement(this Activity activity, HttpRequest httpRequest)
public static Activity AddMasaSupplement(this Activity activity, HttpRequest httpRequest)
{
activity.SetTag(OpenTelemetryAttributeName.Http.FLAVOR, httpRequest.Protocol);
activity.SetTag(OpenTelemetryAttributeName.Http.SCHEME, httpRequest.Scheme);
Expand All @@ -16,26 +16,24 @@ public static async Task<Activity> AddMasaSupplement(this Activity activity, Htt
{
if (!httpRequest.Body.CanSeek)
httpRequest.EnableBuffering();
activity.SetTag(OpenTelemetryAttributeName.Http.REQUEST_CONTENT_BODY, await httpRequest.Body.ReadAsStringAsync(GetHttpRequestEncoding(httpRequest)));
SetActivityBody(activity, httpRequest.Body, GetHttpRequestEncoding(httpRequest)).ConfigureAwait(false).GetAwaiter().GetResult();
}
activity.SetTag(OpenTelemetryAttributeName.Host.NAME, Dns.GetHostName());

return activity;
}

public static async Task<Activity> AddMasaSupplement(this Activity activity, HttpRequestMessage httpRequest)
public static Activity AddMasaSupplement(this Activity activity, HttpRequestMessage httpRequest)
{
activity.SetTag(OpenTelemetryAttributeName.Http.SCHEME, httpRequest.RequestUri?.Scheme);
activity.SetTag(OpenTelemetryAttributeName.Host.NAME, Dns.GetHostName());

if (httpRequest.Content is not null)
{
var st = await httpRequest.Content.ReadAsStreamAsync();
activity.SetTag(OpenTelemetryAttributeName.Http.REQUEST_CONTENT_BODY, await st.ReadAsStringAsync(GetHttpRequestMessageEncoding(httpRequest)));
SetActivityBody(activity, httpRequest.Content.ReadAsStream(), GetHttpRequestMessageEncoding(httpRequest)).ConfigureAwait(false).GetAwaiter().GetResult();
}

return activity;
}
}

public static Activity AddMasaSupplement(this Activity activity, HttpResponse httpResponse)
{
Expand Down Expand Up @@ -87,4 +85,20 @@ public static Activity AddMasaSupplement(this Activity activity, HttpResponseMes

return null;
}

private static async Task SetActivityBody(Activity activity, Stream inputSteam, Encoding? encoding = null)
{
(long length, string? body) = await inputSteam.ReadAsStringAsync(encoding);

if (length <= 0)
return;
if (length - OpenTelemetryInstrumentationOptions.MaxBodySize > 0)
{
OpenTelemetryInstrumentationOptions.Logger?.LogInformation("Request body in base64 encode: {Body}", body);
}
else
{
activity.SetTag(OpenTelemetryAttributeName.Http.REQUEST_CONTENT_BODY, body);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ namespace Masa.Contrib.StackSdks.Tsc.Tracing.Handler;

public class AspNetCoreInstrumentationHandler : ExceptionHandler
{
public virtual async void OnHttpRequest(Activity activity, HttpRequest httpRequest)
public virtual void OnHttpRequest(Activity activity, HttpRequest httpRequest)
{
await activity.AddMasaSupplement(httpRequest);
activity.AddMasaSupplement(httpRequest);
HttpMetricProviders.AddHttpRequestMetric(httpRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ namespace Masa.Contrib.StackSdks.Tsc.Tracing.Handler;

public class HttpClientInstrumentHandler : ExceptionHandler
{
public virtual async void OnHttpRequestMessage(Activity activity, HttpRequestMessage httpRequestMessage)
public virtual void OnHttpRequestMessage(Activity activity, HttpRequestMessage httpRequestMessage)
{
await activity.AddMasaSupplement(httpRequestMessage);
activity.AddMasaSupplement(httpRequestMessage);
HttpMetricProviders.AddHttpRequestMessageMetric(httpRequestMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public static IServiceCollection AddMasaTracing(this IServiceCollection services
services.AddOpenTelemetry().WithTracing(builder =>
{
builder.SetSampler(new AlwaysOnSampler());
var option = new OpenTelemetryInstrumentationOptions();
var option = new OpenTelemetryInstrumentationOptions(services.BuildServiceProvider());
if (configure != null)
configure.Invoke(option);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@ namespace Microsoft.Extensions.DependencyInjection;

public class OpenTelemetryInstrumentationOptions
{
public OpenTelemetryInstrumentationOptions(IServiceProvider serviceProvider)
{
if (Logger != null)
Logger = serviceProvider.GetRequiredService<ILogger<OpenTelemetryInstrumentationOptions>>();
}

private readonly static AspNetCoreInstrumentationHandler aspNetCoreInstrumentationHandler = new();
private readonly static HttpClientInstrumentHandler httpClientInstrumentHandler = new();
internal static ILogger Logger { get; private set; }
internal static long MaxBodySize { get; private set; } = 200 * 1 << 10;

/// <summary>
/// Default record all data. You can replace it or set null
Expand Down Expand Up @@ -51,4 +59,37 @@ public class OpenTelemetryInstrumentationOptions
/// Build trace callback, allow to supplement the build process
/// </summary>
public Action<TracerProviderBuilder> BuildTraceCallback { get; set; }

public static void SetMaxBodySize(string maxValue)
{
var regex = new Regex(@"\s+", RegexOptions.None, TimeSpan.FromSeconds(1));
if (maxValue is not null)
maxValue = regex.Replace(maxValue, "");

if (string.IsNullOrEmpty(maxValue))
return;
var unit = maxValue[^1];
var isNum = int.TryParse(maxValue[..(maxValue.Length - 1)], out int num);
if (!isNum || num <= 0) return;
switch (unit)
{
case 'k':
case 'K':
MaxBodySize = num * 1 << 10;
break;
case 'm':
case 'M':
MaxBodySize = num * 1 << 20;
break;
default:
MaxBodySize = num;
break;
}
}

public static void SetMaxBodySize(long maxByteValue)
{
if (maxByteValue > 0)
MaxBodySize = maxByteValue;
}
}
2 changes: 2 additions & 0 deletions src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc/_Imports.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
global using Masa.Contrib.StackSdks.Tsc.Tracing.Handler;
global using Microsoft.AspNetCore.Http;
global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Logging;
global using OpenTelemetry;
global using OpenTelemetry.Contrib.Instrumentation.ElasticsearchClient;
Expand All @@ -34,3 +35,4 @@
global using System.Runtime.CompilerServices;
global using System.Text;
global using System.Text.Json;
global using System.Text.RegularExpressions;
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public async Task EncodingTest(string text, string? charCode = default)

using var ms = new MemoryStream(bytes);

var str = await ms.ReadAsStringAsync(encoding: coding, bufferSize: 16);
(long _, string? str) = await ms.ReadAsStringAsync(encoding: coding, bufferSize: 16);

Debug.WriteLine(str);

if (string.IsNullOrEmpty(text))
Assert.IsNull(str);
Expand All @@ -43,7 +45,7 @@ public async Task EncodingTest(string text, string? charCode = default)
[TestMethod]
public async Task NullStreamReadTest()
{
var text = await default(Stream)!.ReadAsStringAsync();
(long _, string? text) = await default(Stream)!.ReadAsStringAsync();
Assert.IsNull(text);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public void Initialize()
}

[TestMethod]
public async Task HttpRequestMessageAddTagsTest()
public void HttpRequestMessageAddTagsTest()
{
HttpRequestMessage request = new();
request.Method = HttpMethod.Post;
Expand All @@ -22,7 +22,7 @@ public async Task HttpRequestMessageAddTagsTest()
request.RequestUri = new Uri("http://localhost");

var activity = new Activity("tets");
await activity.AddMasaSupplement(request);
activity.AddMasaSupplement(request);
Assert.AreEqual(body, activity.GetTagItem(OpenTelemetryAttributeName.Http.REQUEST_CONTENT_BODY) as string);
}

Expand All @@ -31,7 +31,7 @@ public void HttpResponseMessageAddTagsTest()
{
HttpResponseMessage response = new()
{
StatusCode = System.Net.HttpStatusCode.OK,
StatusCode = HttpStatusCode.OK,
Content = new StringContent("OK")
};

Expand All @@ -41,7 +41,7 @@ public void HttpResponseMessageAddTagsTest()
}

[TestMethod]
public async Task HttpRequestAddTagsTest()
public void HttpRequestAddTagsTest()
{
Mock<HttpRequest> mock = new();
mock.Setup(request => request.Method).Returns("post");
Expand All @@ -59,7 +59,7 @@ public async Task HttpRequestAddTagsTest()
mock.Setup(request => request.ContentLength).Returns(ms.Length);

var activity = new Activity("tets");
await activity.AddMasaSupplement(mock.Object);
activity.AddMasaSupplement(mock.Object);
Assert.IsNotNull(activity);
Assert.AreEqual("http", activity.GetTagItem(OpenTelemetryAttributeName.Http.SCHEME) as string);
Assert.AreEqual("http1.1", activity.GetTagItem(OpenTelemetryAttributeName.Http.FLAVOR) as string);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void MasaTraceTest()
if (uri != null)
options.Endpoint = uri;
});
builder.AddInMemoryExporter(logRecords);
builder.AddInMemoryExporter(logRecords);
isConfigureCalled = true;
});
});
Expand Down

0 comments on commit ad165fb

Please sign in to comment.