Merge pull request #436 from SMI/feature/attachertidy
Tidy attacher syntax/logic a bit
JFriel authored Sep 26, 2024
2 parents bbe74e4 + d44cf76 commit 0a2da99
Showing 1 changed file with 51 additions and 71 deletions.
Rdmp.Dicom/Attachers/Routing/AutoRoutingAttacher.cs (122 changes: 51 additions & 71 deletions)
@@ -45,8 +45,7 @@ This Grouping will be used to extract the Modality code when deciding which tabl…

     readonly Dictionary<string, bool> _columnNamesRoutedSuccesfully = new(StringComparer.CurrentCultureIgnoreCase);

-    readonly Stopwatch _sw = new();
-    Dictionary<DataTable, string> _modalityMap;
+    private Dictionary<DataTable, string> _modalityMap;

     protected AutoRoutingAttacher(bool requestsExternalDatabaseCreation) : base(requestsExternalDatabaseCreation) //Derived classes can change mind about RAW creation
     {
@@ -62,11 +61,9 @@ public override ExitCodeType Attach(IDataLoadJob job,GracefulCancellationToken token)
         Job = job;

         //if we have an explicit payload to run instead (this is how you inject explicit files/archives/directories to be loaded without touching the disk)
-        if (job.Payload != null)
+        if (job.Payload is IDicomWorklist worklist)
         {
-            var useCase = new AutoRoutingAttacherPipelineUseCase(this, (IDicomWorklist)job.Payload);
-            var engine = useCase.GetEngine(LoadPipeline, Job);
-            engine.ExecutePipeline(token);
+            new AutoRoutingAttacherPipelineUseCase(this, worklist).GetEngine(LoadPipeline, Job).ExecutePipeline(token);
         }
         else
         {
@@ -78,29 +75,26 @@ public override ExitCodeType Attach(IDataLoadJob job,GracefulCancellationToken token)

             //no explicit injected payload, so use the ForLoading directory to generate the list of dicom/zip files to process
             foreach (var filesToLoad in job.LoadDirectory.ForLoading.GetFiles(ListPattern))
-            {
-                var useCase = new AutoRoutingAttacherPipelineUseCase(this, new FlatFileToLoadDicomFileWorklist(new FlatFileToLoad(filesToLoad)));
-                var engine = useCase.GetEngine(LoadPipeline, job);
-                engine.ExecutePipeline(token);
-            }
+                new AutoRoutingAttacherPipelineUseCase(this,
+                        new FlatFileToLoadDicomFileWorklist(new FlatFileToLoad(filesToLoad)))
+                    .GetEngine(LoadPipeline, job)
+                    .ExecutePipeline(token);
         }

-        var unmatchedColumns = string.Join($",{Environment.NewLine}", _columnNamesRoutedSuccesfully.Where(kvp => kvp.Value == false).Select(k => k.Key));
+        var unmatchedColumns = string.Join($",{Environment.NewLine}",
+            _columnNamesRoutedSuccesfully.Where(static kvp => kvp.Value == false).Select(static k => k.Key));

         if (!string.IsNullOrWhiteSpace(unmatchedColumns))
         {
             //for each column seen in an input table that was not successfully routed somewhere
             job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                 $"Ignored input columns:{unmatchedColumns}"));
         }

-
         return ExitCodeType.Success;
     }

     private void CreateTableUploaders()
     {
-        _uploaders = new Dictionary<string, Tuple<SqlBulkInsertDestination, ITableLoadInfo>>(StringComparer.CurrentCultureIgnoreCase);
+        _uploaders = new Dictionary<string, Tuple<SqlBulkInsertDestination, ITableLoadInfo>>(StringComparer.OrdinalIgnoreCase);
         foreach (var t in Job.RegularTablesToLoad)
         {
             var tblName = t.GetRuntimeName(LoadBubble.Raw,Job.Configuration.DatabaseNamer);
@@ -117,42 +111,32 @@ private void CreateTableUploaders()
     private void RefreshUploadDictionary()
     {
         //Each column name can exist in multiple TableInfos (e.g. foreign keys with the same name) so we can route a column to multiple destination tables
-        _columnNameToTargetTablesDictionary = new Dictionary<string, HashSet<DataTable>>(StringComparer.CurrentCultureIgnoreCase);
+        _columnNameToTargetTablesDictionary = new Dictionary<string, HashSet<DataTable>>(StringComparer.OrdinalIgnoreCase);

         foreach (var tableInfo in Job.RegularTablesToLoad)
         {
             var dt = new DataTable(tableInfo.GetRuntimeName(LoadBubble.Raw, Job.Configuration.DatabaseNamer));
+            dt.BeginLoadData();

-            foreach (var columnInfo in tableInfo.GetColumnsAtStage(LoadStage.AdjustRaw))
+            foreach (var colName in tableInfo.GetColumnsAtStage(LoadStage.AdjustRaw)
+                         .Select(static columnInfo => columnInfo.GetRuntimeName(LoadStage.AdjustRaw)))
             {
-                var colName = columnInfo.GetRuntimeName(LoadStage.AdjustRaw);
-
                 //add the column to the DataTable that will be uploaded
                 dt.Columns.Add(colName);

                 //add it to the routing dictionary
-                if (!_columnNameToTargetTablesDictionary.ContainsKey(colName))
-                    _columnNameToTargetTablesDictionary.Add(colName, new HashSet<DataTable>());
-
-                if (!_columnNameToTargetTablesDictionary[colName].Contains(dt))
-                    _columnNameToTargetTablesDictionary[colName].Add(dt);
+                if (!_columnNameToTargetTablesDictionary.TryGetValue(colName, out var targets))
+                    _columnNameToTargetTablesDictionary.Add(colName, targets = new HashSet<DataTable>());
+                targets.Add(dt);
             }
         }
     }

     public override void Check(ICheckNotifier notifier)
     {
-        if(LoadPipeline != null)
-        {
-            new PipelineChecker(LoadPipeline).Check(notifier);
-
         //don't check this since we are our own Fixed source for the engine so we just end up in a loop! but do instantiate it in case there are construction/context errors
+        if (LoadPipeline != null) new PipelineChecker(LoadPipeline).Check(notifier);

-            PipelineChecker c = new(LoadPipeline);
-            c.Check(notifier);
-        }

-        if (ModalityMatchingRegex != null && !ModalityMatchingRegex.ToString().Contains('('))
+        if (ModalityMatchingRegex?.ToString().Contains('(') == false)
             notifier.OnCheckPerformed(
                 new CheckEventArgs(
                     $"Expected ModalityMatchingRegex '{ModalityMatchingRegex}' to contain a group matching for extracting modality e.g. '^(.*)_.*$'", CheckResult.Fail));
@@ -172,7 +156,7 @@ public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener…
     {
         MySqlBulkCopy.BulkInsertBatchTimeoutInSeconds = int.MaxValue; //forever

-        _sw.Start();
+        var sw = Stopwatch.StartNew();

        RefreshUploadDictionary();

@@ -182,26 +166,22 @@ public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener…

         AddRows(toProcess);

-        Exception ex = null;
         try
         {
             BulkInsert(cancellationToken);
-
         }
-        catch(Exception exception)
+        catch (Exception exception)
         {
-            ex = exception;
+            DisposeUploaders(exception);
+            throw new Exception("Error occurred during upload",exception);
         }

-        DisposeUploaders(ex);
+        DisposeUploaders(null);

-        if (ex != null)
-            throw new Exception("Error occurred during upload",ex);
-
-        _sw.Stop();
+        sw.Stop();

         listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
-            $"ProcessPipelineData (Upload) cumulative total time is {_sw.ElapsedMilliseconds}ms"));
+            $"ProcessPipelineData (Upload) cumulative total time is {sw.ElapsedMilliseconds}ms"));

         return null;
     }
@@ -214,7 +194,7 @@ private void CreateModalityMap()

         _modalityMap = new Dictionary<DataTable, string>();

-        foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(v=>v).Distinct())
+        foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(static v=>v).Distinct())
         {
             var m = ModalityMatchingRegex.Match(dt.TableName);

@@ -229,9 +209,11 @@

     private void BulkInsert(GracefulCancellationToken token)
     {
-        foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(hs => hs).Distinct())
-            if (dt.Rows.Count > 0)
-                _uploaders[dt.TableName].Item1.ProcessPipelineData(dt, Job, token);
+        foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(static hs => hs).Distinct().Where(static dt => dt.Rows.Count > 0))
+        {
+            dt.EndLoadData();
+            _uploaders[dt.TableName].Item1.ProcessPipelineData(dt, Job, token);
+        }
     }

     private void AddRows(DataTable toProcess)
@@ -242,28 +224,27 @@ private void AddRows(DataTable toProcess)

         //for every row in the input table
         foreach (DataRow inputRow in toProcess.Rows)
         {
-            Dictionary<DataTable,DataRow> newDestinationRows = new();
+            Dictionary<DataTable, DataRow> newDestinationRows = new();

             //get the modality of the current record (if we care)
-            string modality = null;
-
-            if (_modalityMap != null)
-                modality = inputRow["Modality"].ToString();
+            var modality = _modalityMap == null ? null : inputRow["Modality"].ToString();

             var addedToAtLeastOneTable = false;

             //for every input cell
             foreach (DataColumn column in toProcess.Columns)
             {
                 //if there is a destination for that DataTable
-                if (!_columnNameToTargetTablesDictionary.ContainsKey(column.ColumnName)) continue;
+                if (!_columnNameToTargetTablesDictionary.TryGetValue(column.ColumnName, out var destinations)) continue;
                 //there is a matching destination column in one or more destination tables in RAW
-                foreach (var destinationTable in _columnNameToTargetTablesDictionary[column.ColumnName])
+                foreach (var destinationTable in destinations)
                 {
                     //if we are mapping modalities to tables and this table isn't an ALL table
-                    if (_modalityMap != null && !_modalityMap[destinationTable].Equals("ALL",StringComparison.CurrentCultureIgnoreCase))
-                        if(!string.Equals(_modalityMap[destinationTable], modality,StringComparison.CurrentCultureIgnoreCase))
-                            continue; //skip it
+                    var tableModality = _modalityMap?[destinationTable];
+                    if (tableModality?.Equals("ALL", StringComparison.OrdinalIgnoreCase) == false &&
+                        tableModality?.Equals(modality, StringComparison.OrdinalIgnoreCase) == false)
+                        continue; //skip it

                     AddCellValue(inputRow, column, destinationTable, newDestinationRows);
                     addedToAtLeastOneTable = true;
@@ -273,36 +254,35 @@ private void AddRows(DataTable toProcess)
             }

             //we didn't add the row to any tables yet
-            if( _modalityMap != null && !addedToAtLeastOneTable)
-            {
+            if (_modalityMap != null && !addedToAtLeastOneTable)
                 //Try again but put it in OTHER
                 foreach (DataColumn column in toProcess.Columns)
                 {
-                    if (!_columnNameToTargetTablesDictionary.TryGetValue(column.ColumnName,out var tables)) continue;
+                    if (!_columnNameToTargetTablesDictionary.TryGetValue(column.ColumnName, out var tables)) continue;
                     //there is a matching destination column in one or more destination tables in RAW
-                    foreach (var destinationTable in tables.Where(destinationTable => _modalityMap[destinationTable].Equals("OTHER",StringComparison.CurrentCultureIgnoreCase)))
+                    foreach (var destinationTable in tables.Where(destinationTable => _modalityMap[destinationTable].Equals("OTHER", StringComparison.OrdinalIgnoreCase)))
                     {
                         AddCellValue(inputRow, column, destinationTable, newDestinationRows);
                         addedToAtLeastOneTable = true;
                     }

                     _columnNamesRoutedSuccesfully[column.ColumnName] = true;
                 }
-            }

-            if(!addedToAtLeastOneTable && _modalityMap != null)
+            if (!addedToAtLeastOneTable && _modalityMap != null)
                 throw new Exception(
-                    $"Failed to route row with modality:{modality} Mapping was {string.Join(Environment.NewLine, _modalityMap.Select(kvp => $"{kvp.Key.TableName}={kvp.Value}"))}");
+                    $"Failed to route row with modality:{modality} Mapping was {string.Join(Environment.NewLine, _modalityMap.Select(static kvp => $"{kvp.Key.TableName}={kvp.Value}"))}");
         }
     }

-    private void AddCellValue(DataRow inputRow, DataColumn column, DataTable destinationTable, Dictionary<DataTable, DataRow> newDestinationRows)
+    private static void AddCellValue(DataRow inputRow, DataColumn column, DataTable destinationTable, Dictionary<DataTable, DataRow> newDestinationRows)
     {
         //if destination table doesn't have a new row yet add one
-        if (!newDestinationRows.ContainsKey(destinationTable))
-            newDestinationRows.Add(destinationTable, destinationTable.Rows.Add());
+        if (!newDestinationRows.TryGetValue(destinationTable, out var destRow))
+            newDestinationRows.Add(destinationTable, destRow = destinationTable.Rows.Add());

         //copy the value into the new row
-        newDestinationRows[destinationTable][column.ColumnName] = inputRow[column.ColumnName];
+        destRow[column.ColumnName] = inputRow[column.ColumnName];
     }

     public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)

@@ -318,7 +298,7 @@ private void DisposeUploaders(Exception exception)
             item2.CloseAndArchive();
         }

-        foreach (var dt in _columnNameToTargetTablesDictionary.SelectMany(v => v.Value).Distinct())
+        foreach (var dt in _columnNameToTargetTablesDictionary.SelectMany(static v => v.Value).Distinct())
             dt.Dispose();

         _columnNameToTargetTablesDictionary = null;
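A couple of the recurring changes above are worth unpacking. First, the payload handling in Attach: the old code tested job.Payload != null and then cast unconditionally with (IDicomWorklist)job.Payload, which would throw InvalidCastException for a non-null payload of any other type; the new "is" pattern fuses the test and the cast, so a wrong-typed payload now falls through to the ForLoading branch instead of throwing. Below is a minimal standalone sketch of the idiom; the stub interface and class names are illustrative, not the real RDMP types.

using System;

// Stub standing in for Rdmp.Dicom's IDicomWorklist; the real interface is richer.
internal interface IDicomWorklist { }

internal sealed class FileWorklist : IDicomWorklist { }

internal static class PayloadDemo
{
    // Old shape: null test followed by an unconditional cast.
    // A non-null payload that is not an IDicomWorklist throws here.
    private static void AttachOld(object payload)
    {
        if (payload != null)
        {
            var worklist = (IDicomWorklist)payload; // can throw InvalidCastException
            Console.WriteLine($"processing {worklist}");
        }
    }

    // New shape: type test and cast fused into one pattern match.
    // A payload of the wrong type simply skips the branch.
    private static void AttachNew(object payload)
    {
        if (payload is IDicomWorklist worklist)
            Console.WriteLine($"processing {worklist}");
        else
            Console.WriteLine("no worklist payload; fall back to ForLoading");
    }

    private static void Main()
    {
        AttachNew(new FileWorklist()); // processing FileWorklist
        AttachNew("not a worklist");   // falls back instead of throwing

        try { AttachOld("not a worklist"); }
        catch (InvalidCastException) { Console.WriteLine("old shape threw"); }
    }
}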

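Second, the dictionary-population changes in RefreshUploadDictionary and AddCellValue replace the ContainsKey-then-index pattern with a single TryGetValue whose out variable is assigned inside the Add call on a miss, so both paths share one final Add; the old Contains guard before HashSet.Add is also dropped because Add already ignores duplicates. A self-contained sketch of the before and after (the column and table names are made up for illustration):

using System;
using System.Collections.Generic;

internal static class RoutingDemo
{
    // Before: up to three hash lookups per call (ContainsKey, indexer, Add),
    // plus a Contains check that HashSet<T>.Add makes redundant.
    private static void AddRouteOld(Dictionary<string, HashSet<string>> map,
        string column, string table)
    {
        if (!map.ContainsKey(column))
            map.Add(column, new HashSet<string>());

        if (!map[column].Contains(table))
            map[column].Add(table);
    }

    // After: one TryGetValue; on a miss the new set is created, stored in the
    // dictionary and captured in 'targets' by the embedded assignment, so both
    // hit and miss fall through to the same Add. HashSet<T>.Add returns false
    // for duplicates rather than throwing, so no guard is needed.
    private static void AddRouteNew(Dictionary<string, HashSet<string>> map,
        string column, string table)
    {
        if (!map.TryGetValue(column, out var targets))
            map.Add(column, targets = new HashSet<string>());
        targets.Add(table);
    }

    private static void Main()
    {
        var map = new Dictionary<string, HashSet<string>>(StringComparer.OrdinalIgnoreCase);
        AddRouteNew(map, "PatientID", "CT_ImageTable"); // illustrative names
        AddRouteNew(map, "patientid", "MR_ImageTable"); // same key, case-insensitively
        AddRouteNew(map, "PatientID", "CT_ImageTable"); // duplicate, silently ignored
        Console.WriteLine(string.Join(", ", map["PatientID"]));
    }
}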
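Finally, the swap from StringComparer.CurrentCultureIgnoreCase to StringComparer.OrdinalIgnoreCase (and StringComparison.OrdinalIgnoreCase in the modality checks) is more than a style change: ordinal comparison is culture-independent and cheaper, which is the right choice for identifiers such as column and table names, since culture-aware case folding can vary with the process locale. The classic demonstration is the Turkish dotted/dotless i, sketched below; the behaviour in the comments is the well-known tr-TR example rather than anything specific to this commit.

using System;
using System.Globalization;

internal static class ComparerDemo
{
    private static void Main()
    {
        // Ordinal ignore-case folds letters by code point, the same everywhere.
        Console.WriteLine(string.Equals("MODALITY", "modality",
            StringComparison.OrdinalIgnoreCase)); // True in every locale

        // Culture-aware comparison follows the current culture's casing rules.
        // Under tr-TR the upper case of 'i' is 'İ' (dotted capital I), so 'i'
        // and 'I' are not case-insensitively equal there.
        CultureInfo.CurrentCulture = new CultureInfo("tr-TR");
        Console.WriteLine(string.Equals("file", "FILE",
            StringComparison.CurrentCultureIgnoreCase)); // False under tr-TR
    }
}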