Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change tags for better data support #3

Merged
merged 7 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
builder.UseEnvironment("IntegrationTests");
builder.ConfigureServices(services =>
{
//services.AddApplicationInsightsTelemetry("YOUR-KEY-FROM-ADP-MESSAGING-TEAM");

Check warning on line 47 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

Remove this commented out code. (https://rules.sonarsource.com/csharp/RSPEC-125)
var sp = services.BuildServiceProvider();
var telemetryConfiguration = sp.GetService<TelemetryConfiguration>();
var tchan = new TestChannel(dataStore);
telemetryConfiguration.TelemetryChannel = tchan;

Check warning on line 51 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

Dereference of a possibly null reference.
services.AddSingleton<ITelemetryChannel>(tchan);
});
});
Expand All @@ -60,13 +60,13 @@
var client = _factory.CreateClient();
var result = await client.GetAsync("/");
result.IsSuccessStatusCode.ShouldBeTrue();
Thread.Sleep(5000);

Check warning on line 63 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

Prevent the use of Thread.Sleep (https://agoda-com.github.io/standards-c-sharp/async/avoid-blocking.html)

Check warning on line 63 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

Do not use 'Thread.Sleep()' in a test. (https://rules.sonarsource.com/csharp/RSPEC-2925)
dataStore.Count.ShouldBe(7);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).Type == "Kafka-Producer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyKind == "Kafka-Producer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyTypeName == "Kafka-Producer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).Type == "Kafka-Consumer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyKind == "Kafka-Consumer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyTypeName == "Kafka-Consumer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).Type == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Produce")).ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyKind == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Produce")).ShouldBe(3);

Check warning on line 66 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

'DependencyTelemetry.DependencyKind' is obsolete: 'Use Type'
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyTypeName == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Produce")).ShouldBe(3);

Check warning on line 67 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

'DependencyTelemetry.DependencyTypeName' is obsolete: 'Renamed to Type'
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).Type == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Consume")).ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyKind == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Consume")).ShouldBe(3);

Check warning on line 69 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

'DependencyTelemetry.DependencyKind' is obsolete: 'Use Type'
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyTypeName == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Consume")).ShouldBe(3);

Check warning on line 70 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

'DependencyTelemetry.DependencyTypeName' is obsolete: 'Renamed to Type'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

