Skip to content

Commit

Permalink
Proposal for sending failure items using the new failure event hooks …
Browse files Browse the repository at this point in the history
…for instant notification (#3)

* Proposal for sending failure items using the new failure event hooks for instant notification

* Bugfixes

* Add a background worker task with retries for pushover failure reports.

Co-authored-by: Stefan Berg <[email protected]>
  • Loading branch information
isbeorn and isbeorn authored Jul 26, 2022
1 parent 895e9e7 commit ffb2db7
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 65 deletions.
4 changes: 2 additions & 2 deletions Email/EmailCommon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ public async Task SendEmail(MimeMessage message, CancellationToken ct) {
await smtp.DisconnectAsync(true, ct);
} catch (SocketException ex) {
Logger.Error($"SmtpEmail: Connection to {SmtpHostName}:{SmtpHostPort} failed: {ex.SocketErrorCode}: {ex.Message}");
throw ex;
throw;
} catch (AuthenticationException ex) {
Logger.Error($"SendEmail: User {SmtpUsername} failed to authenticate with {SmtpHostName}:{SmtpHostPort}");
throw ex;
throw;
}
}

Expand Down
22 changes: 11 additions & 11 deletions Ground Station.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -178,37 +178,37 @@
<Version>3.1.1</Version>
</PackageReference>
<PackageReference Include="NINA.Astrometry">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINA.Core">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINA.Equipment">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINA.Image">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINA.MGEN">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINA.PlateSolving">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINA.Plugin">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINA.Profile">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINA.Sequencer">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINA.WPF.Base">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="NINACustomControlLibrary">
<Version>2.0.0.2050-preview3</Version>
<Version>2.0.1.2012-beta</Version>
</PackageReference>
<PackageReference Include="PushoverNET">
<Version>1.0.28</Version>
Expand Down
4 changes: 2 additions & 2 deletions Ground Station.sln
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ Global
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{738D56B5-DF32-4932-9344-883E9731D3AB}.Debug|Any CPU.ActiveCfg = Release|Any CPU
{738D56B5-DF32-4932-9344-883E9731D3AB}.Debug|Any CPU.Build.0 = Release|Any CPU
{738D56B5-DF32-4932-9344-883E9731D3AB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{738D56B5-DF32-4932-9344-883E9731D3AB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{738D56B5-DF32-4932-9344-883E9731D3AB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{738D56B5-DF32-4932-9344-883E9731D3AB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
Expand Down
10 changes: 5 additions & 5 deletions MQTT/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public async Task Connect(IMqttClientOptions clientOptions, CancellationToken ct
return;
} catch (Exception ex) {
Logger.Error($"Error connecting to broker: {ex.Message}");
throw ex;
throw;
}
}

Expand All @@ -151,7 +151,7 @@ public async Task<MqttClientPublishResult> Publish(CancellationToken ct) {
}
} catch (Exception ex) {
Logger.Error($"Error publishing to broker: {ex.Message}");
throw ex;
throw;
}

return result;
Expand All @@ -166,7 +166,7 @@ public async Task Ping(CancellationToken ct) {
}
} catch (Exception ex) {
Logger.Error($"Error pinging broker: {ex.Message}");
throw ex;
throw;
}
}

Expand All @@ -182,7 +182,7 @@ public async Task Disconnect(CancellationToken ct) {
}
} catch (Exception ex) {
Logger.Error($"Error disconnecting from broker: {ex.Message}");
throw ex;
throw;
}
}

Expand All @@ -204,7 +204,7 @@ public async Task Subscribe(CancellationToken ct) {
}
} catch (Exception ex) {
Logger.Error($"Error subscribing to topic \"{Topic}\": {ex.Message}");
throw ex;
throw;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion MQTT/MqttCommon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async Task PublishMessage(string topic, string message, int qos, Cancella
await mqttClient.Disconnect(ct);
} catch (Exception ex) {
Logger.Error($"Error sending to MQTT broker: {ex.Message}");
throw ex;
throw;
}
}

Expand Down
8 changes: 4 additions & 4 deletions Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

// [MANDATORY] The assembly versioning
//Should be incremented for each new release build of a plugin
[assembly: AssemblyVersion("1.11.0.0")]
[assembly: AssemblyFileVersion("1.11.0.0")]
[assembly: AssemblyVersion("1.11.1.0")]
[assembly: AssemblyFileVersion("1.11.1.0")]

// [MANDATORY] The name of your plugin
[assembly: AssemblyTitle("Ground Station")]
Expand All @@ -23,7 +23,7 @@
[assembly: AssemblyCopyright("Copyright © 2022 Dale Ghent")]

