Skip to content

Commit

Permalink
Merge pull request #156 from HicServices/threshold-assoc
Browse files Browse the repository at this point in the history
Threshold assoc
  • Loading branch information
tznind authored Jul 20, 2021
2 parents ccf0a88 + c820cdf commit 96110f1
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Added

- Added support for customising `DicomClient` client settings in PACSSource caching component (e.g. `AssociationLingerTimeoutInMs`)
- Added logging of association requests in PACSSource
- Added new field MaximumAllowableAssociationEventsPerMinute for shutting down the executing process if the number of Association events crosses the given threshold

## [3.0.0] 2021-06-05

Expand Down
84 changes: 84 additions & 0 deletions Rdmp.Dicom.Tests/PressureGaugeTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using NUnit.Framework;
using Rdmp.Dicom.Cache.Pipeline;
using ReusableLibraryCode.Progress;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Rdmp.Dicom.Tests
{
class PressureGaugeTests
{
[Test]
public void TestGauge_NotReached()
{
bool someFact = false;

var g = new PressureGauge();
g.ThresholdBeatsPerMinute = 4;
g.Tick(new DateTime(2001, 01, 01, 01, 01, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
}
[Test]
public void TestGauge_NotReached_OverTime()
{
bool someFact = false;

var g = new PressureGauge();
g.ThresholdBeatsPerMinute = 1;

// events are 1 minute appart so does not trigger
g.Tick(new DateTime(2001, 01, 01, 01, 01, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
g.Tick(new DateTime(2001, 01, 01, 01, 02, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
g.Tick(new DateTime(2001, 01, 01, 01, 03, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
}
[Test]
public void TestGauge_Reached()
{
bool someFact = false;

var g = new PressureGauge();
g.ThresholdBeatsPerMinute = 4;
g.Tick(new DateTime(2001, 01, 01, 01, 01, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
g.Tick(new DateTime(2001, 01, 01, 01, 01, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
g.Tick(new DateTime(2001, 01, 01, 01, 01, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
g.Tick(new DateTime(2001, 01, 01, 01, 01, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
g.Tick(new DateTime(2001, 01, 01, 01, 01, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsTrue(someFact);
}

[Test]
public void TestGauge_Reached_OverTime()
{
bool someFact = false;

var g = new PressureGauge();
g.ThresholdBeatsPerMinute = 1;
g.Tick(new DateTime(2001, 01, 01, 01, 01, 01), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
g.Tick(new DateTime(2001, 01, 01, 01, 01, 30), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsTrue(someFact);
}
[Test]
public void TestGauge_Reached_OverTime_Boundary()
{
bool someFact = false;

var g = new PressureGauge();
g.ThresholdBeatsPerMinute = 1;
g.Tick(new DateTime(2001, 01, 01, 01, 01, 30), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsFalse(someFact);
g.Tick(new DateTime(2001, 01, 01, 01, 02, 29), new ThrowImmediatelyDataLoadEventListener(), () => someFact = true);
Assert.IsTrue(someFact);
}
}
}
22 changes: 21 additions & 1 deletion Rdmp.Dicom/Cache/Pipeline/PACSSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,15 @@ public class PACSSource : SMICacheSource
[DemandsInitialization("The maximum number of DICOM requests that are allowed to be sent over one single association. When this limit is reached, the DICOM client will wait for pending requests to complete, and then open a new association to send the remaining requests, if any. If not provided then int.MaxValue is used (i.e. keep reusing association)")]
public int? MaximumNumberOfRequestsPerAssociation { get; set; }

[DemandsInitialization("The maximum number of Association related events that can be permitted per minute before the system exits",DefaultValue = 30)]
public int MaximumAllowableAssociationEventsPerMinute { get; set; } = 30;

public static PressureGauge gauge = new PressureGauge() { ThresholdBeatsPerMinute = 30 };

public override SMIDataChunk DoGetChunk(ICacheFetchRequest cacheRequest, IDataLoadEventListener listener,GracefulCancellationToken cancellationToken)
{
gauge.ThresholdBeatsPerMinute = MaximumAllowableAssociationEventsPerMinute;

listener.OnNotify(this,new NotifyEventArgs(ProgressEventType.Information,$"PACSSource version is {typeof(PACSSource).Assembly.GetName().Version}. Assembly is {typeof(PACSSource).Assembly} " ));
listener.OnNotify(this,new NotifyEventArgs(ProgressEventType.Information,$"Fo-Dicom version is {typeof(DicomClient).Assembly.GetName().Version}. Assembly is {typeof(DicomClient).Assembly} " ));

Expand Down Expand Up @@ -80,8 +87,21 @@ public override SMIDataChunk DoGetChunk(ICacheFetchRequest cacheRequest, IDataLo
using (var server = (DicomServer<CachingSCP>) DicomServer.Create<CachingSCP>(dicomConfiguration.LocalAetUri.Port))
{
DicomClient client = new DicomClient(dicomConfiguration.RemoteAetUri.Host, dicomConfiguration.RemoteAetUri.Port, false, dicomConfiguration.LocalAetTitle, dicomConfiguration.RemoteAetTitle);
client.AssociationAccepted += (s, e) => {
gauge.Tick(listener, () => Process.GetCurrentProcess().Kill());
listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, "AssociationAccepted"));
};
client.AssociationReleased += (s, e) => {
gauge.Tick(listener, () => Process.GetCurrentProcess().Kill());
listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, $"AssociationReleased"));
};
client.AssociationRejected += (s, e) =>
{
gauge.Tick(listener,()=>Process.GetCurrentProcess().Kill());
listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, $"AssociationRejected"));
};

if(AssociationLingerTimeoutInMs != null && AssociationLingerTimeoutInMs > 0)
if (AssociationLingerTimeoutInMs != null && AssociationLingerTimeoutInMs > 0)
client.AssociationLingerTimeoutInMs = AssociationLingerTimeoutInMs.Value;

if(AssociationReleaseTimeoutInMs != null && AssociationReleaseTimeoutInMs > 0)
Expand Down
62 changes: 62 additions & 0 deletions Rdmp.Dicom/Cache/Pipeline/PressureGauge.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using ReusableLibraryCode.Progress;
using System;
using System.Collections.Generic;
using System.Linq;

namespace Rdmp.Dicom.Cache.Pipeline
{
/// <summary>
/// Tracks unspecified events and performs a delegate action when the frequency exceeds the <see cref="ThresholdBeatsPerMinute"/>
/// </summary>
public class PressureGauge
{
/// <summary>
/// Number of events that are allowable per minute
/// </summary>
public long ThresholdBeatsPerMinute { get; set; }

List<DateTime> collection = new List<DateTime>();
object oLock = new object();

public PressureGauge()
{
}

/// <summary>
/// Marks that an event happened at the current date time
/// </summary>
/// <param name="listener"></param>
/// <param name="pressureTooHigh">Delegate to invoke if <see cref="ThresholdBeatsPerMinute"/> is exceeded</param>
public void Tick(IDataLoadEventListener listener, Action pressureTooHigh)
{
Tick(DateTime.Now, listener, pressureTooHigh);
}

/// <summary>
/// Marks that an event happened at <paramref name="eventDate"/>
/// </summary>
/// <param name="eventDate">Time of event. Must be greater than any previous event dates</param>
/// <param name="listener"></param>
/// <param name="pressureTooHigh">Delegate to invoke if <see cref="ThresholdBeatsPerMinute"/> is exceeded</param>
public void Tick(DateTime eventDate, IDataLoadEventListener listener, Action pressureTooHigh)
{
bool exceeded = false;
lock (oLock)
{
// filter collection to only recent events
collection = collection.Where(c => eventDate.Subtract(c) < TimeSpan.FromMinutes(1)).ToList();

collection.Add(eventDate);

exceeded = collection.Count > ThresholdBeatsPerMinute;
}

if(exceeded)
{
// Important to use log level Information here and not Error in case the listener breaks flow control e.g. ThrowImmediately listener
listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, "ThresholdBeatsPerMinute exceeded, invoking delegate"));
pressureTooHigh();
}
}
}
}

0 comments on commit 96110f1

Please sign in to comment.