From 9a028650b37857d6d89628993c16e852a399cb64 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Fri, 20 Nov 2020 07:41:46 +0100 Subject: [PATCH] enh: finalize writting outputs as a standalone workflow + tests Resolves: #26. --- sdcflows/workflows/fit/tests/test_pepolar.py | 54 +++--- sdcflows/workflows/fit/tests/test_phdiff.py | 63 +++---- sdcflows/workflows/outputs.py | 174 ++++++++++++++++--- 3 files changed, 202 insertions(+), 89 deletions(-) diff --git a/sdcflows/workflows/fit/tests/test_pepolar.py b/sdcflows/workflows/fit/tests/test_pepolar.py index 1451517d1d..842ed3f4c5 100644 --- a/sdcflows/workflows/fit/tests/test_pepolar.py +++ b/sdcflows/workflows/fit/tests/test_pepolar.py @@ -3,11 +3,10 @@ from pathlib import Path from json import loads import pytest -from niworkflows.interfaces.bids import DerivativesDataSink from niworkflows.interfaces.images import IntraModalMerge from nipype.pipeline import engine as pe -from ..pepolar import Workflow, init_topup_wf +from ..pepolar import init_topup_wf @pytest.mark.skipif(os.getenv("TRAVIS") == "true", reason="this is TravisCI") @@ -29,7 +28,7 @@ def test_topup_wf(tmpdir, datadir, workdir, outdir, epi_path): epi_path = [datadir / f for f in epi_path] in_data = [str(f.absolute()) for f in epi_path] - wf = Workflow( + wf = pe.Workflow( name=f"topup_{epi_path[0].name.replace('.nii.gz', '').replace('-', '_')}" ) @@ -37,9 +36,10 @@ def test_topup_wf(tmpdir, datadir, workdir, outdir, epi_path): merge.inputs.in_files = in_data topup_wf = init_topup_wf(omp_nthreads=2, debug=True) - topup_wf.inputs.inputnode.metadata = [ + metadata = [ loads(Path(str(f).replace(".nii.gz", ".json")).read_text()) for f in in_data ] + topup_wf.inputs.inputnode.metadata = metadata # fmt: off wf.connect([ @@ -49,37 +49,37 @@ def test_topup_wf(tmpdir, datadir, workdir, outdir, epi_path): if outdir: from nipype.interfaces.afni import Automask - from ....interfaces.reportlets import FieldmapReportlet + from ...outputs import init_fmap_derivatives_wf, init_fmap_reports_wf - pre_mask = pe.Node(Automask(dilate=1, outputtype="NIFTI_GZ"), - name="pre_mask") - merge_corrected = pe.Node(IntraModalMerge(hmc=False), name="merge_corrected") - - rep = pe.Node( - FieldmapReportlet(reference_label="EPI Reference"), "simple_report" + fmap_derivatives_wf = init_fmap_derivatives_wf( + output_dir=str(outdir), + write_coeff=True, + custom_entities={"est": "pepolar"}, + bids_fmap_id="pepolar_id", ) - rep.interface._always_run = True - ds_report = pe.Node( - DerivativesDataSink( - base_directory=str(outdir), - out_path_base="sdcflows", - datatype="figures", - suffix="fieldmap", - desc="pepolar", - dismiss_entities="fmap", - ), - name="ds_report", + fmap_derivatives_wf.inputs.inputnode.source_files = in_data + fmap_derivatives_wf.inputs.inputnode.fmap_meta = metadata + + fmap_reports_wf = init_fmap_reports_wf( + output_dir=str(outdir), fmap_type="pepolar", ) - ds_report.inputs.source_file = in_data[0] + fmap_reports_wf.inputs.inputnode.source_files = in_data + + pre_mask = pe.Node(Automask(dilate=1, outputtype="NIFTI_GZ"), name="pre_mask") + merge_corrected = pe.Node(IntraModalMerge(hmc=False), name="merge_corrected") # fmt: off wf.connect([ (topup_wf, merge_corrected, [("outputnode.fmap_ref", "in_files")]), - (merge_corrected, rep, [("out_avg", "reference")]), (merge_corrected, pre_mask, [("out_avg", "in_file")]), - (topup_wf, rep, [("outputnode.fmap", "fieldmap")]), - (pre_mask, rep, [("out_file", "mask")]), - (rep, ds_report, [("out_report", "in_file")]), + (merge_corrected, fmap_reports_wf, [("out_avg", "inputnode.fmap_ref")]), + (topup_wf, fmap_reports_wf, [("outputnode.fmap", "inputnode.fieldmap")]), + (pre_mask, fmap_reports_wf, [("out_file", "inputnode.fmap_mask")]), + (topup_wf, fmap_derivatives_wf, [ + ("outputnode.fmap", "inputnode.fieldmap"), + ("outputnode.fmap_ref", "inputnode.fmap_ref"), + ("outputnode.coefficients", "inputnode.fmap_coeff"), + ]), ]) # fmt: on diff --git a/sdcflows/workflows/fit/tests/test_phdiff.py b/sdcflows/workflows/fit/tests/test_phdiff.py index 4f249198d9..6574dabce1 100644 --- a/sdcflows/workflows/fit/tests/test_phdiff.py +++ b/sdcflows/workflows/fit/tests/test_phdiff.py @@ -4,8 +4,6 @@ from json import loads import pytest -from niworkflows.interfaces.bids import DerivativesDataSink -from nipype.pipeline import engine as pe from ..fieldmap import init_fmap_wf, Workflow @@ -27,58 +25,47 @@ def test_phdiff(tmpdir, datadir, workdir, outdir, fmap_path): tmpdir.chdir() fmap_path = [datadir / f for f in fmap_path] + fieldmaps = [ + (str(f.absolute()), loads(Path(str(f).replace(".nii.gz", ".json")).read_text())) + for f in fmap_path + ] wf = Workflow( name=f"phdiff_{fmap_path[0].name.replace('.nii.gz', '').replace('-', '_')}" ) phdiff_wf = init_fmap_wf(omp_nthreads=2) + phdiff_wf.inputs.inputnode.fieldmap = fieldmaps phdiff_wf.inputs.inputnode.magnitude = [ - str(f.absolute()).replace("diff", "1").replace("phase", "magnitude") - for f in fmap_path - ] - phdiff_wf.inputs.inputnode.fieldmap = [ - (str(f.absolute()), loads(Path(str(f).replace(".nii.gz", ".json")).read_text())) - for f in fmap_path + f.replace("diff", "1").replace("phase", "magnitude") for f, _ in fieldmaps ] if outdir: - from ....interfaces.reportlets import FieldmapReportlet - - rep = pe.Node(FieldmapReportlet(reference_label="Magnitude"), "simple_report") - rep.interface._always_run = True + from ...outputs import init_fmap_derivatives_wf, init_fmap_reports_wf - ds_report = pe.Node( - DerivativesDataSink( - base_directory=str(outdir), - out_path_base="sdcflows", - datatype="figures", - suffix="fieldmap", - desc="phasediff", - dismiss_entities="fmap", - ), - name="ds_report", + fmap_derivatives_wf = init_fmap_derivatives_wf( + output_dir=str(outdir), + custom_entities={"est": "phasediff"}, + bids_fmap_id="phasediff_id", ) - ds_report.inputs.source_file = str(fmap_path[0]) + fmap_derivatives_wf.inputs.inputnode.source_files = [f for f, _ in fieldmaps] + fmap_derivatives_wf.inputs.inputnode.fmap_meta = [f for _, f in fieldmaps] - dsink_fmap = pe.Node( - DerivativesDataSink( - base_directory=str(outdir), - dismiss_entities="fmap", - desc="phasediff", - suffix="fieldmap", - ), - name="dsink_fmap", + fmap_reports_wf = init_fmap_reports_wf( + output_dir=str(outdir), + fmap_type="phasediff" if len(fieldmaps) == 1 else "phases", ) - dsink_fmap.interface.out_path_base = "sdcflows" - dsink_fmap.inputs.source_file = str(fmap_path[0]) + fmap_reports_wf.inputs.inputnode.source_files = [f for f, _ in fieldmaps] # fmt: off wf.connect([ - (phdiff_wf, rep, [("outputnode.fmap", "fieldmap"), - ("outputnode.fmap_ref", "reference"), - ("outputnode.fmap_mask", "mask")]), - (rep, ds_report, [("out_report", "in_file")]), - (phdiff_wf, dsink_fmap, [("outputnode.fmap", "in_file")]), + (phdiff_wf, fmap_reports_wf, [ + ("outputnode.fmap", "inputnode.fieldmap"), + ("outputnode.fmap_ref", "inputnode.fmap_ref"), + ("outputnode.fmap_mask", "inputnode.fmap_mask")]), + (phdiff_wf, fmap_derivatives_wf, [ + ("outputnode.fmap", "inputnode.fieldmap"), + ("outputnode.fmap_ref", "inputnode.fmap_ref"), + ]), ]) # fmt: on else: diff --git a/sdcflows/workflows/outputs.py b/sdcflows/workflows/outputs.py index 8912d26ee2..dee2ccb7f8 100644 --- a/sdcflows/workflows/outputs.py +++ b/sdcflows/workflows/outputs.py @@ -3,23 +3,107 @@ """Writing out outputs.""" from nipype.pipeline import engine as pe from nipype.interfaces import utility as niu -from niworkflows.interfaces.bids import DerivativesDataSink +from niworkflows.interfaces.bids import DerivativesDataSink as _DDS + + +class DerivativesDataSink(_DDS): + """Overload the ``out_path_base`` setting.""" + + out_path_base = "sdcflows" + + +del _DDS + + +def init_fmap_reports_wf( + *, output_dir, fmap_type, custom_entities=None, name="fmap_reports_wf", +): + """ + Set up a battery of datasinks to store reports in the right location. + + Parameters + ---------- + fmap_type : :obj:`str` + The fieldmap estimator type. + custom_entities : :obj:`dict` + Define extra entities that will be written out in filenames. + output_dir : :obj:`str` + Directory in which to save derivatives + name : :obj:`str` + Workflow name (default: ``"fmap_reports_wf"``) + + Inputs + ------ + source_files + One or more fieldmap file(s) of the BIDS dataset that will serve for naming reference. + fieldmap + The preprocessed fieldmap, in its original space with Hz units. + fmap_ref + An anatomical reference (e.g., magnitude file) + fmap_mask + A brain mask in the fieldmap's space. + + """ + from ..interfaces.reportlets import FieldmapReportlet + + custom_entities = custom_entities or {} + + workflow = pe.Workflow(name=name) + inputnode = pe.Node( + niu.IdentityInterface( + fields=["source_files", "fieldmap", "fmap_ref", "fmap_mask"] + ), + name="inputnode", + ) + + rep = pe.Node(FieldmapReportlet(), "simple_report") + rep.interface._always_run = True + + ds_fmap_report = pe.Node( + DerivativesDataSink( + base_directory=str(output_dir), + datatype="figures", + suffix="fieldmap", + desc=fmap_type, + dismiss_entities=("fmap",), + allowed_entities=tuple(custom_entities.keys()), + ), + name="ds_fmap_report", + ) + for k, v in custom_entities.items(): + setattr(ds_fmap_report.inputs, k, v) + + # fmt:off + workflow.connect([ + (inputnode, rep, [("fieldmap", "fieldmap"), + ("fmap_ref", "reference"), + ("fmap_mask", "mask")]), + (rep, ds_fmap_report, [("out_report", "in_file")]), + (inputnode, ds_fmap_report, [("source_files", "source_file")]), + + ]) + # fmt:on + + return workflow def init_fmap_derivatives_wf( *, - bids_root, output_dir, - write_coeff=False, + bids_fmap_id=None, + custom_entities=None, name="fmap_derivatives_wf", + write_coeff=False, ): """ Set up datasinks to store derivatives in the right location. Parameters ---------- - bids_root : :obj:`str` - Root path of BIDS dataset + bids_fmap_id : :obj:`str` + Sets the ``B0FieldIdentifier`` metadata into the outputs. + custom_entities : :obj:`dict` + Define extra entities that will be written out in filenames. output_dir : :obj:`str` Directory in which to save derivatives name : :obj:`str` @@ -27,8 +111,8 @@ def init_fmap_derivatives_wf( Inputs ------ - source_file - A fieldmap file of the BIDS dataset that will serve for naming reference. + source_files + One or more fieldmap file(s) of the BIDS dataset that will serve for naming reference. fieldmap The preprocessed fieldmap, in its original space with Hz units. fmap_coeff @@ -37,25 +121,24 @@ def init_fmap_derivatives_wf( An anatomical reference (e.g., magnitude file) """ - workflow = pe.Workflow(name=name) + custom_entities = custom_entities or {} + workflow = pe.Workflow(name=name) inputnode = pe.Node( niu.IdentityInterface( - fields=[ - "source_file", - "fieldmap", - "fmap_coeff", - "fmap_ref", - ]), - name="inputnode" + fields=["source_files", "fieldmap", "fmap_coeff", "fmap_ref", "fmap_meta"] + ), + name="inputnode", ) ds_reference = pe.Node( DerivativesDataSink( base_directory=output_dir, - desc="reference", + compress=True, suffix="fieldmap", - compress=True), + dismiss_entities=("fmap",), + allowed_entities=tuple(custom_entities.keys()), + ), name="ds_reference", ) @@ -64,17 +147,31 @@ def init_fmap_derivatives_wf( base_directory=output_dir, desc="preproc", suffix="fieldmap", - compress=True), + compress=True, + allowed_entities=tuple(custom_entities.keys()), + ), name="ds_fieldmap", ) ds_fieldmap.inputs.Units = "Hz" + if bids_fmap_id: + ds_fieldmap.inputs.B0FieldIdentifier = bids_fmap_id + + for k, v in custom_entities.items(): + setattr(ds_reference.inputs, k, v) + setattr(ds_fieldmap.inputs, k, v) # fmt:off workflow.connect([ - (inputnode, ds_reference, [("source_file", "source_file"), - ("fmap_ref", "in_file")]), - (inputnode, ds_fieldmap, [("source_file", "source_file"), - ("fieldmap", "in_file")]), + (inputnode, ds_reference, [("source_files", "source_file"), + ("fmap_ref", "in_file"), + (("source_files", _getsourcetype), "desc")]), + (inputnode, ds_fieldmap, [("source_files", "source_file"), + ("fieldmap", "in_file"), + ("source_files", "RawSources")]), + (ds_reference, ds_fieldmap, [ + (("out_file", _getname), "AnatomicalReference"), + ]), + (inputnode, ds_fieldmap, [(("fmap_meta", _selectintent), "IntendedFor")]), ]) # fmt:on @@ -85,19 +182,25 @@ def init_fmap_derivatives_wf( DerivativesDataSink( base_directory=output_dir, suffix="fieldmap", - compress=True), + compress=True, + allowed_entities=tuple(custom_entities.keys()), + ), name="ds_coeff", iterfield=("in_file", "desc"), ) gen_desc = pe.Node(niu.Function(function=_gendesc), name="gen_desc") + for k, v in custom_entities.items(): + setattr(ds_coeff.inputs, k, v) + # fmt:off workflow.connect([ - (inputnode, ds_coeff, [("source_file", "source_file"), + (inputnode, ds_coeff, [("source_files", "source_file"), ("fmap_coeff", "in_file")]), (inputnode, gen_desc, [("fmap_coeff", "infiles")]), (gen_desc, ds_coeff, [("out", "desc")]), + (ds_coeff, ds_fieldmap, [(("out_file", _getname), "AssociatedCoefficients")]), ]) # fmt:on @@ -197,3 +300,26 @@ def _gendesc(infiles): return "coeff" return [f"coeff{i}" for i, _ in enumerate(infiles)] + + +def _getname(infile): + from pathlib import Path + + if isinstance(infile, (list, tuple)): + return [Path(f).name for f in infile] + return Path(infile).name + + +def _getsourcetype(infiles): + from pathlib import Path + + fname = Path(infiles[0]).name + return "epi" if fname.endswith(("_epi.nii.gz", "_epi.nii")) else "magnitude" + + +def _selectintent(metadata): + from bids.utils import listify + + return sorted( + set([el for m in metadata for el in listify(m.get("IntendedFor", []))]) + )