Skip to content

Commit

Permalink
Release v1.4.5 (#134)
Browse files Browse the repository at this point in the history
* Fix line endings

* Add Shutdown call to messaging consumers, defaulting to doing nothing
Call this from Rabbit adapter when shutting down in threaded mode
Use that to clean up and drain the Ack queue in IdentifierMapper

Co-authored-by: James A Sutherland <[email protected]>
  • Loading branch information
rkm and jas88 authored Feb 26, 2020
1 parent f685ef2 commit 47cf2c6
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 20 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

...

## [1.4.5] - 2020-02-26

- Add clean shutdown hook for IdentifierMapper to clean up the worker threads

## [1.4.4] - 2020-02-25

- Update Travis config and Java library install shell script to resolve some Travis stability issues
Expand Down Expand Up @@ -181,7 +185,8 @@ First stable release after importing the repository from the private [SMIPlugin]
- Anonymous `MappingTableName` must now be fully specified to pass validation (e.g. `mydb.mytbl`). Previously skipping database portion was supported.


[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.4.4...develop
[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.4.5...develop
[1.4.5]: https://github.com/SMI/SmiServices/compare/v1.4.4...v1.4.5
[1.4.4]: https://github.com/SMI/SmiServices/compare/v1.4.3...v1.4.4
[1.4.3]: https://github.com/SMI/SmiServices/compare/v1.4.2...v1.4.3
[1.4.2]: https://github.com/SMI/SmiServices/compare/v1.4.1...v1.4.2
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
![GitHub](https://img.shields.io/github/license/SMI/SmiServices)
[![Total alerts](https://img.shields.io/lgtm/alerts/g/SMI/SmiServices.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/SMI/SmiServices/alerts/)

Version: `1.4.4`
Version: `1.4.5`

# SMI Services

Expand Down
6 changes: 3 additions & 3 deletions src/SharedAssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
[assembly: AssemblyCulture("")]

// These should be overwritten by release builds
[assembly: AssemblyVersion("1.4.4")]
[assembly: AssemblyFileVersion("1.4.4")]
[assembly: AssemblyInformationalVersion("1.4.4")] // This one can have the extra build info after it
[assembly: AssemblyVersion("1.4.5")]
[assembly: AssemblyFileVersion("1.4.5")]
[assembly: AssemblyInformationalVersion("1.4.5")] // This one can have the extra build info after it
4 changes: 4 additions & 0 deletions src/common/Smi.Common/Messaging/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public abstract class Consumer : IConsumer

protected IModel Model;

public virtual void Shutdown()
{

}

protected Consumer()
{
Expand Down
5 changes: 5 additions & 0 deletions src/common/Smi.Common/Messaging/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ public interface IConsumer
///
/// </summary>
event ConsumerFatalHandler OnFatal;

/// <summary>
/// Trigger a clean shutdown of worker threads etc
/// </summary>
void Shutdown();
}
}
22 changes: 21 additions & 1 deletion src/common/Smi.Common/RabbitMQAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ public void Shutdown(int timeout = 5000)
/// <param name="cancellationToken"></param>
private void Consume(ISubscription subscription, IConsumer consumer, CancellationToken cancellationToken)
{
ReaderWriterLockSlim worklock = new ReaderWriterLockSlim();
IModel m = subscription.Model;
consumer.SetModel(m);

Expand All @@ -409,13 +410,32 @@ private void Consume(ISubscription subscription, IConsumer consumer, Cancellatio
{
Task.Run(() =>
{
consumer.ProcessMessage(e);
worklock.EnterReadLock();
try
{
consumer.ProcessMessage(e);
}
finally
{
worklock.ExitReadLock();
}
},cancellationToken);
}
else
consumer.ProcessMessage(e);
}
}
if (_threaded)
{
// Taking a write lock means waiting for all read locks to
// release, i.e. all workers have finished
worklock.EnterWriteLock();

// Now there are no *new* messages being processed, flush the queue
consumer.Shutdown();
worklock.ExitWriteLock();
}
worklock.Dispose();

string reason = "unknown";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,50 @@ public IdentifierMapperQueueConsumer(IProducerModel producer, ISwapIdentifiers s
_swapper = swapper;
acker=new Thread(() =>
{
while (true)
try
{
List<Tuple<IMessageHeader, BasicDeliverEventArgs>> done = new List<Tuple<IMessageHeader, BasicDeliverEventArgs>>();
Tuple<DicomFileMessage, IMessageHeader, BasicDeliverEventArgs> t;
t = msgq.Take();
lock (_producer)
while (true)
{
_producer.SendMessage(t.Item1, t.Item2, "");
done.Add(new Tuple<IMessageHeader, BasicDeliverEventArgs>(t.Item2, t.Item3));
while (msgq.TryTake(out t))
List<Tuple<IMessageHeader, BasicDeliverEventArgs>> done = new List<Tuple<IMessageHeader, BasicDeliverEventArgs>>();
Tuple<DicomFileMessage, IMessageHeader, BasicDeliverEventArgs> t;
t = msgq.Take();
lock (_producer)
{
_producer.SendMessage(t.Item1, t.Item2, "");
done.Add(new Tuple<IMessageHeader, BasicDeliverEventArgs>(t.Item2, t.Item3));
}
_producer.WaitForConfirms();
foreach (var ack in done)
{
Ack(ack.Item1, ack.Item2);
while (msgq.TryTake(out t))
{
_producer.SendMessage(t.Item1, t.Item2, "");
done.Add(new Tuple<IMessageHeader, BasicDeliverEventArgs>(t.Item2, t.Item3));
}
_producer.WaitForConfirms();
foreach (var ack in done)
{
Ack(ack.Item1, ack.Item2);
}
}
}
}
catch (InvalidOperationException)
{
// The BlockingCollection will throw this exception when closed by Shutdown()
return;
}
});
acker.IsBackground = true;
acker.Start();
}

/// <summary>
/// Cleanly shut this process down, draining the Ack queue and ending that thread
/// </summary>
public override void Shutdown()
{
msgq.CompleteAdding();
acker.Join();
}

protected override void ProcessMessageImpl(IMessageHeader header, BasicDeliverEventArgs deliverArgs)
{
DicomFileMessage msg;
Expand Down

0 comments on commit 47cf2c6

Please sign in to comment.