// The minimum Version of N.I.N.A. that this plugin is compatible with
[assembly: AssemblyMetadata("MinimumApplicationVersion", "2.0.0.2050")]
[assembly: AssemblyMetadata("MinimumApplicationVersion", "2.0.1.2012")]

// The license your plugin code is using
[assembly: AssemblyMetadata("License", "MPL-2.0")]
Expand Down Expand Up @@ -87,4 +87,4 @@ Information about your session or any failures may be inserted into the messages
// [Unused]
[assembly: AssemblyTrademark("")]
// [Unused]
[assembly: AssemblyCulture("")]
[assembly: AssemblyCulture("")]
155 changes: 118 additions & 37 deletions Pushover/FailuresToPushoverTrigger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ This Source Code Form is subject to the terms of the Mozilla Public
using NINA.Sequencer.SequenceItem;
using NINA.Sequencer.Trigger;
using NINA.Sequencer.Validations;
using NINA.Sequencer.Utility;
using PushoverClient;
using System;
using System.Collections.Generic;
Expand All @@ -29,6 +30,7 @@ This Source Code Form is subject to the terms of the Mozilla Public
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;

namespace DaleGhent.NINA.GroundStation.FailuresToPushoverTrigger {

Expand All @@ -40,10 +42,12 @@ namespace DaleGhent.NINA.GroundStation.FailuresToPushoverTrigger {
[JsonObject(MemberSerialization.OptIn)]
public class FailuresToPushoverTrigger : SequenceTrigger, IValidatable {
private PushoverCommon pushover;
private ISequenceItem previousItem;
private Priority priority;
private NotificationSound notificationSound;

private CancellationTokenSource workerCts;
private AsyncProducerConsumerQueue<SequenceEntityFailureEventArgs> messageQueue;

[ImportingConstructor]
public FailuresToPushoverTrigger() {
pushover = new PushoverCommon();
Expand All @@ -60,6 +64,116 @@ public FailuresToPushoverTrigger(FailuresToPushoverTrigger copyMe) : this() {
CopyMetaData(copyMe);
}

private ISequenceRootContainer failureHook;

public override void Initialize() {
workerCts = new CancellationTokenSource();
_ = RunMessageQueueWorker();
}

public override void Teardown() {
try {
// Cancel running worker
workerCts?.Dispose();
messageQueue.CompleteAdding();
} catch (Exception) { }
}

public override void AfterParentChanged() {
var root = ItemUtility.GetRootContainer(this.Parent);
if (root == null && failureHook != null) {
// When trigger is removed from sequence, unregister event handler
// This could potentially be skipped by just using weak events instead
failureHook.FailureEvent -= Root_FailureEvent;
failureHook = null;
} else if (root != null && root != failureHook && this.Parent.Status == SequenceEntityStatus.RUNNING) {
try {
// Cancel running worker
workerCts?.Dispose();
} catch (Exception) { }
// When dragging the item into the sequence while the sequence is already running
// Make sure to register the event handler as "SequenceBlockInitialized" is already done
failureHook = root;
failureHook.FailureEvent += Root_FailureEvent;
workerCts = new CancellationTokenSource();
_ = RunMessageQueueWorker();
}
base.AfterParentChanged();
}

public override void SequenceBlockInitialize() {
// Register failure event when the parent context starts
failureHook = ItemUtility.GetRootContainer(this.Parent);
if (failureHook != null) {
failureHook.FailureEvent += Root_FailureEvent;
}
base.SequenceBlockInitialize();
}

public override void SequenceBlockTeardown() {
// Unregister failure event when the parent context ends
failureHook = ItemUtility.GetRootContainer(this.Parent);
if (failureHook != null) {
failureHook.FailureEvent -= Root_FailureEvent;
}
}

private async Task Root_FailureEvent(object arg1, SequenceEntityFailureEventArgs arg2) {
if (arg2.Entity == null) {
// An exception without context has occurred. Not sure when this can happen
// Todo: Might be worthwile to send in a different style
return;
}

if (arg2.Entity is FailuresToPushoverTrigger || arg2.Entity is SendToPushover.SendToPushover) {
// Prevent pushover items to send pushover failures
return;
}

await messageQueue.EnqueueAsync(arg2);
}

private async Task RunMessageQueueWorker() {
try {
messageQueue = new AsyncProducerConsumerQueue<SequenceEntityFailureEventArgs>(1000);
while (await messageQueue.OutputAvailableAsync(workerCts.Token)) {
try {
var item = await messageQueue.DequeueAsync(workerCts.Token);

var failedItem = FailedItem.FromEntity(item.Entity, item.Exception);

var title = Utilities.Utilities.ResolveTokens(PushoverFailureTitleText, item.Entity);
var message = Utilities.Utilities.ResolveTokens(PushoverFailureBodyText, item.Entity);

title = Utilities.Utilities.ResolveFailureTokens(title, failedItem);
message = Utilities.Utilities.ResolveFailureTokens(message, failedItem);

var attempts = 3; // Todo: Make it configurable?
for (int i = 0; i < attempts; i++) {
try {
var newCts = CancellationTokenSource.CreateLinkedTokenSource(workerCts.Token);
using (workerCts.Token.Register(() => newCts.CancelAfter(TimeSpan.FromSeconds(Utilities.Utilities.cancelTimeout)))) {
workerCts.Token.ThrowIfCancellationRequested();
await pushover.PushMessage(title, message, Priority, NotificationSound, newCts.Token);
// When successful break the retry loop
break;
}
} catch (Exception ex) {
Logger.Error($"Pushover failed to send message. Attempt {i + 1}/{attempts}", ex);
}
}
} catch (OperationCanceledException) {
throw;
} catch (Exception ex) {
Logger.Error(ex);
}
}
} catch (OperationCanceledException) {
} catch (Exception ex) {
Logger.Error(ex);
}
}

[JsonProperty]
public Priority Priority {
get => priority;
Expand All @@ -81,47 +195,16 @@ public NotificationSound NotificationSound {
public Priority[] Priorities => Enum.GetValues(typeof(Priority)).Cast<Priority>().Where(p => p != Priority.Emergency).ToArray();
public NotificationSound[] NotificationSounds => Enum.GetValues(typeof(NotificationSound)).Cast<NotificationSound>().Where(p => p != NotificationSound.NotSet).ToArray();

public override async Task Execute(ISequenceContainer context, IProgress<ApplicationStatus> progress, CancellationToken ct) {
foreach (var failedItem in FailedItems) {
var title = Utilities.Utilities.ResolveTokens(PushoverFailureTitleText, previousItem);
var message = Utilities.Utilities.ResolveTokens(PushoverFailureBodyText, previousItem);

title = Utilities.Utilities.ResolveFailureTokens(title, failedItem);
message = Utilities.Utilities.ResolveFailureTokens(message, failedItem);

var newCts = new CancellationTokenSource();
using (ct.Register(() => newCts.CancelAfter(TimeSpan.FromSeconds(Utilities.Utilities.cancelTimeout)))) {
await pushover.PushMessage(title, message, Priority, NotificationSound, newCts.Token);
}
}

FailedItems.Clear();
public override Task Execute(ISequenceContainer context, IProgress<ApplicationStatus> progress, CancellationToken ct) {
return Task.CompletedTask;
}

public override bool ShouldTrigger(ISequenceItem previousItem, ISequenceItem nextItem) {
return false;
}

public override bool ShouldTriggerAfter(ISequenceItem previousItem, ISequenceItem nextItem) {
if (previousItem == null) {
Logger.Debug("Previous item is null. Asserting false");
return false;
}

this.previousItem = previousItem;

this.previousItem.Name = this.previousItem.Name ?? this.previousItem.ToString();
this.previousItem.Category = this.previousItem.Category ?? this.previousItem.ToString();

if (this.previousItem.Name.Contains("Pushover") && this.previousItem.Category.Equals(Category)) {
Logger.Debug("Previous item is related. Asserting false");
return false;
}

FailedItems.Clear();
FailedItems = Utilities.Utilities.GetFailedItems(this.previousItem);

return FailedItems.Count > 0;
return false;
}

public IList<string> Issues { get; set; } = new ObservableCollection<string>();
Expand All @@ -148,8 +231,6 @@ public override string ToString() {
return $"Category: {Category}, Item: {nameof(FailuresToPushoverTrigger)}";
}

private List<FailedItem> FailedItems { get; set; } = new List<FailedItem>();

private string PushoverFailureTitleText { get; set; }
private string PushoverFailureBodyText { get; set; }

Expand Down
2 changes: 1 addition & 1 deletion Pushover/PushoverCommon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public async Task PushMessage(string title, string message, Priority priority, N
}
} catch (Exception ex) {
Logger.Error($"Error sending to Pushover: {ex.Message}");
throw ex;
throw;
}
}

Expand Down
2 changes: 1 addition & 1 deletion Telegram/TelegramCommon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public async Task SendTelegram(string message, bool doNotNotify, CancellationTok
await bclient.SendTextMessageAsync(TelegramChatId, message, disableNotification: doNotNotify, cancellationToken: ct);
} catch (Exception ex) {
Logger.Error($"Error sending to Telegram: {ex.Message}");
throw ex;
throw;
}
}

Expand Down
Loading

0 comments on commit ffb2db7

Please sign in to comment.