Skip to content

Commit

Permalink
Merge pull request #121 from SMI/release/v1.4.2
Browse files Browse the repository at this point in the history
Release v1.4.2
  • Loading branch information
jas88 authored Feb 19, 2020
2 parents b7ced10 + 4f4eedc commit 0015d06
Show file tree
Hide file tree
Showing 15 changed files with 132 additions and 30 deletions.
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased]

...

## [1.4.2] - 2020-02-18

### Added

- Added unit test for AccessionDirectoryLister as part of DicomDirectoryProcessor tests

### Changed

- Make performance counters in RedisSwapper atomic for thread-safety
- Clean up threads when using threaded mode in RabbitMQAdapter
- Use explicit threads rather than Task queueing in IdentifierMapper

## [1.4.1] - 2020-02-17

### Added
Expand Down Expand Up @@ -158,7 +170,8 @@ First stable release after importing the repository from the private [SMIPlugin]
- Anonymous `MappingTableName` must now be fully specified to pass validation (e.g. `mydb.mytbl`). Previously skipping database portion was supported.


[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.4.1...develop
[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.4.2...develop
[1.4.2]: https://github.com/SMI/SmiServices/compare/v1.4.1...v1.4.2
[1.4.1]: https://github.com/SMI/SmiServices/compare/v1.4.0...v1.4.1
[1.4.0]: https://github.com/SMI/SmiServices/compare/v1.3.1...v1.4.0
[1.3.1]: https://github.com/SMI/SmiServices/compare/v1.3.0...v1.3.1
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
![GitHub](https://img.shields.io/github/license/SMI/SmiServices)
[![Total alerts](https://img.shields.io/lgtm/alerts/g/SMI/SmiServices.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/SMI/SmiServices/alerts/)

Version: `1.4.1`
Version: `1.4.2`

# SMI Services

Expand Down
6 changes: 3 additions & 3 deletions src/SharedAssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
[assembly: AssemblyCulture("")]

// These should be overwritten by release builds
[assembly: AssemblyVersion("1.4.1")]
[assembly: AssemblyFileVersion("1.4.1")]
[assembly: AssemblyInformationalVersion("1.4.1")] // This one can have the extra build info after it
[assembly: AssemblyVersion("1.4.2")]
[assembly: AssemblyFileVersion("1.4.2")]
[assembly: AssemblyInformationalVersion("1.4.2")] // This one can have the extra build info after it
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public override void SearchForDicomDirectories(string accessionsList)
IsProcessing = true;
TotalSent = 0;

using (var reader = new StreamReader(accessionsList))
using (StreamReader reader = FileSystem.File.OpenText(accessionsList))
{
while (!reader.EndOfStream && !TokenSource.IsCancellationRequested)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@ Specified by the `-f` CLI argument. Options are:
- [Default](Execution/DirectoryFinders/BasicDicomDirectoryFinder.cs). Performs a general recursive scan for files. The program does not look inside any subdirectories of directories which contain at least 1 file. This scan mode searches for DICOM files with the extension specified in the `FileSystemOptions.DicomSearchPattern` config option.

- [PACS](Execution/DirectoryFinders/PacsDirectoryFinder.cs). Performs a scan which assumes files are located inside a particular directory structure. The `PACS` directory structure is of the form `<any root>/YYYY/MM/DD/ACC/<dicom>`. `ACC` represents accession directories. For each directory found which matches this pattern, an `AccessionDirectoryMessage` is produced. Note that this scan mode does not actually assert that there are any files inside the accession directories.

- [List](Execution/DirectoryFinders/AccessionDirectoryLister.cs). Receives a file containing a list of accession directory paths. The accession directory path structure is expected to be of the form `<any root>/YYYY/MM/DD/ACC/`. For each path that matches this pattern, the existence of the directory and whether it contains DICOM files is checked. If the path meets all of the requirements, an `AccessionDirectoryMessage` is produced. Note that the input file path is passed in the same way as directory paths are passed to other operational modes.
2 changes: 1 addition & 1 deletion src/common/Smi.Common/Execution/MicroserviceHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ protected MicroserviceHost(GlobalOptions globals, bool loadSmiLogConfig = true)
throw new ApplicationException("Incorrect fo-dicom version for the current platform");

HostProcessID = Process.GetCurrentProcess().Id;
Logger.Info("Started " + HostProcessName + ":" + HostProcessID);
Logger.Info($"Started {HostProcessName}:{HostProcessID} on host {Environment.MachineName}");

Globals = globals;
Logger.Debug("Loaded global options:\n" + globals);
Expand Down
27 changes: 21 additions & 6 deletions src/common/Smi.Common/RabbitMQAdapter.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
Expand Down Expand Up @@ -59,7 +60,7 @@ public bool HasConsumers
private readonly bool _threaded;

/// <summary>
///
///
/// </summary>
/// <param name="options">Connection parameters to a RabbitMQ server</param>
/// <param name="hostId">Identifier for this host instance</param>
Expand All @@ -75,8 +76,10 @@ public RabbitMqAdapter(RabbitOptions options, string hostId, HostFatalHandler ho
int minWorker, minIOC;
ThreadPool.GetMinThreads(out minWorker, out minIOC);
int workers = Math.Max(50, minWorker);
ThreadPool.SetMaxThreads(workers, 50);
_logger.Info($"Set Rabbit event concurrency to {workers}");
if (ThreadPool.SetMaxThreads(workers, 50))
_logger.Info($"Set Rabbit event concurrency to ({workers},50)");
else
_logger.Warn($"Failed to set Rabbit event concurrency to ({workers},50)");
}

_factory = new ConnectionFactory
Expand Down Expand Up @@ -220,7 +223,7 @@ public Guid StartConsumer(ConsumerOptions consumerOptions, IConsumer consumer, b
}

/// <summary>
///
///
/// </summary>
/// <param name="taskId"></param>
/// <param name="timeout"></param>
Expand Down Expand Up @@ -395,19 +398,31 @@ private void Consume(ISubscription subscription, IConsumer consumer, Cancellatio
{
IModel m = subscription.Model;
consumer.SetModel(m);
ConcurrentDictionary<Thread,int> threads=new ConcurrentDictionary<Thread,int>();

while (m.IsOpen && !cancellationToken.IsCancellationRequested && !ShutdownCalled)
{
BasicDeliverEventArgs e;

if (subscription.Next(500, out e))
{
if (_threaded)
Task.Run(() => consumer.ProcessMessage(e));
if (_threaded) {
Thread t = new Thread(() => {
consumer.ProcessMessage(e);
threads.TryRemove(Thread.CurrentThread,out _);
});
threads.TryAdd(t,1);
t.Start();
}
else
consumer.ProcessMessage(e);
}
}
if (_threaded) {
foreach (Thread t in threads.Keys) {
t.Join();
}
}

string reason = "unknown";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ public override void Start()

if (_processor.TotalProcessed == 0)
Logger.Warn("Nothing reprocessed");

Logger.Info("Total messages sent: " + _processor.TotalProcessed);
Logger.Info("Total failed to reprocess : " + _processor.TotalFailed);

else
_processor.LogProgress();

if (queryTime != default)
Logger.Info("Average documents processed per second: " + Convert.ToInt32(_processor.TotalProcessed / queryTime.TotalSeconds));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public async Task<TimeSpan> RunQuery(string query, IDocumentProcessor processor,

_logger.Info("Starting reprocess operation");
start = DateTime.Now;
var totalBatches = 0;

//Note: Can only check for the cancellation request every time we start to process a new batch
while (await cursor.MoveNextAsync() && !_tokenSource.IsCancellationRequested)
Expand All @@ -114,6 +115,9 @@ public async Task<TimeSpan> RunQuery(string query, IDocumentProcessor processor,

processor.SendMessages();

if (++totalBatches % 100 == 0)
processor.LogProgress();

_logger.Debug($"Batch processed, sleeping for {options.SleepTime.TotalMilliseconds}ms");
Thread.Sleep(options.SleepTime);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@


using Dicom;
using DicomTypeTranslation;
using MongoDB.Bson;
Expand Down Expand Up @@ -129,6 +129,8 @@ public void SendMessages()
}
}

public void LogProgress() => _logger.Info($"Total messages sent: {TotalProcessed}. Total failed to reprocess: {TotalFailed}");

private void LogUnprocessedDocument(string documentId, Exception e)
{
_logger.Error(e, "Error when processing document with _id " + documentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@ public interface IDocumentProcessor
///
/// </summary>
void SendMessages();

void LogProgress();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Smi.Common.Messaging;
using Smi.Common.Messaging;
using Smi.Common.Options;
using MongoDB.Bson;
using NLog;
Expand Down Expand Up @@ -49,5 +49,10 @@ public void SendMessages()
{
throw new NotImplementedException();
}

public void LogProgress()
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ public override string GetSubstitutionFor(string toSwap, out string reason)
if (val.HasValue)
{
result = val.ToString();
CacheHit++;
Interlocked.Increment(ref CacheHit);
}
else
{
//we have no cached answer from Redis
CacheMiss++;
Interlocked.Increment(ref CacheMiss);

//Go to the hosted swapper
lock(_hostedSwapper)
Expand All @@ -85,7 +85,7 @@ public override string GetSubstitutionFor(string toSwap, out string reason)
}
else
{
CacheHit++;
Interlocked.Increment(ref CacheHit);
}

if (string.Equals(NullString, result))
Expand All @@ -95,9 +95,9 @@ public override string GetSubstitutionFor(string toSwap, out string reason)
}

if (result == null)
Fail++;
Interlocked.Increment(ref Fail);
else
Success++;
Interlocked.Increment(ref Success);

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ namespace Microservices.IdentifierMapper.Execution.Swappers
{
public abstract class SwapIdentifiers: ISwapIdentifiers
{
public int CacheHit { get; protected set; }
public int CacheMiss { get; protected set;}
public int Success { get; protected set;}
public int Fail { get; protected set;}
public int CacheHit;
public int CacheMiss;

public int Success;
public int Fail;
public int Invalid { get; protected set;}

public Stopwatch DatabaseStopwatch { get; } = new Stopwatch();

public abstract void Setup(IMappingTableOptions mappingTableOptions);

public abstract string GetSubstitutionFor(string toSwap, out string reason);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@

using Applications.DicomDirectoryProcessor.Execution.DirectoryFinders;
using Moq;
using NUnit.Framework;
using Smi.Common.Messages;
using Smi.Common.Messaging;
using Smi.Common.Tests;
using System.Collections.Generic;
using System;
using System.Text;
using System.IO;
using System.IO.Abstractions.TestingHelpers;


namespace Applications.DicomDirectoryProcessor.Tests
Expand All @@ -20,5 +29,56 @@ public void OneTimeSetUp()
// TODO(rkm 2020-02-12) Things to test
// - Valid CSV file
// - CSVs with various invalid data / lines
private String GetListContent()
{
StringBuilder accessionList = new StringBuilder();

accessionList.AppendLine("/PACS/2018/01/01/AAA,"); // exists and has dicom files - pass
accessionList.AppendLine("/PACS/2018/01/01/AAA/,"); // exists and has dicom files - pass
accessionList.AppendLine("/PACS/2018/01/01/BBB,"); // does exist but has no dicom files - fail
accessionList.AppendLine("/PACS/2018/01/01/CCC/,"); // does not exist - fail
accessionList.AppendLine("/PACS/2018/01/01/,"); // not pointing to accession directory - fail
accessionList.AppendLine("/PACS/2018/01/01/testDicom.dcm,"); // not pointing to accession directory - fail
accessionList.AppendLine(" "); // not pointing to accession directory - fail
accessionList.AppendLine("NULL"); // not pointing to accession directory - fail
accessionList.AppendLine(",,,,"); // not pointing to accession directory - fail

return accessionList.ToString();
}

[Test]
public void TestAccessionDirectoryLister()
{
// Mock file system referenced in accession list
string rootDir = Path.GetFullPath("/PACS");
MockFileSystem mockFilesystem = new MockFileSystem();

string testDicom = Path.GetFullPath(Path.Combine(rootDir, "2018/01/01/AAA/test.dcm"));
mockFilesystem.AddFile(testDicom, MockFileData.NullObject);

string testBad = Path.GetFullPath(Path.Combine(rootDir, "2018/01/01/BBB/test.txt"));
mockFilesystem.AddFile(testBad, MockFileData.NullObject);

// Mock input file
string accessionList = Path.GetFullPath(Path.Combine(rootDir, "accessions.csv"));
var mockInputFile = new MockFileData(GetListContent());
mockFilesystem.AddFile(accessionList, mockInputFile);

// Mock producer
var totalSent = 0;

Mock<IProducerModel> mockProducerModel = new Mock<IProducerModel>();
mockProducerModel
.Setup(x => x.SendMessage(It.IsAny<IMessage>(),
null,
""))
.Callback(() => ++totalSent);

AccessionDirectoryLister accessionLister = new AccessionDirectoryLister(rootDir, mockFilesystem, "*.dcm", mockProducerModel.Object);

accessionLister.SearchForDicomDirectories(accessionList);

Assert.AreEqual(totalSent, 2);
}
}
}

0 comments on commit 0015d06

Please sign in to comment.