diff --git a/src/WART-Client/WART-Client.csproj b/src/WART-Client/WART-Client.csproj index 11b0635..bef0a81 100755 --- a/src/WART-Client/WART-Client.csproj +++ b/src/WART-Client/WART-Client.csproj @@ -24,7 +24,7 @@ - + diff --git a/src/WART-Client/WartTestClient.cs b/src/WART-Client/WartTestClient.cs index 168e3b6..5bfb666 100755 --- a/src/WART-Client/WartTestClient.cs +++ b/src/WART-Client/WartTestClient.cs @@ -53,7 +53,7 @@ public static async Task ConnectAsync(string wartHubUrl) } catch (Exception e) { - Console.WriteLine(e); + Console.WriteLine(e.Message); } await Task.CompletedTask; diff --git a/src/WART-Client/WartTestClientJwt.cs b/src/WART-Client/WartTestClientJwt.cs index c94bdb8..7dfdd73 100755 --- a/src/WART-Client/WartTestClientJwt.cs +++ b/src/WART-Client/WartTestClientJwt.cs @@ -58,7 +58,7 @@ public static async Task ConnectAsync(string wartHubUrl, string key) } catch (Exception e) { - Console.WriteLine(e); + Console.WriteLine(e.Message); } await Task.CompletedTask; diff --git a/src/WART-Core/Authentication/JWT/JwtServiceCollectionExtension.cs b/src/WART-Core/Authentication/JWT/JwtServiceCollectionExtension.cs index 835fe3f..2686146 100755 --- a/src/WART-Core/Authentication/JWT/JwtServiceCollectionExtension.cs +++ b/src/WART-Core/Authentication/JWT/JwtServiceCollectionExtension.cs @@ -11,6 +11,8 @@ using Microsoft.Extensions.Logging; using Microsoft.IdentityModel.Tokens; using System.Linq; +using WART_Core.Hubs; +using WART_Core.Services; namespace WART_Core.Authentication.JWT { @@ -79,6 +81,12 @@ public static IServiceCollection AddJwtMiddleware(this IServiceCollection servic }; }); + // Register WART event queue as a singleton service. + services.AddSingleton(); + + // Register the WART event worker as a hosted service. + services.AddHostedService>(); + // Configure SignalR options, including error handling and timeouts services.AddSignalR(options => { diff --git a/src/WART-Core/Controllers/WartBaseController.cs b/src/WART-Core/Controllers/WartBaseController.cs index c45b999..26439f9 100644 --- a/src/WART-Core/Controllers/WartBaseController.cs +++ b/src/WART-Core/Controllers/WartBaseController.cs @@ -4,12 +4,11 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; using WART_Core.Entity; using WART_Core.Filters; +using Microsoft.Extensions.DependencyInjection; +using WART_Core.Services; namespace WART_Core.Controllers { @@ -19,6 +18,8 @@ public abstract class WartBaseController : Controller where THub : Hub private readonly IHubContext _hubContext; private const string RouteDataKey = "REQUEST"; + private WartEventQueueService _eventQueue; + protected WartBaseController(IHubContext hubContext, ILogger logger) { _hubContext = hubContext; @@ -39,7 +40,7 @@ public override void OnActionExecuting(ActionExecutingContext context) /// Processes the executed action and sends the event to the SignalR hub if applicable. /// /// The action executed context. - public override async void OnActionExecuted(ActionExecutedContext context) + public override void OnActionExecuted(ActionExecutedContext context) { if (context?.Result is ObjectResult objectResult) { @@ -52,45 +53,13 @@ public override async void OnActionExecuted(ActionExecutedContext context) var response = objectResult.Value; var wartEvent = new WartEvent(request, response, httpMethod, httpPath, remoteAddress); - await SendToHub(wartEvent, [.. context.Filters]); + + _eventQueue = context.HttpContext?.RequestServices.GetService(); + _eventQueue?.Enqueue(new WartEventWithFilters(wartEvent, [.. context.Filters])); } } base.OnActionExecuted(context); } - - /// - /// Sends the current event to the SignalR hub. - /// - /// The current WartEvent. - /// The list of filters applied to the request. - private async Task SendToHub(WartEvent wartEvent, List filters) - { - try - { - if (filters.Any(f => f.GetType().Name == nameof(GroupWartAttribute))) - { - var wartGroup = filters.OfType().FirstOrDefault(); - var groups = wartGroup?.GroupNames; - if (groups != null) - { - foreach (var group in groups) - { - await _hubContext.Clients.Group(group).SendAsync("Send", wartEvent.ToString()); - _logger?.LogInformation($"Group: {group}, WartEvent: {wartEvent}"); - } - } - } - else - { - await _hubContext.Clients.All.SendAsync("Send", wartEvent.ToString()); - _logger?.LogInformation("Event: {EventName}, Details: {EventDetails}", nameof(WartEvent), wartEvent.ToString()); - } - } - catch (Exception ex) - { - _logger?.LogError(ex, "Error sending WartEvent to clients"); - } - } } } diff --git a/src/WART-Core/Entity/WartEventWithFilters.cs b/src/WART-Core/Entity/WartEventWithFilters.cs new file mode 100644 index 0000000..7e3ca99 --- /dev/null +++ b/src/WART-Core/Entity/WartEventWithFilters.cs @@ -0,0 +1,35 @@ +// (c) 2024 Francesco Del Re +// This code is licensed under MIT license (see LICENSE.txt for details) +using Microsoft.AspNetCore.Mvc.Filters; +using System.Collections.Generic; + +namespace WART_Core.Entity +{ + /// + /// Represents an event that contains additional filter metadata. + /// + public class WartEventWithFilters + { + /// + /// The main WartEvent object. + /// + public WartEvent WartEvent { get; set; } + + /// + /// A list of filters applied to the event. + /// + public List Filters { get; set; } + + /// + /// Initializes a new instance of the WartEventWithFilters class. + /// + /// The WartEvent to associate with the filters. + /// The list of filters applied to the event. + public WartEventWithFilters(WartEvent wartEvent, List filters) + { + // Initialize the WartEvent and Filters properties + WartEvent = wartEvent; + Filters = filters; + } + } +} \ No newline at end of file diff --git a/src/WART-Core/Hubs/WartHubBase.cs b/src/WART-Core/Hubs/WartHubBase.cs index 0a55722..a1a76b3 100644 --- a/src/WART-Core/Hubs/WartHubBase.cs +++ b/src/WART-Core/Hubs/WartHubBase.cs @@ -122,5 +122,10 @@ public static int GetConnectionsCount() { return _connections.Count; } + + /// + /// Returns a value indicating whether there are connected clients. + /// + public static bool HasConnectedClients => !_connections.IsEmpty; } } \ No newline at end of file diff --git a/src/WART-Core/Middleware/WartServiceCollectionExtension.cs b/src/WART-Core/Middleware/WartServiceCollectionExtension.cs index 66ec4a8..52bbb27 100755 --- a/src/WART-Core/Middleware/WartServiceCollectionExtension.cs +++ b/src/WART-Core/Middleware/WartServiceCollectionExtension.cs @@ -9,6 +9,8 @@ using System.Linq; using WART_Core.Authentication.JWT; using WART_Core.Enum; +using WART_Core.Hubs; +using WART_Core.Services; namespace WART_Core.Middleware { @@ -32,6 +34,12 @@ public static IServiceCollection AddWartMiddleware(this IServiceCollection servi // Add console logging. services.AddLogging(configure => configure.AddConsole()); + // Register WART event queue as a singleton service. + services.AddSingleton(); + + // Register the WART event worker as a hosted service. + services.AddHostedService>(); + // Configure SignalR with custom options. services.AddSignalR(options => { diff --git a/src/WART-Core/Services/WartEventQueueService.cs b/src/WART-Core/Services/WartEventQueueService.cs new file mode 100644 index 0000000..a13bbcb --- /dev/null +++ b/src/WART-Core/Services/WartEventQueueService.cs @@ -0,0 +1,43 @@ +// (c) 2024 Francesco Del Re +// This code is licensed under MIT license (see LICENSE.txt for details) +using System.Collections.Concurrent; +using WART_Core.Entity; + +namespace WART_Core.Services +{ + /// + /// A service that manages a concurrent queue for WartEvent objects with filters. + /// This class provides methods for enqueuing and dequeuing events. + /// + public class WartEventQueueService + { + // A thread-safe queue to hold WartEvent objects along with their associated filters. + private readonly ConcurrentQueue _queue = new ConcurrentQueue(); + + /// + /// Enqueues a WartEventWithFilters object to the queue. + /// + /// The WartEventWithFilters object to enqueue. + public void Enqueue(WartEventWithFilters wartEventWithFilters) + { + // Adds the event with filters to the concurrent queue. + _queue.Enqueue(wartEventWithFilters); + } + + /// + /// Attempts to dequeue a WartEventWithFilters object from the queue. + /// + /// The dequeued WartEventWithFilters object. + /// True if an event was dequeued; otherwise, false. + public bool TryDequeue(out WartEventWithFilters wartEventWithFilters) + { + // Attempts to remove and return the event with filters from the queue. + return _queue.TryDequeue(out wartEventWithFilters); + } + + /// + /// Gets the current count of events in the queue. + /// + public int Count => _queue.Count; + } +} \ No newline at end of file diff --git a/src/WART-Core/Services/WartEventWorker.cs b/src/WART-Core/Services/WartEventWorker.cs new file mode 100644 index 0000000..5004965 --- /dev/null +++ b/src/WART-Core/Services/WartEventWorker.cs @@ -0,0 +1,167 @@ +// (c) 2024 Francesco Del Re +// This code is licensed under MIT license (see LICENSE.txt for details) +using Microsoft.AspNetCore.Mvc.Filters; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using WART_Core.Entity; +using WART_Core.Filters; +using WART_Core.Hubs; + +namespace WART_Core.Services +{ + /// + /// Background service that processes WartEvent objects from a queue and sends them to SignalR clients. + /// + public class WartEventWorker : BackgroundService where THub : Hub + { + private readonly WartEventQueueService _eventQueue; + private readonly IHubContext _hubContext; + private readonly ILogger> _logger; + + /// + /// Constructor that initializes the worker with the event queue, hub context, and logger. + /// + public WartEventWorker(WartEventQueueService eventQueue, IHubContext hubContext, ILogger> logger) + { + _eventQueue = eventQueue; + _hubContext = hubContext; + _logger = logger; + } + + /// + /// Method that runs in the background to dequeue events and send them to SignalR clients. + /// + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("WartEventWorker started."); + + // The worker will keep running as long as the cancellation token is not triggered. + while (!stoppingToken.IsCancellationRequested) + { + // Check if there are any connected clients. + if (!WartHubBase.HasConnectedClients) + { + await Task.Delay(500, stoppingToken); + continue; + } + + // Dequeue events and process them. + while (_eventQueue.TryDequeue(out var wartEventWithFilters)) + { + try + { + // Extract the event and filters. + var wartEvent = wartEventWithFilters.WartEvent; + var filters = wartEventWithFilters.Filters; + + // Send the event to the SignalR hub. + await SendToHub(wartEvent, filters); + + _logger.LogInformation("Event sent: {Event}", wartEvent); + } + catch (Exception ex) + { + // Log any errors that occur while sending the event. + _logger.LogError(ex, "Error while sending event."); + + // Re-enqueue the event for retry + // We lost the order of the events, but we can't lose the events + _eventQueue.Enqueue(wartEventWithFilters); + } + } + + // Wait for 200 ms before checking for new events in the queue. + await Task.Delay(200, stoppingToken); + } + + _logger.LogInformation("WartEventWorker stopped."); + } + + /// + /// Sends the current event to the SignalR hub. + /// This method determines if the event should be sent to specific groups or all clients. + /// + private async Task SendToHub(WartEvent wartEvent, List filters) + { + try + { + // Retrieve the target groups based on the filters. + var groups = GetTargetGroups(filters); + + // If specific groups are defined, send the event to each group in parallel. + if (groups.Count != 0) + { + var tasks = groups.Select(group => SendEventToGroup(wartEvent, group)); + await Task.WhenAll(tasks); + } + else + { + // If no groups are defined, send the event to all clients. + await SendEventToAllClients(wartEvent); + } + } + catch (Exception ex) + { + // Log errors that occur while sending events to SignalR clients. + _logger?.LogError(ex, "Error sending WartEvent to clients"); + + throw; + } + } + + /// + /// Retrieves the list of groups that the WartEvent should be sent to, based on the provided filters. + /// + /// The list of filters that may contain group-related information. + /// A list of group names to send the WartEvent to. + private List GetTargetGroups(List filters) + { + var groups = new List(); + + // Check if there is a GroupWartAttribute filter indicating the groups. + if (filters.Any(f => f.GetType().Name == nameof(GroupWartAttribute))) + { + var wartGroup = filters.FirstOrDefault(f => f.GetType() == typeof(GroupWartAttribute)) as GroupWartAttribute; + if (wartGroup != null) + { + groups.AddRange(wartGroup.GroupNames); + } + } + + return groups; + } + + /// + /// Sends the WartEvent to a specific group of clients. + /// + private async Task SendEventToGroup(WartEvent wartEvent, string group) + { + // Send the event to the group using SignalR. + await _hubContext?.Clients + .Group(group) + .SendAsync("Send", wartEvent.ToString()); + + // Log the event sent to the group. + _logger?.LogInformation($"Group: {group}, WartEvent: {wartEvent}"); + } + + /// + /// Sends the WartEvent to all connected clients. + /// + private async Task SendEventToAllClients(WartEvent wartEvent) + { + // Send the event to all clients using SignalR. + await _hubContext?.Clients.All + .SendAsync("Send", wartEvent.ToString()); + + // Log the event sent to all clients. + _logger?.LogInformation("Event: {EventName}, Details: {EventDetails}", nameof(WartEvent), wartEvent.ToString()); + } + } +} \ No newline at end of file diff --git a/src/WART-WebApiRealTime/WART-Api.csproj b/src/WART-WebApiRealTime/WART-Api.csproj index 4a6f538..b4a5560 100755 --- a/src/WART-WebApiRealTime/WART-Api.csproj +++ b/src/WART-WebApiRealTime/WART-Api.csproj @@ -8,7 +8,7 @@ - +