diff --git a/CHANGELOG.md b/CHANGELOG.md index ad189f1af..c97e60852 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased] +... + +## [1.4.2] - 2020-02-18 + +### Added + - Added unit test for AccessionDirectoryLister as part of DicomDirectoryProcessor tests +### Changed + +- Make performance counters in RedisSwapper atomic for thread-safety +- Clean up threads when using threaded mode in RabbitMQAdapter +- Use explicit threads rather than Task queueing in IdentifierMapper + ## [1.4.1] - 2020-02-17 ### Added @@ -158,7 +170,8 @@ First stable release after importing the repository from the private [SMIPlugin] - Anonymous `MappingTableName` must now be fully specified to pass validation (e.g. `mydb.mytbl`). Previously skipping database portion was supported. -[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.4.1...develop +[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.4.2...develop +[1.4.2]: https://github.com/SMI/SmiServices/compare/v1.4.1...v1.4.2 [1.4.1]: https://github.com/SMI/SmiServices/compare/v1.4.0...v1.4.1 [1.4.0]: https://github.com/SMI/SmiServices/compare/v1.3.1...v1.4.0 [1.3.1]: https://github.com/SMI/SmiServices/compare/v1.3.0...v1.3.1 diff --git a/README.md b/README.md index 4c3a7983c..be96afee3 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ![GitHub](https://img.shields.io/github/license/SMI/SmiServices) [![Total alerts](https://img.shields.io/lgtm/alerts/g/SMI/SmiServices.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/SMI/SmiServices/alerts/) -Version: `1.4.1` +Version: `1.4.2` # SMI Services diff --git a/src/SharedAssemblyInfo.cs b/src/SharedAssemblyInfo.cs index 9b25899ff..c3117a893 100644 --- a/src/SharedAssemblyInfo.cs +++ b/src/SharedAssemblyInfo.cs @@ -7,6 +7,6 @@ [assembly: AssemblyCulture("")] // These should be overwritten by release builds -[assembly: AssemblyVersion("1.4.1")] -[assembly: AssemblyFileVersion("1.4.1")] -[assembly: AssemblyInformationalVersion("1.4.1")] // This one can have the extra build info after it +[assembly: AssemblyVersion("1.4.2")] +[assembly: AssemblyFileVersion("1.4.2")] +[assembly: AssemblyInformationalVersion("1.4.2")] // This one can have the extra build info after it diff --git a/src/applications/Applications.DicomDirectoryProcessor/Execution/DirectoryFinders/AccessionDirectoryLister.cs b/src/applications/Applications.DicomDirectoryProcessor/Execution/DirectoryFinders/AccessionDirectoryLister.cs index 3fdaeffbe..2d8d2b3a3 100644 --- a/src/applications/Applications.DicomDirectoryProcessor/Execution/DirectoryFinders/AccessionDirectoryLister.cs +++ b/src/applications/Applications.DicomDirectoryProcessor/Execution/DirectoryFinders/AccessionDirectoryLister.cs @@ -27,7 +27,7 @@ public override void SearchForDicomDirectories(string accessionsList) IsProcessing = true; TotalSent = 0; - using (var reader = new StreamReader(accessionsList)) + using (StreamReader reader = FileSystem.File.OpenText(accessionsList)) { while (!reader.EndOfStream && !TokenSource.IsCancellationRequested) { diff --git a/src/applications/Applications.DicomDirectoryProcessor/README.md b/src/applications/Applications.DicomDirectoryProcessor/README.md index 385411ed1..d4425cd53 100644 --- a/src/applications/Applications.DicomDirectoryProcessor/README.md +++ b/src/applications/Applications.DicomDirectoryProcessor/README.md @@ -49,3 +49,5 @@ Specified by the `-f` CLI argument. Options are: - [Default](Execution/DirectoryFinders/BasicDicomDirectoryFinder.cs). Performs a general recursive scan for files. The program does not look inside any subdirectories of directories which contain at least 1 file. This scan mode searches for DICOM files with the extension specified in the `FileSystemOptions.DicomSearchPattern` config option. - [PACS](Execution/DirectoryFinders/PacsDirectoryFinder.cs). Performs a scan which assumes files are located inside a particular directory structure. The `PACS` directory structure is of the form `/YYYY/MM/DD/ACC/`. `ACC` represents accession directories. For each directory found which matches this pattern, an `AccessionDirectoryMessage` is produced. Note that this scan mode does not actually assert that there are any files inside the accession directories. + +- [List](Execution/DirectoryFinders/AccessionDirectoryLister.cs). Receives a file containing a list of accession directory paths. The accession directory path structure is expected to be of the form `/YYYY/MM/DD/ACC/`. For each path that matches this pattern, the existence of the directory and whether it contains DICOM files is checked. If the path meets all of the requirements, an `AccessionDirectoryMessage` is produced. Note that the input file path is passed in the same way as directory paths are passed to other operational modes. diff --git a/src/common/Smi.Common/Execution/MicroserviceHost.cs b/src/common/Smi.Common/Execution/MicroserviceHost.cs index fabad1fa7..d1f1a4bb9 100644 --- a/src/common/Smi.Common/Execution/MicroserviceHost.cs +++ b/src/common/Smi.Common/Execution/MicroserviceHost.cs @@ -90,7 +90,7 @@ protected MicroserviceHost(GlobalOptions globals, bool loadSmiLogConfig = true) throw new ApplicationException("Incorrect fo-dicom version for the current platform"); HostProcessID = Process.GetCurrentProcess().Id; - Logger.Info("Started " + HostProcessName + ":" + HostProcessID); + Logger.Info($"Started {HostProcessName}:{HostProcessID} on host {Environment.MachineName}"); Globals = globals; Logger.Debug("Loaded global options:\n" + globals); diff --git a/src/common/Smi.Common/RabbitMQAdapter.cs b/src/common/Smi.Common/RabbitMQAdapter.cs index ab7175b76..dfffa5390 100644 --- a/src/common/Smi.Common/RabbitMQAdapter.cs +++ b/src/common/Smi.Common/RabbitMQAdapter.cs @@ -1,5 +1,6 @@  using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; @@ -59,7 +60,7 @@ public bool HasConsumers private readonly bool _threaded; /// - /// + /// /// /// Connection parameters to a RabbitMQ server /// Identifier for this host instance @@ -75,8 +76,10 @@ public RabbitMqAdapter(RabbitOptions options, string hostId, HostFatalHandler ho int minWorker, minIOC; ThreadPool.GetMinThreads(out minWorker, out minIOC); int workers = Math.Max(50, minWorker); - ThreadPool.SetMaxThreads(workers, 50); - _logger.Info($"Set Rabbit event concurrency to {workers}"); + if (ThreadPool.SetMaxThreads(workers, 50)) + _logger.Info($"Set Rabbit event concurrency to ({workers},50)"); + else + _logger.Warn($"Failed to set Rabbit event concurrency to ({workers},50)"); } _factory = new ConnectionFactory @@ -220,7 +223,7 @@ public Guid StartConsumer(ConsumerOptions consumerOptions, IConsumer consumer, b } /// - /// + /// /// /// /// @@ -395,6 +398,7 @@ private void Consume(ISubscription subscription, IConsumer consumer, Cancellatio { IModel m = subscription.Model; consumer.SetModel(m); + ConcurrentDictionary threads=new ConcurrentDictionary(); while (m.IsOpen && !cancellationToken.IsCancellationRequested && !ShutdownCalled) { @@ -402,12 +406,23 @@ private void Consume(ISubscription subscription, IConsumer consumer, Cancellatio if (subscription.Next(500, out e)) { - if (_threaded) - Task.Run(() => consumer.ProcessMessage(e)); + if (_threaded) { + Thread t = new Thread(() => { + consumer.ProcessMessage(e); + threads.TryRemove(Thread.CurrentThread,out _); + }); + threads.TryAdd(t,1); + t.Start(); + } else consumer.ProcessMessage(e); } } + if (_threaded) { + foreach (Thread t in threads.Keys) { + t.Join(); + } + } string reason = "unknown"; diff --git a/src/microservices/Microservices.DicomReprocessor/Execution/DicomReprocessorHost.cs b/src/microservices/Microservices.DicomReprocessor/Execution/DicomReprocessorHost.cs index dce6962dd..a77f27e39 100644 --- a/src/microservices/Microservices.DicomReprocessor/Execution/DicomReprocessorHost.cs +++ b/src/microservices/Microservices.DicomReprocessor/Execution/DicomReprocessorHost.cs @@ -66,10 +66,9 @@ public override void Start() if (_processor.TotalProcessed == 0) Logger.Warn("Nothing reprocessed"); - - Logger.Info("Total messages sent: " + _processor.TotalProcessed); - Logger.Info("Total failed to reprocess : " + _processor.TotalFailed); - + else + _processor.LogProgress(); + if (queryTime != default) Logger.Info("Average documents processed per second: " + Convert.ToInt32(_processor.TotalProcessed / queryTime.TotalSeconds)); diff --git a/src/microservices/Microservices.DicomReprocessor/Execution/MongoDbReader.cs b/src/microservices/Microservices.DicomReprocessor/Execution/MongoDbReader.cs index 89faec34f..923598d9a 100644 --- a/src/microservices/Microservices.DicomReprocessor/Execution/MongoDbReader.cs +++ b/src/microservices/Microservices.DicomReprocessor/Execution/MongoDbReader.cs @@ -94,6 +94,7 @@ public async Task RunQuery(string query, IDocumentProcessor processor, _logger.Info("Starting reprocess operation"); start = DateTime.Now; + var totalBatches = 0; //Note: Can only check for the cancellation request every time we start to process a new batch while (await cursor.MoveNextAsync() && !_tokenSource.IsCancellationRequested) @@ -114,6 +115,9 @@ public async Task RunQuery(string query, IDocumentProcessor processor, processor.SendMessages(); + if (++totalBatches % 100 == 0) + processor.LogProgress(); + _logger.Debug($"Batch processed, sleeping for {options.SleepTime.TotalMilliseconds}ms"); Thread.Sleep(options.SleepTime); } diff --git a/src/microservices/Microservices.DicomReprocessor/Execution/Processors/DicomFileProcessor.cs b/src/microservices/Microservices.DicomReprocessor/Execution/Processors/DicomFileProcessor.cs index cc361f6d3..ee348fd4f 100644 --- a/src/microservices/Microservices.DicomReprocessor/Execution/Processors/DicomFileProcessor.cs +++ b/src/microservices/Microservices.DicomReprocessor/Execution/Processors/DicomFileProcessor.cs @@ -1,4 +1,4 @@ - + using Dicom; using DicomTypeTranslation; using MongoDB.Bson; @@ -129,6 +129,8 @@ public void SendMessages() } } + public void LogProgress() => _logger.Info($"Total messages sent: {TotalProcessed}. Total failed to reprocess: {TotalFailed}"); + private void LogUnprocessedDocument(string documentId, Exception e) { _logger.Error(e, "Error when processing document with _id " + documentId); diff --git a/src/microservices/Microservices.DicomReprocessor/Execution/Processors/IDocumentProcessor.cs b/src/microservices/Microservices.DicomReprocessor/Execution/Processors/IDocumentProcessor.cs index 1d5175a63..1aef425f8 100644 --- a/src/microservices/Microservices.DicomReprocessor/Execution/Processors/IDocumentProcessor.cs +++ b/src/microservices/Microservices.DicomReprocessor/Execution/Processors/IDocumentProcessor.cs @@ -29,5 +29,7 @@ public interface IDocumentProcessor /// /// void SendMessages(); + + void LogProgress(); } } diff --git a/src/microservices/Microservices.DicomReprocessor/Execution/Processors/TagPromotionProcessor.cs b/src/microservices/Microservices.DicomReprocessor/Execution/Processors/TagPromotionProcessor.cs index 0b5f78abf..2771e20b8 100644 --- a/src/microservices/Microservices.DicomReprocessor/Execution/Processors/TagPromotionProcessor.cs +++ b/src/microservices/Microservices.DicomReprocessor/Execution/Processors/TagPromotionProcessor.cs @@ -1,4 +1,4 @@ -using Smi.Common.Messaging; +using Smi.Common.Messaging; using Smi.Common.Options; using MongoDB.Bson; using NLog; @@ -49,5 +49,10 @@ public void SendMessages() { throw new NotImplementedException(); } + + public void LogProgress() + { + throw new NotImplementedException(); + } } } diff --git a/src/microservices/Microservices.IdentifierMapper/Execution/Swappers/RedisSwapper.cs b/src/microservices/Microservices.IdentifierMapper/Execution/Swappers/RedisSwapper.cs index fb31c4e90..22129319e 100644 --- a/src/microservices/Microservices.IdentifierMapper/Execution/Swappers/RedisSwapper.cs +++ b/src/microservices/Microservices.IdentifierMapper/Execution/Swappers/RedisSwapper.cs @@ -56,12 +56,12 @@ public override string GetSubstitutionFor(string toSwap, out string reason) if (val.HasValue) { result = val.ToString(); - CacheHit++; + Interlocked.Increment(ref CacheHit); } else { //we have no cached answer from Redis - CacheMiss++; + Interlocked.Increment(ref CacheMiss); //Go to the hosted swapper lock(_hostedSwapper) @@ -85,7 +85,7 @@ public override string GetSubstitutionFor(string toSwap, out string reason) } else { - CacheHit++; + Interlocked.Increment(ref CacheHit); } if (string.Equals(NullString, result)) @@ -95,9 +95,9 @@ public override string GetSubstitutionFor(string toSwap, out string reason) } if (result == null) - Fail++; + Interlocked.Increment(ref Fail); else - Success++; + Interlocked.Increment(ref Success); return result; } diff --git a/src/microservices/Microservices.IdentifierMapper/Execution/Swappers/SwapIdentifiers.cs b/src/microservices/Microservices.IdentifierMapper/Execution/Swappers/SwapIdentifiers.cs index 76e8e7a76..c89016a63 100644 --- a/src/microservices/Microservices.IdentifierMapper/Execution/Swappers/SwapIdentifiers.cs +++ b/src/microservices/Microservices.IdentifierMapper/Execution/Swappers/SwapIdentifiers.cs @@ -6,15 +6,15 @@ namespace Microservices.IdentifierMapper.Execution.Swappers { public abstract class SwapIdentifiers: ISwapIdentifiers { - public int CacheHit { get; protected set; } - public int CacheMiss { get; protected set;} - - public int Success { get; protected set;} - public int Fail { get; protected set;} + public int CacheHit; + public int CacheMiss; + + public int Success; + public int Fail; public int Invalid { get; protected set;} public Stopwatch DatabaseStopwatch { get; } = new Stopwatch(); - + public abstract void Setup(IMappingTableOptions mappingTableOptions); public abstract string GetSubstitutionFor(string toSwap, out string reason); diff --git a/tests/applications/Applications.DicomDirectoryProcessor.Tests/AccessionDirectoryListerTest.cs b/tests/applications/Applications.DicomDirectoryProcessor.Tests/AccessionDirectoryListerTest.cs index 0cf56c321..46039410d 100644 --- a/tests/applications/Applications.DicomDirectoryProcessor.Tests/AccessionDirectoryListerTest.cs +++ b/tests/applications/Applications.DicomDirectoryProcessor.Tests/AccessionDirectoryListerTest.cs @@ -1,6 +1,15 @@  +using Applications.DicomDirectoryProcessor.Execution.DirectoryFinders; +using Moq; using NUnit.Framework; +using Smi.Common.Messages; +using Smi.Common.Messaging; using Smi.Common.Tests; +using System.Collections.Generic; +using System; +using System.Text; +using System.IO; +using System.IO.Abstractions.TestingHelpers; namespace Applications.DicomDirectoryProcessor.Tests @@ -20,5 +29,56 @@ public void OneTimeSetUp() // TODO(rkm 2020-02-12) Things to test // - Valid CSV file // - CSVs with various invalid data / lines + private String GetListContent() + { + StringBuilder accessionList = new StringBuilder(); + + accessionList.AppendLine("/PACS/2018/01/01/AAA,"); // exists and has dicom files - pass + accessionList.AppendLine("/PACS/2018/01/01/AAA/,"); // exists and has dicom files - pass + accessionList.AppendLine("/PACS/2018/01/01/BBB,"); // does exist but has no dicom files - fail + accessionList.AppendLine("/PACS/2018/01/01/CCC/,"); // does not exist - fail + accessionList.AppendLine("/PACS/2018/01/01/,"); // not pointing to accession directory - fail + accessionList.AppendLine("/PACS/2018/01/01/testDicom.dcm,"); // not pointing to accession directory - fail + accessionList.AppendLine(" "); // not pointing to accession directory - fail + accessionList.AppendLine("NULL"); // not pointing to accession directory - fail + accessionList.AppendLine(",,,,"); // not pointing to accession directory - fail + + return accessionList.ToString(); + } + + [Test] + public void TestAccessionDirectoryLister() + { + // Mock file system referenced in accession list + string rootDir = Path.GetFullPath("/PACS"); + MockFileSystem mockFilesystem = new MockFileSystem(); + + string testDicom = Path.GetFullPath(Path.Combine(rootDir, "2018/01/01/AAA/test.dcm")); + mockFilesystem.AddFile(testDicom, MockFileData.NullObject); + + string testBad = Path.GetFullPath(Path.Combine(rootDir, "2018/01/01/BBB/test.txt")); + mockFilesystem.AddFile(testBad, MockFileData.NullObject); + + // Mock input file + string accessionList = Path.GetFullPath(Path.Combine(rootDir, "accessions.csv")); + var mockInputFile = new MockFileData(GetListContent()); + mockFilesystem.AddFile(accessionList, mockInputFile); + + // Mock producer + var totalSent = 0; + + Mock mockProducerModel = new Mock(); + mockProducerModel + .Setup(x => x.SendMessage(It.IsAny(), + null, + "")) + .Callback(() => ++totalSent); + + AccessionDirectoryLister accessionLister = new AccessionDirectoryLister(rootDir, mockFilesystem, "*.dcm", mockProducerModel.Object); + + accessionLister.SearchForDicomDirectories(accessionList); + + Assert.AreEqual(totalSent, 2); + } } }