Skip to content

Commit

Permalink
change tags for better data support (#3)
Browse files Browse the repository at this point in the history
* change tags for better data support

* fix assertions

* fix mappings

* fix mapping

* fix message
  • Loading branch information
dicko2 authored Dec 22, 2023
1 parent 95a514a commit 4052124
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ public async Task WhenSendPostToHealthCheckEndpoint_ShouldReturnOkResponse()
result.IsSuccessStatusCode.ShouldBeTrue();
Thread.Sleep(5000);

Check warning on line 63 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

Prevent the use of Thread.Sleep (https://agoda-com.github.io/standards-c-sharp/async/avoid-blocking.html)

Check warning on line 63 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

Do not use 'Thread.Sleep()' in a test. (https://rules.sonarsource.com/csharp/RSPEC-2925)
dataStore.Count.ShouldBe(7);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).Type == "Kafka-Producer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyKind == "Kafka-Producer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyTypeName == "Kafka-Producer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).Type == "Kafka-Consumer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyKind == "Kafka-Consumer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyTypeName == "Kafka-Consumer").ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).Type == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Produce")).ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyKind == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Produce")).ShouldBe(3);

Check warning on line 66 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

'DependencyTelemetry.DependencyKind' is obsolete: 'Use Type'
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyTypeName == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Produce")).ShouldBe(3);

Check warning on line 67 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

'DependencyTelemetry.DependencyTypeName' is obsolete: 'Renamed to Type'
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).Type == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Consume")).ShouldBe(3);
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyKind == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Consume")).ShouldBe(3);

Check warning on line 69 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

'DependencyTelemetry.DependencyKind' is obsolete: 'Use Type'
dataStore.Where(y => y is DependencyTelemetry).Count(x => ((DependencyTelemetry)x).DependencyTypeName == "Kafka" && ((DependencyTelemetry)x).Data.StartsWith("Consume")).ShouldBe(3);

Check warning on line 70 in src/KafkaFlow.ApplicationInsights.Tests/MainIntegrationTestFixture.cs

View workflow job for this annotation

GitHub Actions / Build Package

'DependencyTelemetry.DependencyTypeName' is obsolete: 'Renamed to Type'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

