Feature/nucleo to v1variants #1417

Open · wants to merge 20 commits into base: master
2 changes: 1 addition & 1 deletion beagle/__init__.py
@@ -1 +1 @@
__version__ = "1.91.6"
__version__ = "1.91.7"
75 changes: 59 additions & 16 deletions runner/operator/access/__init__.py
100644 → 100755
@@ -4,7 +4,6 @@
from runner.models import Run, RunStatus, Port
from file_system.models import File, FileMetadata


logger = logging.getLogger(__name__)

ACCESS_CURATED_BAMS_FILE_GROUP_SLUG = "access_curated_normals"
@@ -29,29 +28,34 @@ def get_request_id(run_ids, request_id=None):
raise Exception("Could not get find request id")


def get_request_id_runs(request_id):
def get_request_id_runs(app, run_ids, request_id):
"""
Get the latest completed bam-generation runs for the given request ID

:param request_id: str - IGO request ID
:return: List[str] - List of most recent runs from given request ID
"""
operator_run_id = (
Run.objects.filter(
tags__igoRequestId=request_id,
app__name__in=["access legacy", "access nucleo"],
operator_run__status=RunStatus.COMPLETED,

if not request_id:
most_recent_runs_for_request = Run.objects.filter(pk__in=run_ids, status=RunStatus.COMPLETED)
request_id = most_recent_runs_for_request[0].tags["igoRequestId"]
else:
most_recent_runs_for_request = (
Run.objects.filter(
tags__igoRequestId=request_id,
app__name__in=app,
status=RunStatus.COMPLETED,
operator_run__status=RunStatus.COMPLETED,
)
.order_by("-created_date")
.first()
.operator_run.runs.all()
.filter(status=RunStatus.COMPLETED)
)
.exclude(finished_date__isnull=True)
.order_by("-finished_date")
.first()
.operator_run_id
)
if not len(most_recent_runs_for_request):
raise Exception("No matching Nucleo runs found for request {}".format(request_id))

request_id_runs = Run.objects.filter(
operator_run_id=operator_run_id, app__name__in=["access legacy", "access nucleo"], status=RunStatus.COMPLETED
)
return request_id_runs
return most_recent_runs_for_request, request_id
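
A minimal usage sketch of the new signature, assuming it is called from an operator with Beagle-supplied run_ids and an optional request_id (the run UUID is a placeholder; the app names are the ones used by the operators below):

runs, request_id = get_request_id_runs(
    app=["access v2 nucleo", "access nucleo"],
    run_ids=["<run-uuid>"],   # only consulted to resolve the request ID when none is passed
    request_id=None,
)
for run in runs:
    print(run.id, run.tags.get("igoRequestId"))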


def create_cwl_file_object(file_path):
@@ -180,3 +184,42 @@ def get_unfiltered_matched_normal(patient_id, request_id=None):
logger.warning(msg)

return unfiltered_matched_normal_bam, unfiltered_matched_normal_sample_id


def parse_nucleo_output_ports(run, port_name):
bam_bai = Port.objects.get(name=port_name, run=run.pk)
if not len(bam_bai.files.all()) in [1, 2]:
raise Exception("Port {} for run {} should have just 1 bam or 1 (bam/bai) pair".format(port_name, run.id))
bam = [b for b in bam_bai.files.all() if b.file_name.endswith(".bam")][0]
return bam


def find_request_bams(run):
"""
Find the uncollapsed, collapsed, simplex, and duplex bams from a request's Nucleo run

:param run: Run - a completed Nucleo run for the request
:return: dict - Nucleo output port name mapped to the bam File from that port
"""
nucleo_output_port_names = [
"uncollapsed_bam",
"fgbio_group_reads_by_umi_bam",
"fgbio_collapsed_bam",
"fgbio_filter_consensus_reads_duplex_bam",
"fgbio_postprocessing_simplex_bam",
]
bams = {}
for o in nucleo_output_port_names:
# We are running a multi-sample workflow on just one sample,
# so we create single-element lists here
bam = parse_nucleo_output_ports(run, o)
bams[o] = bam

return bams


def is_tumor_bam(file):
if not file.endswith(".bam"):
return False
t_n_timepoint = file.split("-")[2]
return not t_n_timepoint[0] == "N"
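
The tumor/normal check leans on the ACCESS sample-naming convention: the third hyphen-delimited token of the file name encodes the timepoint, with "N" marking the matched normal (the same convention behind the -L0/-N0 search strings in the MSI operator below). A short sketch with hypothetical file names:

is_tumor_bam("C-000000-L001-d_cl_aln_srt_MD_IR_FX_BR.bam")   # True  ("L001" timepoint)
is_tumor_bam("C-000000-N001-d_cl_aln_srt_MD_IR_FX_BR.bam")   # False ("N001" is the normal)
is_tumor_bam("C-000000-L001-d.bai")                          # False (not a .bam)

# find_request_bams(run) keys each bam by its Nucleo output port, e.g.
# {"uncollapsed_bam": <File>, "fgbio_collapsed_bam": <File>, "fgbio_postprocessing_simplex_bam": <File>, ...}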
122 changes: 122 additions & 0 deletions runner/operator/access/v2_1_0/cnv/__init__.py
@@ -0,0 +1,122 @@
import os
import json
import logging
from jinja2 import Template

from django.conf import settings
from runner.models import Port, RunStatus
from runner.operator.operator import Operator
from runner.run.objects.run_creator_object import RunCreator
from file_system.repository.file_repository import FileRepository
from runner.operator.access import get_request_id_runs, find_request_bams, is_tumor_bam


logger = logging.getLogger(__name__)

WORKDIR = os.path.dirname(os.path.abspath(__file__))


class AccessV2LegacyCNVOperator(Operator):
"""
Operator for the ACCESS Legacy Copy Number Variants workflow:

http://www.github.com/mskcc/access-pipeline/workflows/subworkflows/call_cnv.cwl

This Operator will search for ACCESS Unfiltered Bam files based on an IGO Request ID.
"""

@staticmethod
def is_tumor_bam(file):
if not file.file_name.endswith(".bam"):
return False
t_n_timepoint = file.file_name.split("-")[2]
return not t_n_timepoint[0] == "N"

def get_sample_inputs(self):
"""
Create all sample inputs for all runs triggered in this instance of the operator.

:return: (list of json sample inputs, list of sample IDs)
"""
runs, self.request_id = get_request_id_runs(
["access v2 nucleo", "access nucleo"], self.run_ids, self.request_id
)

bams = []
for run in runs:
bams.append(find_request_bams(run))

# TUMOR Unfiltered
unfiltered_tumor_bams = [
b["fgbio_collapsed_bam"] for b in bams if is_tumor_bam(b["fgbio_collapsed_bam"].file_name)
]

sample_ids = []
tumor_bams = []
sample_sexes = []

for tumor_bam in unfiltered_tumor_bams:
sample_id = tumor_bam.file_name.split("_cl_aln")[0]
# Use the initial fastq metadata to get the sex of the sample
# Todo: Need to store this info on the bams themselves
tumor_fastqs = FileRepository.filter(
file_type="fastq",
metadata={"tumorOrNormal": "Tumor", settings.CMO_SAMPLE_NAME_METADATA_KEY: sample_id},
filter_redact=True,
)
sample_sex = tumor_fastqs[0].metadata["sex"]
tumor_bams.append(tumor_bam)
sample_sexes.append(sample_sex)
sample_ids.append(sample_id)

sample_inputs = [
self.construct_sample_inputs(tumor_bams[i], sample_sexes[i]) for i in range(0, len(tumor_bams))
]

return sample_inputs, sample_ids

def get_jobs(self):
"""
Convert job inputs into serialized jobs

:return: list[(serialized job info, Job)]
"""
inputs, sample_ids = self.get_sample_inputs()

return [
(
RunCreator(
**{
"name": "ACCESS V2 LEGACY CNV M1: %s, %i of %i" % (self.request_id, i + 1, len(inputs)),
"app": self.get_pipeline_id(),
"inputs": job,
"tags": {
settings.REQUEST_ID_METADATA_KEY: self.request_id,
"cmoSampleIds": sample_ids[i],
settings.PATIENT_ID_METADATA_KEY: "-".join(sample_ids[i].split("-")[0:2]),
},
}
)
)
for i, job in enumerate(inputs)
]

def construct_sample_inputs(self, tumor_bam, sample_sex):
"""
Use sample metadata and json template to create inputs for the CWL run

:return: JSON format sample inputs
"""
with open(os.path.join(WORKDIR, "input_template.json.jinja2")) as file:
template = Template(file.read())

tumor_sample_list = tumor_bam.path + "\t" + sample_sex
tumor_sample_id = tumor_bam.file_name.split("_cl_aln_srt_MD_IR_FX_BR")[0]

input_file = template.render(
tumor_sample_id=tumor_sample_id,
tumor_sample_list_content=json.dumps(tumor_sample_list),
)
print(input_file)
sample_input = json.loads(input_file)
return sample_input
27 changes: 27 additions & 0 deletions runner/operator/access/v2_1_0/cnv/input_template.json.jinja2
@@ -0,0 +1,27 @@
{
"project_name": "{{ tumor_sample_id }}",
"tumor_sample_list": {
"contents": {{ tumor_sample_list_content }},
"basename": "tumor_manifest.txt",
"class": "File"
},
"normal_sample_list": {
"class": "File",
"location": "juno:///juno/cmo/access/production/resources/msk-access/v2.0/novaseq_curated_unfiltered_bams_dmp/versions/v1.0/normal_manifest_access_v2_plasma.txt"
},
"threads": 8,
"tmp_dir": "/scratch",
"targets_coverage_bed": {
"class": "File",
"location": "juno:///juno/cmo/access/production/resources/msk-access/v2.0/regions_of_interest/versions/v1.0/ACCESSv2_targets_coverage.bed"
},
"targets_coverage_annotation": {
"class": "File",
"location": "juno:///juno/cmo/access/production/resources/msk-access/v2.0/regions_of_interest/versions/v1.0/ACCESSv2_targets_coverage.txt"
},
"reference_fasta": {
"class": "File",
"location": "juno:///juno/work/access/production/resources/reference/versions/hg19_virus_special/hg19_virus.fasta"
},
"version": "1.3.40"
}
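
For reference, a sketch of how construct_sample_inputs fills this template; the bam path, sample ID, and sex below are hypothetical, and json.dumps is what turns the tab-separated manifest line into a valid JSON string for the "contents" field:

import json
from jinja2 import Template

tumor_bam_path = "/work/access/C-000000-L001-d_cl_aln_srt_MD_IR_FX_BR.bam"   # hypothetical
tumor_sample_list = tumor_bam_path + "\t" + "F"

with open("input_template.json.jinja2") as f:
    rendered = Template(f.read()).render(
        tumor_sample_id="C-000000-L001-d",
        tumor_sample_list_content=json.dumps(tumor_sample_list),
    )

inputs = json.loads(rendered)
# inputs["tumor_sample_list"]["contents"] -> ".../C-000000-L001-d_cl_aln_srt_MD_IR_FX_BR.bam\tF"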
139 changes: 139 additions & 0 deletions runner/operator/access/v2_1_0/msi/__init__.py
@@ -0,0 +1,139 @@
"""""" """""" """""" """""" """""
" ACCESS-Pipeline MSI workflow operator
" http://www.github.com/mskcc/access-pipeline/workflows/msi.cwl
""" """""" """""" """""" """""" ""

import os
import json
import logging
from jinja2 import Template
from django.conf import settings
from file_system.models import File
from runner.operator.operator import Operator
from runner.run.objects.run_creator_object import RunCreator
from runner.models import Port, RunStatus
from runner.operator.access import (
get_request_id,
get_request_id_runs,
create_cwl_file_object,
find_request_bams,
is_tumor_bam,
)


logger = logging.getLogger(__name__)

# Todo: needs to work for Nucleo bams as well
TUMOR_SEARCH = "-L0"
NORMAL_SEARCH = "-N0"
STANDARD_BAM_SEARCH = "_cl_aln_srt_MD_IR_FX_BR.bam"
WORKDIR = os.path.dirname(os.path.abspath(__file__))


class AccessV2LegacyMSIOperator(Operator):
"""
Operator for the ACCESS Legacy Microsatellite Instability workflow:

http://www.github.com/mskcc/access-pipeline/workflows/subworkflows/msi.cwl

This Operator will search for ACCESS Standard Bam files based on an IGO Request ID. It will
also find the matched normals based on the patient ID.
"""

@staticmethod
def is_tumor_bam(file):
# Todo: extract to common fn across 4 downstream operators
if not file.file_name.endswith(".bam"):
return False
t_n_timepoint = file.file_name.split("-")[2]
return not t_n_timepoint[0] == "N"

def get_sample_inputs(self):
"""
Create all sample inputs for all runs triggered in this instance of the operator.

:return: list of json_objects
"""
runs, self.request_id = get_request_id_runs(
["access v2 nucleo", "access nucleo"], self.run_ids, self.request_id
)

bams = []
for run in runs:
bams.append(find_request_bams(run))

# TUMOR Uncollapsed
standard_tumor_bams = [b["uncollapsed_bam"] for b in bams if is_tumor_bam(b["fgbio_collapsed_bam"].file_name)]

# Dictionary that associates tumor bam with standard bam with tumor_sample_id
sample_tumor_normal = {}
for standard_tumor_bam in standard_tumor_bams:
tumor_sample_id = standard_tumor_bam.file_name.split("_cl_aln")[0]
patient_id = "-".join(tumor_sample_id.split("-")[0:2])

# Find the matched Normal Standard bam (which could be associated with a different request_id)
sample_search_start = patient_id + NORMAL_SEARCH
matched_normal_bam = File.objects.filter(
file_name__startswith=sample_search_start, file_name__endswith=STANDARD_BAM_SEARCH
)
if not len(matched_normal_bam) > 0:
msg = "No matching standard normal Bam found for patient {}".format(patient_id)
logger.warning(msg)
continue

matched_normal_bam = matched_normal_bam.order_by("-created_date").first()

sample_tumor_normal[tumor_sample_id] = {"normal": matched_normal_bam, "tumor": standard_tumor_bam}

sample_inputs = [
self.construct_sample_inputs(key, value["tumor"], value["normal"])
for key, value in sample_tumor_normal.items()
]

return sample_inputs

def get_jobs(self):
"""
Convert job inputs into serialized jobs

:return: list[(serialized job info, Job)]
"""
inputs = self.get_sample_inputs()

return [
RunCreator(
**{
"name": "ACCESS V2 LEGACY MSI M1: %s, %i of %i" % (self.request_id, i + 1, len(inputs)),
"app": self.get_pipeline_id(),
"inputs": job,
"tags": {
settings.REQUEST_ID_METADATA_KEY: self.request_id,
"cmoSampleIds": job["sample_name"],
settings.PATIENT_ID_METADATA_KEY: "-".join(job["sample_name"][0].split("-")[0:2]),
},
}
)
for i, job in enumerate(inputs)
]

def construct_sample_inputs(self, sample_name, tumor_bam, matched_normal_bam):
"""
Use sample metadata and json template to create inputs for the CWL run

:return: JSON format sample inputs
"""
with open(os.path.join(WORKDIR, "input_template.json.jinja2")) as file:
template = Template(file.read())

sample_names = [sample_name]
matched_normal_bams = [create_cwl_file_object(matched_normal_bam.path)]
tumor_bams = [create_cwl_file_object(tumor_bam.path)]

input_file = template.render(
tumor_bams=json.dumps(tumor_bams),
normal_bams=json.dumps(matched_normal_bams),
sample_names=json.dumps(sample_names),
)

sample_input = json.loads(input_file)
return sample_input
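
A hedged illustration of the matched-normal lookup in get_sample_inputs above, using a hypothetical patient ID; the normal may come from a different request, and the most recently created bam wins:

from file_system.models import File

patient_id = "C-000000"   # hypothetical
matched_normal_bam = (
    File.objects.filter(
        file_name__startswith=patient_id + NORMAL_SEARCH,   # "C-000000-N0"
        file_name__endswith=STANDARD_BAM_SEARCH,            # "_cl_aln_srt_MD_IR_FX_BR.bam"
    )
    .order_by("-created_date")
    .first()
)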