Skip to content

Commit

Permalink
Activity source (#2119)
Browse files Browse the repository at this point in the history
* Track Spawning
* Track EventStream Publish
* Track EventStream Subscription
  • Loading branch information
rogeralsing authored Apr 19, 2024
1 parent 22d88fd commit d300571
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 171 deletions.
3 changes: 3 additions & 0 deletions src/Proto.Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand All @@ -31,6 +32,8 @@ public sealed class ActorSystem : IAsyncDisposable
#pragma warning restore CS0618 // Type or member is obsolete
private string _host = NoHost;
private int _port;

public static readonly ActivitySource ActivitySource = new("Proto.Actor");

public ActorSystem() : this(new ActorSystemConfig())
{
Expand Down
17 changes: 11 additions & 6 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ namespace Proto.Context;

public class ActorContext : IMessageInvoker, IContext, ISupervisor
{
#pragma warning disable CS0618 // Type or member is obsolete
private static readonly ILogger Logger = Log.CreateLogger<ActorContext>();
#pragma warning restore CS0618 // Type or member is obsolete
private static readonly ImmutableHashSet<PID> EmptyChildren = ImmutableHashSet<PID>.Empty;
private readonly IMailbox _mailbox;
private readonly KeyValuePair<string, object?>[] _metricTags = Array.Empty<KeyValuePair<string, object?>>();
Expand All @@ -32,13 +34,13 @@ public class ActorContext : IMessageInvoker, IContext, ISupervisor
private object? _messageOrEnvelope;
private ContextState _state;

private ShouldThrottle shouldThrottleStartLogs = Throttle.Create(1000,TimeSpan.FromSeconds(1), droppedLogs =>
private readonly ShouldThrottle _shouldThrottleStartLogs = Throttle.Create(1000,TimeSpan.FromSeconds(1), droppedLogs =>
{
Logger.LogInformation("[ActorContext] Throttled {LogCount} logs", droppedLogs);
} );


public ActorContext(ActorSystem system, Props props, PID? parent, PID self, IMailbox mailbox)

private ActorContext(ActorSystem system, Props props, PID? parent, PID self, IMailbox mailbox)
{
System = system;
_props = props;
Expand All @@ -50,6 +52,9 @@ public ActorContext(ActorSystem system, Props props, PID? parent, PID self, IMai
Self = self;

Actor = IncarnateActor();
using var publishActivity = ActorSystem.ActivitySource.StartActivity($"Spawn {self} {Actor.GetType().Name}");
publishActivity?.AddTag(ProtoTags.ActorType, Actor.GetType().Name);
publishActivity?.AddTag(ProtoTags.ActionType, "Spawn");

if (System.Metrics.Enabled)
{
Expand Down Expand Up @@ -199,7 +204,7 @@ public void Forward(PID target)

public void Request(PID target, object message, PID? sender)
{
var messageEnvelope = MessageEnvelope.WithSender(message, sender);
var messageEnvelope = MessageEnvelope.WithSender(message, sender!);
SendUserMessage(target, messageEnvelope);
}

Expand Down Expand Up @@ -390,7 +395,7 @@ public Task InvokeSystemMessageAsync(SystemMessage msg)
{
return msg switch
{
Started s => HandleStartedAsync(),
Started => HandleStartedAsync(),
Stop _ => HandleStopAsync(),
Terminated t => HandleTerminatedAsync(t),
Watch w => HandleWatch(w),
Expand Down Expand Up @@ -428,7 +433,7 @@ async Task Await()
sw.Stop();
if (sw.Elapsed > _props.StartDeadline)
{
if (shouldThrottleStartLogs().IsOpen())
if (_shouldThrottleStartLogs().IsOpen())
{
Logger.LogCritical(
"Actor {Self} took too long to start, deadline is {Deadline}, actual start time is {ActualStart}, your system might suffer from incorrect design, please consider reaching out to https://proto.actor/docs/training/ for help",
Expand Down
61 changes: 33 additions & 28 deletions src/Proto.Actor/EventStream/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand All @@ -24,7 +27,9 @@ namespace Proto;
[PublicAPI]
public class EventStream : EventStream<object>
{
#pragma warning disable CS0618 // Type or member is obsolete
private readonly ILogger _logger = Log.CreateLogger<EventStream>();
#pragma warning restore CS0618 // Type or member is obsolete

internal EventStream(ActorSystem system)
{
Expand Down Expand Up @@ -82,8 +87,9 @@ internal EventStream()
/// </summary>
/// <param name="action">Synchronous message handler</param>
/// <param name="dispatcher">Optional: the dispatcher, will use <see cref="Dispatchers.SynchronousDispatcher" /> by default</param>
/// <param name="caller">passed by the compiler</param>
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe(Action<T> action, IDispatcher? dispatcher = null)
public EventStreamSubscription<T> Subscribe(Action<T> action, IDispatcher? dispatcher = null, [CallerMemberName] string? caller = null)
{
var sub = new EventStreamSubscription<T>(
this,
Expand All @@ -94,7 +100,7 @@ public EventStreamSubscription<T> Subscribe(Action<T> action, IDispatcher? dispa

return Task.CompletedTask;
}
);
,caller ?? "Unknown");

_subscriptions.TryAdd(sub.Id, sub);

Expand All @@ -107,13 +113,13 @@ public EventStreamSubscription<T> Subscribe(Action<T> action, IDispatcher? dispa
/// <param name="channel">a Channel which receives the event</param>
/// <param name="dispatcher">Optional: the dispatcher, will use <see cref="Dispatchers.SynchronousDispatcher" /> by default</param>
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe(Channel<T> channel, IDispatcher? dispatcher = null)
public EventStreamSubscription<T> Subscribe(Channel<T> channel, IDispatcher? dispatcher = null, [CallerMemberName] string? caller = null)
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
async x => { await channel.Writer.WriteAsync(x).ConfigureAwait(false); }
);
,caller ?? "Unknown");

_subscriptions.TryAdd(sub.Id, sub);

Expand All @@ -126,7 +132,7 @@ public EventStreamSubscription<T> Subscribe(Channel<T> channel, IDispatcher? dis
/// <param name="channel">a Channel which receives the event</param>
/// <param name="dispatcher">Optional: the dispatcher, will use <see cref="Dispatchers.SynchronousDispatcher" /> by default</param>
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe<TMsg>(Channel<TMsg> channel, IDispatcher? dispatcher = null) where TMsg:T
public EventStreamSubscription<T> Subscribe<TMsg>(Channel<TMsg> channel, IDispatcher? dispatcher = null, [CallerMemberName] string? caller = null) where TMsg:T
{
var sub = new EventStreamSubscription<T>(
this,
Expand All @@ -138,34 +144,20 @@ public EventStreamSubscription<T> Subscribe<TMsg>(Channel<TMsg> channel, IDispat
await channel.Writer.WriteAsync(tc).ConfigureAwait(false);
}
}
);
,caller ?? "Unknown");

_subscriptions.TryAdd(sub.Id, sub);

return sub;
}

/// <summary>
/// Subscribe to messages with an asynchronous handler
/// </summary>
/// <param name="action">Asynchronous message handler</param>
/// <param name="dispatcher">Optional: the dispatcher, will use <see cref="Dispatchers.SynchronousDispatcher" /> by default</param>
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe(Func<T, Task> action, IDispatcher? dispatcher = null)
{
var sub = new EventStreamSubscription<T>(this, dispatcher ?? Dispatchers.SynchronousDispatcher, action);
_subscriptions.TryAdd(sub.Id, sub);

return sub;
}

/// <summary>
/// Subscribe to a message type, which is a derived type from <see cref="T" />
/// </summary>
/// <param name="action">Synchronous message handler</param>
/// <param name="dispatcher">Optional: the dispatcher, will use <see cref="Dispatchers.SynchronousDispatcher" /> by default</param>
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe<TMsg>(Action<TMsg> action, IDispatcher? dispatcher = null)
public EventStreamSubscription<T> Subscribe<TMsg>(Action<TMsg> action, IDispatcher? dispatcher = null, [CallerMemberName] string? caller = null)
where TMsg : T
{
var sub = new EventStreamSubscription<T>(
Expand All @@ -180,7 +172,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(Action<TMsg> action, IDispatch

return Task.CompletedTask;
}
);
,caller ?? "Unknown");

_subscriptions.TryAdd(sub.Id, sub);

Expand All @@ -198,7 +190,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(
Func<TMsg, bool> predicate,
Action<TMsg> action,
IDispatcher? dispatcher = null
) where TMsg : T
, [CallerMemberName] string? caller = null) where TMsg : T
{
var sub = new EventStreamSubscription<T>(
this,
Expand All @@ -212,7 +204,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(

return Task.CompletedTask;
}
);
,caller ?? "Unknown");

_subscriptions.TryAdd(sub.Id, sub);

Expand All @@ -227,6 +219,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe<TMsg>(ISenderContext context, params PID[] pids) where TMsg : T
{
var caller = pids.First().ToDiagnosticString();
var sub = new EventStreamSubscription<T>(
this,
Dispatchers.SynchronousDispatcher,
Expand All @@ -242,7 +235,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(ISenderContext context, params

return Task.CompletedTask;
}
);
,caller ?? "Unknown");

_subscriptions.TryAdd(sub.Id, sub);

Expand All @@ -254,15 +247,16 @@ public EventStreamSubscription<T> Subscribe<TMsg>(ISenderContext context, params
/// </summary>
/// <param name="action">Asynchronous message handler</param>
/// <param name="dispatcher">Optional: the dispatcher, will use <see cref="Dispatchers.SynchronousDispatcher" /> by default</param>
/// <param name="caller"></param>
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe<TMsg>(Func<TMsg, Task> action, IDispatcher? dispatcher = null)
public EventStreamSubscription<T> Subscribe<TMsg>(Func<TMsg, Task> action, IDispatcher? dispatcher = null, [CallerMemberName] string? caller = null)
where TMsg : T
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
msg => msg is TMsg typed ? action(typed) : Task.CompletedTask
);
,caller ?? "Unknown");

_subscriptions.TryAdd(sub.Id, sub);

Expand All @@ -275,13 +269,21 @@ public EventStreamSubscription<T> Subscribe<TMsg>(Func<TMsg, Task> action, IDisp
/// <param name="msg">A message to publish</param>
public void Publish(T msg)
{
var parent = Activity.Current;
using var publishActivity = ActorSystem.ActivitySource.StartActivity($"{nameof(EventStream)} {msg?.GetType().Name??"null"}",ActivityKind.Internal,parent?.Id);
publishActivity?.AddTag(ProtoTags.MessageType, msg?.GetType().Name??"null");

foreach (var sub in _subscriptions.Values)
{
sub.Dispatcher.Schedule(
() =>
{
try
{
using var subscriberActivity = ActorSystem.ActivitySource.StartActivity($"Subscriber {msg?.GetType().Name??"null"} {sub.Name}",ActivityKind.Internal,publishActivity?.Id);
subscriberActivity?
.AddTag(ProtoTags.MessageType, msg?.GetType().Name??"null")
.AddTag(ProtoTags.EventSubscriber, sub.Name);
sub.Action(msg);
}
catch (Exception ex)
Expand Down Expand Up @@ -319,14 +321,17 @@ public class EventStreamSubscription<T>
{
private readonly EventStream<T> _eventStream;

public EventStreamSubscription(EventStream<T> eventStream, IDispatcher dispatcher, Func<T, Task> action)
public EventStreamSubscription(EventStream<T> eventStream, IDispatcher dispatcher, Func<T, Task> action, string name)
{
Id = Guid.NewGuid();
_eventStream = eventStream;
Dispatcher = dispatcher;
Action = action;
Name = name;
}

public string Name { get; }

public Guid Id { get; }
public IDispatcher Dispatcher { get; }
public Func<T, Task> Action { get; }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Diagnostics;

namespace Proto.OpenTelemetry;
namespace Proto;

/// <summary>
/// Proto.actor specific tags on the <see cref="Activity" />
Expand Down Expand Up @@ -48,4 +48,7 @@ public static class ProtoTags
/// Name of the current action
/// </summary>
public const string ActionType = "proto.action";

public const string EventSubscriber = "proto.eventsubscriber";
public const string TargetName = "proto.targetname";
}
Loading

0 comments on commit d300571

Please sign in to comment.