diff --git a/Rdmp.Dicom/Attachers/Routing/AutoRoutingAttacher.cs b/Rdmp.Dicom/Attachers/Routing/AutoRoutingAttacher.cs index e83f8ea1..da36294a 100644 --- a/Rdmp.Dicom/Attachers/Routing/AutoRoutingAttacher.cs +++ b/Rdmp.Dicom/Attachers/Routing/AutoRoutingAttacher.cs @@ -45,8 +45,7 @@ This Grouping will be used to extract the Modality code when deciding which tabl readonly Dictionary _columnNamesRoutedSuccesfully = new(StringComparer.CurrentCultureIgnoreCase); - readonly Stopwatch _sw = new(); - Dictionary _modalityMap; + private Dictionary _modalityMap; protected AutoRoutingAttacher(bool requestsExternalDatabaseCreation) : base(requestsExternalDatabaseCreation) //Derived classes can change mind about RAW creation { @@ -62,11 +61,9 @@ public override ExitCodeType Attach(IDataLoadJob job,GracefulCancellationToken t Job = job; //if we have an explicit payload to run instead (this is how you inject explicit files/archives/directories to be loaded without touching the disk - if (job.Payload != null) + if (job.Payload is IDicomWorklist worklist) { - var useCase = new AutoRoutingAttacherPipelineUseCase(this, (IDicomWorklist)job.Payload); - var engine = useCase.GetEngine(LoadPipeline, Job); - engine.ExecutePipeline(token); + new AutoRoutingAttacherPipelineUseCase(this, worklist).GetEngine(LoadPipeline, Job).ExecutePipeline(token); } else { @@ -78,29 +75,26 @@ public override ExitCodeType Attach(IDataLoadJob job,GracefulCancellationToken t //no explicit injected payload, so use the ForLoading directory to generate the list of dicom/zip files to process foreach (var filesToLoad in job.LoadDirectory.ForLoading.GetFiles(ListPattern)) - { - var useCase = new AutoRoutingAttacherPipelineUseCase(this, new FlatFileToLoadDicomFileWorklist(new FlatFileToLoad(filesToLoad))); - var engine = useCase.GetEngine(LoadPipeline, job); - engine.ExecutePipeline(token); - } + new AutoRoutingAttacherPipelineUseCase(this, + new FlatFileToLoadDicomFileWorklist(new FlatFileToLoad(filesToLoad))) + .GetEngine(LoadPipeline, job) + .ExecutePipeline(token); } - var unmatchedColumns = string.Join($",{Environment.NewLine}", _columnNamesRoutedSuccesfully.Where(kvp => kvp.Value == false).Select(k => k.Key)); + var unmatchedColumns = string.Join($",{Environment.NewLine}", + _columnNamesRoutedSuccesfully.Where(static kvp => kvp.Value == false).Select(static k => k.Key)); if (!string.IsNullOrWhiteSpace(unmatchedColumns)) - { //for each column see in an input table that was not succesfully routed somewhere job.OnNotify(this,new NotifyEventArgs(ProgressEventType.Warning, $"Ignored input columns:{unmatchedColumns}")); - } - return ExitCodeType.Success; } private void CreateTableUploaders() { - _uploaders = new Dictionary>(StringComparer.CurrentCultureIgnoreCase); + _uploaders = new Dictionary>(StringComparer.OrdinalIgnoreCase); foreach (var t in Job.RegularTablesToLoad) { var tblName = t.GetRuntimeName(LoadBubble.Raw,Job.Configuration.DatabaseNamer); @@ -117,42 +111,32 @@ private void CreateTableUploaders() private void RefreshUploadDictionary() { //Each column name can exist in multiple TableInfos (e.g. foreign keys with the same name) so we can route a column to multiple destination tables - _columnNameToTargetTablesDictionary = new Dictionary>(StringComparer.CurrentCultureIgnoreCase); + _columnNameToTargetTablesDictionary = new Dictionary>(StringComparer.OrdinalIgnoreCase); foreach (var tableInfo in Job.RegularTablesToLoad) { var dt = new DataTable(tableInfo.GetRuntimeName(LoadBubble.Raw, Job.Configuration.DatabaseNamer)); + dt.BeginLoadData(); - foreach (var columnInfo in tableInfo.GetColumnsAtStage(LoadStage.AdjustRaw)) + foreach (var colName in tableInfo.GetColumnsAtStage(LoadStage.AdjustRaw) + .Select(static columnInfo => columnInfo.GetRuntimeName(LoadStage.AdjustRaw))) { - var colName = columnInfo.GetRuntimeName(LoadStage.AdjustRaw); - //add the column to the DataTable that will be uploaded dt.Columns.Add(colName); //add it to the routing dictionary - if (!_columnNameToTargetTablesDictionary.ContainsKey(colName)) - _columnNameToTargetTablesDictionary.Add(colName, new HashSet()); - - if (!_columnNameToTargetTablesDictionary[colName].Contains(dt)) - _columnNameToTargetTablesDictionary[colName].Add(dt); + if (!_columnNameToTargetTablesDictionary.TryGetValue(colName, out var targets)) + _columnNameToTargetTablesDictionary.Add(colName, targets = new HashSet()); + targets.Add(dt); } } } public override void Check(ICheckNotifier notifier) { - if(LoadPipeline != null) - { - new PipelineChecker(LoadPipeline).Check(notifier); - - //don't check this since we are our own Fixed source for the engine so we just end up in a loop! but do instantiate it incase there are construction/context errors + if (LoadPipeline != null) new PipelineChecker(LoadPipeline).Check(notifier); - PipelineChecker c = new(LoadPipeline); - c.Check(notifier); - } - - if (ModalityMatchingRegex != null && !ModalityMatchingRegex.ToString().Contains('(')) + if (ModalityMatchingRegex?.ToString().Contains('(') == false) notifier.OnCheckPerformed( new CheckEventArgs( $"Expected ModalityMatchingRegex '{ModalityMatchingRegex}' to contain a group matching for extracting modality e.g. '^(.*)_.*$'", CheckResult.Fail)); @@ -172,7 +156,7 @@ public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener { MySqlBulkCopy.BulkInsertBatchTimeoutInSeconds = int.MaxValue; //forever - _sw.Start(); + var sw = Stopwatch.StartNew(); RefreshUploadDictionary(); @@ -182,26 +166,22 @@ public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener AddRows(toProcess); - Exception ex = null; try { BulkInsert(cancellationToken); - } - catch(Exception exception) + catch (Exception exception) { - ex = exception; + DisposeUploaders(exception); + throw new Exception("Error occurred during upload",exception); } - DisposeUploaders(ex); + DisposeUploaders(null); - if (ex != null) - throw new Exception("Error occurred during upload",ex); - - _sw.Stop(); + sw.Stop(); listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, - $"ProcessPipelineData (Upload) cumulative total time is {_sw.ElapsedMilliseconds}ms")); + $"ProcessPipelineData (Upload) cumulative total time is {sw.ElapsedMilliseconds}ms")); return null; } @@ -214,7 +194,7 @@ private void CreateModalityMap() _modalityMap = new Dictionary(); - foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(v=>v).Distinct()) + foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(static v=>v).Distinct()) { var m = ModalityMatchingRegex.Match(dt.TableName); @@ -229,9 +209,11 @@ private void CreateModalityMap() private void BulkInsert(GracefulCancellationToken token) { - foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(hs => hs).Distinct()) - if (dt.Rows.Count > 0) - _uploaders[dt.TableName].Item1.ProcessPipelineData(dt, Job, token); + foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(static hs => hs).Distinct().Where(static dt => dt.Rows.Count > 0)) + { + dt.EndLoadData(); + _uploaders[dt.TableName].Item1.ProcessPipelineData(dt, Job, token); + } } private void AddRows(DataTable toProcess) @@ -242,13 +224,11 @@ private void AddRows(DataTable toProcess) //for every row in the input table foreach (DataRow inputRow in toProcess.Rows) { - Dictionary newDestinationRows = new(); + Dictionary newDestinationRows = new(); //get the modality of the current record (if we care) - string modality = null; - if (_modalityMap != null) - modality = inputRow["Modality"].ToString(); + var modality = _modalityMap == null ? null : inputRow["Modality"].ToString(); var addedToAtLeastOneTable = false; @@ -256,14 +236,15 @@ private void AddRows(DataTable toProcess) foreach (DataColumn column in toProcess.Columns) { //if there is a destination for that DataTable - if (!_columnNameToTargetTablesDictionary.ContainsKey(column.ColumnName)) continue; + if (!_columnNameToTargetTablesDictionary.TryGetValue(column.ColumnName, out var destinations)) continue; //there is a matching destination column in one or more destination tables in RAW - foreach (var destinationTable in _columnNameToTargetTablesDictionary[column.ColumnName]) + foreach (var destinationTable in destinations) { //if we are mapping modalities to tables and this table isn't an ALL table - if (_modalityMap != null && !_modalityMap[destinationTable].Equals("ALL",StringComparison.CurrentCultureIgnoreCase)) - if(!string.Equals(_modalityMap[destinationTable], modality,StringComparison.CurrentCultureIgnoreCase)) - continue; //skip it + var tableModality = _modalityMap?[destinationTable]; + if (tableModality?.Equals("ALL", StringComparison.OrdinalIgnoreCase) == false && + tableModality?.Equals(modality, StringComparison.OrdinalIgnoreCase) == false) + continue; //skip it AddCellValue(inputRow, column, destinationTable, newDestinationRows); addedToAtLeastOneTable = true; @@ -273,36 +254,35 @@ private void AddRows(DataTable toProcess) } //we didn't add the row to any tables yet - if( _modalityMap != null && !addedToAtLeastOneTable) - { + if (_modalityMap != null && !addedToAtLeastOneTable) //Try again but put it in OTHER foreach (DataColumn column in toProcess.Columns) { - if (!_columnNameToTargetTablesDictionary.TryGetValue(column.ColumnName,out var tables)) continue; + if (!_columnNameToTargetTablesDictionary.TryGetValue(column.ColumnName, out var tables)) continue; //there is a matching destination column in one or more destination tables in RAW - foreach (var destinationTable in tables.Where(destinationTable => _modalityMap[destinationTable].Equals("OTHER",StringComparison.CurrentCultureIgnoreCase))) + foreach (var destinationTable in tables.Where(destinationTable => _modalityMap[destinationTable].Equals("OTHER", StringComparison.OrdinalIgnoreCase))) { AddCellValue(inputRow, column, destinationTable, newDestinationRows); addedToAtLeastOneTable = true; } + _columnNamesRoutedSuccesfully[column.ColumnName] = true; } - } - if(!addedToAtLeastOneTable && _modalityMap != null) + if (!addedToAtLeastOneTable && _modalityMap != null) throw new Exception( - $"Failed to route row with modality:{modality} Mapping was {string.Join(Environment.NewLine, _modalityMap.Select(kvp => $"{kvp.Key.TableName}={kvp.Value}"))}"); + $"Failed to route row with modality:{modality} Mapping was {string.Join(Environment.NewLine, _modalityMap.Select(static kvp => $"{kvp.Key.TableName}={kvp.Value}"))}"); } } - private void AddCellValue(DataRow inputRow, DataColumn column, DataTable destinationTable, Dictionary newDestinationRows) + private static void AddCellValue(DataRow inputRow, DataColumn column, DataTable destinationTable, Dictionary newDestinationRows) { //if destination table doesn't have a new row yet add one - if (!newDestinationRows.ContainsKey(destinationTable)) - newDestinationRows.Add(destinationTable, destinationTable.Rows.Add()); + if (!newDestinationRows.TryGetValue(destinationTable, out var destRow)) + newDestinationRows.Add(destinationTable, destRow = destinationTable.Rows.Add()); //copy the value into the new row - newDestinationRows[destinationTable][column.ColumnName] = inputRow[column.ColumnName]; + destRow[column.ColumnName] = inputRow[column.ColumnName]; } public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny) @@ -318,7 +298,7 @@ private void DisposeUploaders(Exception exception) item2.CloseAndArchive(); } - foreach (var dt in _columnNameToTargetTablesDictionary.SelectMany(v => v.Value).Distinct()) + foreach (var dt in _columnNameToTargetTablesDictionary.SelectMany(static v => v.Value).Distinct()) dt.Dispose(); _columnNameToTargetTablesDictionary = null;