public class AppInsightsConsumerEventsHandler
{
public static Task OnConsumeStarted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
internal static Task OnConsumeStarted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
{
eventContextMessageContext.Items.Add("timer", Stopwatch.StartNew());
eventContextMessageContext.Items.Add("telemetryClient", telemetryClient);
Expand All @@ -22,33 +22,23 @@ public static Task OnConsumeError(IMessageContext eventContextMessageContext, Ex
{
{"topic" , eventContextMessageContext.ConsumerContext.Topic},
{"partition" , eventContextMessageContext.ConsumerContext.Partition.ToString()},
{"offset" , eventContextMessageContext.ConsumerContext.Offset.ToString()},
});
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
theTimer.Stop();
telemetryClient.TrackDependency("Kafka-Consumer",
eventContextMessageContext.ConsumerContext.Topic,
eventContextMessageContext.ConsumerContext.Partition.ToString(),
eventContextMessageContext.ConsumerContext.Offset.ToString(),
DateTimeOffset.UtcNow,
theTimer.Elapsed, "500", false);
telemetryClient.TrackKafkaDependency(eventContextMessageContext, "Error", false, theTimer.Elapsed);
return Task.CompletedTask;
}

public static Task OnConsumeCompleted(IMessageContext eventContextMessageContext)
internal static Task OnConsumeCompleted(IMessageContext eventContextMessageContext)
{
eventContextMessageContext.Items.TryGetValue("telemetryClient", out var telemetryClientOut);
var telemetryClient = (TelemetryClient)telemetryClientOut;
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
theTimer.Stop();
telemetryClient.TrackDependency("Kafka-Consumer",
eventContextMessageContext.ConsumerContext.Topic,
eventContextMessageContext.ConsumerContext.Partition.ToString(),
eventContextMessageContext.ConsumerContext.Offset.ToString(),
DateTimeOffset.UtcNow,
theTimer.Elapsed, "200", true);

telemetryClient.TrackKafkaDependency(eventContextMessageContext, "Ok", true, theTimer.Elapsed);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

public class AppInsightsProducerEventsHandler
{
public static Task OnProducerStarted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
internal static Task OnProducerStarted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
{
eventContextMessageContext.Items.Add("timer", Stopwatch.StartNew());
eventContextMessageContext.Items.Add("telemetryClient", telemetryClient);
Expand All @@ -22,18 +22,12 @@ public static Task OnProducerError(IMessageContext eventContextMessageContext, E
{
{"topic" , eventContextMessageContext.ProducerContext.Topic},
{"partition" , eventContextMessageContext.ProducerContext.Partition.ToString()},
{"offset" , eventContextMessageContext.ProducerContext.Offset.ToString()},
});
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
theTimer.Stop();
var timeComponent = theTimer.ElapsedMilliseconds;
telemetryClient.TrackDependency("Kafka-Producer",
eventContextMessageContext.ProducerContext.Topic,
eventContextMessageContext.ProducerContext.Partition.ToString(),
eventContextMessageContext.ProducerContext.Offset.ToString(),
DateTimeOffset.UtcNow,
theTimer.Elapsed, "500", false);

telemetryClient.TrackKafkaDependency(eventContextMessageContext, "Error", false, theTimer.Elapsed);
return Task.CompletedTask;
}

Expand All @@ -44,13 +38,8 @@ public static Task OnProducerCompleted(IMessageContext eventContextMessageContex
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
theTimer.Stop();
var timeComponent = theTimer.ElapsedMilliseconds;
telemetryClient.TrackDependency("Kafka-Producer",
eventContextMessageContext.ProducerContext.Topic,
eventContextMessageContext.ProducerContext.Partition.ToString(),
eventContextMessageContext.ProducerContext.Offset.ToString(),
DateTimeOffset.UtcNow,
theTimer.Elapsed, "200", true);

telemetryClient.TrackKafkaDependency(eventContextMessageContext, "Ok", true, theTimer.Elapsed);
return Task.CompletedTask;
}
}
6 changes: 6 additions & 0 deletions src/KafkaFlow.ApplicationInsights/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
internal static class Constants
{
internal const string DependencyType = "Kafka";
internal const string ConsumerType = "Consume";
internal const string ProducerType = "Produce";
}
42 changes: 41 additions & 1 deletion src/KafkaFlow.ApplicationInsights/ExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using KafkaFlow;
using System;
using KafkaFlow;
using KafkaFlow.Configuration;
using Microsoft.ApplicationInsights;

Expand Down Expand Up @@ -31,4 +32,43 @@ public static IKafkaConfigurationBuilder AddAppInsightsInstrumentation(this IKaf

return builder;
}

internal static string GetDependencyName(this IMessageContext eventContextMessageContext)
{
return eventContextMessageContext.ConsumerContext?.Topic ??
eventContextMessageContext.ProducerContext?.Topic;
}

internal static string GetData(this IMessageContext eventContextMessageContext)
{
var type = Constants.ConsumerType;
if (eventContextMessageContext.ConsumerContext == null)
{
type = Constants.ProducerType;
}

var partition = eventContextMessageContext.ConsumerContext?.Partition.ToString() ??
eventContextMessageContext.ProducerContext?.Partition.ToString();

return $"{type}/{partition}";
}

internal static string GetTarget(this IMessageContext eventContextMessageContext)
{
return string.Join(",", eventContextMessageContext.Brokers);
}

internal static void TrackKafkaDependency(this TelemetryClient telemetryClient, IMessageContext eventContextMessageContext, string resultCode,
bool success, TimeSpan elapsedTime)
{
telemetryClient.TrackDependency(Constants.DependencyType,
eventContextMessageContext.GetTarget(),
eventContextMessageContext.GetDependencyName(),
eventContextMessageContext.GetData(),
DateTimeOffset.UtcNow,
elapsedTime,
resultCode,
success);
}
}

Loading