public class AppInsightsConsumerEventsHandler
{
public static Task OnConsumeStarted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
internal static Task OnConsumeStarted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
{
eventContextMessageContext.Items.Add("timer", Stopwatch.StartNew());
eventContextMessageContext.Items.Add("telemetryClient", telemetryClient);
Expand All @@ -22,33 +22,23 @@ public static Task OnConsumeError(IMessageContext eventContextMessageContext, Ex
{
{"topic" , eventContextMessageContext.ConsumerContext.Topic},
{"partition" , eventContextMessageContext.ConsumerContext.Partition.ToString()},
{"offset" , eventContextMessageContext.ConsumerContext.Offset.ToString()},
});
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
theTimer.Stop();
telemetryClient.TrackDependency("Kafka-Consumer",
eventContextMessageContext.ConsumerContext.Topic,
eventContextMessageContext.ConsumerContext.Partition.ToString(),
eventContextMessageContext.ConsumerContext.Offset.ToString(),
DateTimeOffset.UtcNow,
theTimer.Elapsed, "500", false);
telemetryClient.TrackKafkaDependency(eventContextMessageContext, "Error", false, theTimer.Elapsed);
return Task.CompletedTask;
}

public static Task OnConsumeCompleted(IMessageContext eventContextMessageContext)
internal static Task OnConsumeCompleted(IMessageContext eventContextMessageContext)
{
eventContextMessageContext.Items.TryGetValue("telemetryClient", out var telemetryClientOut);
var telemetryClient = (TelemetryClient)telemetryClientOut;
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
theTimer.Stop();
telemetryClient.TrackDependency("Kafka-Consumer",
eventContextMessageContext.ConsumerContext.Topic,
eventContextMessageContext.ConsumerContext.Partition.ToString(),
eventContextMessageContext.ConsumerContext.Offset.ToString(),
DateTimeOffset.UtcNow,
theTimer.Elapsed, "200", true);

telemetryClient.TrackKafkaDependency(eventContextMessageContext, "Ok", true, theTimer.Elapsed);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

public class AppInsightsProducerEventsHandler
{
public static Task OnProducerStarted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
internal static Task OnProducerStarted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
{
eventContextMessageContext.Items.Add("timer", Stopwatch.StartNew());
eventContextMessageContext.Items.Add("telemetryClient", telemetryClient);
Expand All @@ -22,18 +22,12 @@ public static Task OnProducerError(IMessageContext eventContextMessageContext, E
{
{"topic" , eventContextMessageContext.ProducerContext.Topic},
{"partition" , eventContextMessageContext.ProducerContext.Partition.ToString()},
{"offset" , eventContextMessageContext.ProducerContext.Offset.ToString()},
});
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
theTimer.Stop();
var timeComponent = theTimer.ElapsedMilliseconds;
telemetryClient.TrackDependency("Kafka-Producer",
eventContextMessageContext.ProducerContext.Topic,
eventContextMessageContext.ProducerContext.Partition.ToString(),
eventContextMessageContext.ProducerContext.Offset.ToString(),
DateTimeOffset.UtcNow,
theTimer.Elapsed, "500", false);

telemetryClient.TrackKafkaDependency(eventContextMessageContext, "Error", false, theTimer.Elapsed);
return Task.CompletedTask;
}

Expand All @@ -44,13 +38,8 @@ public static Task OnProducerCompleted(IMessageContext eventContextMessageContex
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
theTimer.Stop();
var timeComponent = theTimer.ElapsedMilliseconds;
telemetryClient.TrackDependency("Kafka-Producer",
eventContextMessageContext.ProducerContext.Topic,
eventContextMessageContext.ProducerContext.Partition.ToString(),
eventContextMessageContext.ProducerContext.Offset.ToString(),
DateTimeOffset.UtcNow,
theTimer.Elapsed, "200", true);

telemetryClient.TrackKafkaDependency(eventContextMessageContext, "Ok", true, theTimer.Elapsed);
return Task.CompletedTask;
}
}
6 changes: 6 additions & 0 deletions src/KafkaFlow.ApplicationInsights/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
internal static class Constants
{
internal const string DependencyType = "Kafka";
internal const string ConsumerType = "Consume";
internal const string ProducerType = "Produce";
}
42 changes: 41 additions & 1 deletion src/KafkaFlow.ApplicationInsights/ExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using KafkaFlow;
using System;
using KafkaFlow;
using KafkaFlow.Configuration;
using Microsoft.ApplicationInsights;

Expand Down Expand Up @@ -31,4 +32,43 @@ public static IKafkaConfigurationBuilder AddAppInsightsInstrumentation(this IKaf

return builder;
}

internal static string GetDependencyName(this IMessageContext eventContextMessageContext)
{
return eventContextMessageContext.ConsumerContext?.Topic ??
eventContextMessageContext.ProducerContext?.Topic;
}

internal static string GetData(this IMessageContext eventContextMessageContext)
{
var type = Constants.ConsumerType;
if (eventContextMessageContext.ConsumerContext == null)
{
type = Constants.ProducerType;
}

var partition = eventContextMessageContext.ConsumerContext?.Partition.ToString() ??
eventContextMessageContext.ProducerContext?.Partition.ToString();

return $"{type}/{partition}";
}

internal static string GetTarget(this IMessageContext eventContextMessageContext)
{
return string.Join(",", eventContextMessageContext.Brokers);
}

internal static void TrackKafkaDependency(this TelemetryClient telemetryClient, IMessageContext eventContextMessageContext, string resultCode,
bool success, TimeSpan elapsedTime)
{
telemetryClient.TrackDependency(Constants.DependencyType,
eventContextMessageContext.GetTarget(),
eventContextMessageContext.GetDependencyName(),
eventContextMessageContext.GetData(),
DateTimeOffset.UtcNow,
elapsedTime,
resultCode,
success);
}
}

0 comments on commit 4052124

Please sign in to comment.