Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize AdamBasicTask for any ADAM command. Closes #13 #106

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/eggo/eggo-director.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ adam_home: %(work_path)s/adam
; path on worker machines where the eggo repo is checked out
eggo_home: %(work_path)s/eggo

; memory allocated to each Spark executor; passed to spark-submit as --executor-memory
executor_memory: 55G


[aws]
; These can be set/overridden by setting corresponding local env vars (in
Expand Down
3 changes: 3 additions & 0 deletions conf/eggo/eggo.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ adam_home: %(work_path)s/adam
; last component of the path must be 'eggo'
eggo_home: %(work_path)s/eggo

; memory allocated to each Spark executor; passed to spark-submit as --executor-memory
executor_memory: 55G


[aws]
; These can be set/overridden by setting corresponding local env vars (in
Expand Down
139 changes: 84 additions & 55 deletions eggo/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
from tempfile import mkdtemp
from subprocess import call, check_call

from luigi import Task, Config
from luigi import Task, Config, WrapperTask
from luigi.s3 import S3Target, S3FlagTarget, S3Client
from luigi.hdfs import HdfsClient, HdfsTarget
from luigi.file import LocalTarget
from luigi.hadoop import JobTask, HadoopJobRunner
from luigi.parameter import Parameter
from luigi.parameter import Parameter, BoolParameter

from eggo.config import eggo_config, validate_toast_config
from eggo.util import random_id, build_dest_filename
Expand Down Expand Up @@ -315,61 +315,101 @@ def run(self):

class ADAMBasicTask(Task):

adam_command = Parameter()
allowed_file_formats = Parameter()
edition = 'basic'
copy_to_hdfs = BoolParameter(True, significant=False)

def requires(self):
return DownloadDatasetHadoopTask(
destination=ToastConfig().raw_data_url())
@property
def adam_command(self):
'''Override with specific ADAM command and additional args.

def run(self):
format = ToastConfig().config['sources'][0]['format'].lower()
if format not in self.allowed_file_formats:
raise ValueError("Format '{0}' not in allowed formats {1}.".format(
format, self.allowed_file_formats))
Return the string following `adam-submit`. Use self.input().path
to access the local HDFS source data.

# 1. Copy the data from source (e.g. S3) to Hadoop's default filesystem
tmp_hadoop_path = '/tmp/{rand_id}.{format}'.format(rand_id=random_id(),
format=format)
distcp_cmd = '{hadoop_home}/bin/hadoop distcp {source} {target}'.format(
hadoop_home=eggo_config.get('worker_env', 'hadoop_home'),
source=ToastConfig().raw_data_url(), target=tmp_hadoop_path)
check_call(distcp_cmd, shell=True)
Example:

@property
def adam_command(self):
return 'transform {source} {target}'.format(
source=self.self.input().path, target=self.output().path)
'''
raise NotImplementedError('Subclass must override self.adam_command to '
'return the command string for adam-submit')

def run(self):
if self.copy_to_hdfs:
# 1. Copy the data from source (e.g. S3) to Hadoop's default filesystem
base, ext = os.path.splitext(self.input().path)
tmp_hadoop_path = '/tmp/{rand_id}{ext}'.format(rand_id=random_id(),
ext=ext)
distcp_cmd = '{hadoop_home}/bin/hadoop distcp {source} {target}'.format(
hadoop_home=eggo_config.get('worker_env', 'hadoop_home'),
source=self.input().path,
target=tmp_hadoop_path)
check_call(distcp_cmd, shell=True)

# swap the tmp filepath into the original command
adam_command = self.adam_command.replace(self.input().path, tmp_hadoop_path)
else:
adam_command = self.adam_command

# 2. Run the adam-submit job
adam_cmd = ('{adam_home}/bin/adam-submit --master {spark_master} {adam_command} '
'{source} {target}').format(
adam_cmd = ('{adam_home}/bin/adam-submit --master {spark_master} '
'--executor-memory {executor_memory} '
'{adam_command}').format(
adam_home=eggo_config.get('worker_env', 'adam_home'),
spark_master=eggo_config.get('worker_env', 'spark_master'),
adam_command=self.adam_command, source=tmp_hadoop_path,
target=ToastConfig().edition_url(edition=self.edition))
executor_memory=eggo_config.get('worker_env', 'executor_memory'),
adam_command=adam_command)
check_call(adam_cmd, shell=True)

if self.copy_to_hdfs:
delete_tmp_cmd = '{hadoop_home}/bin/hadoop fs -rmr {tmp_file}'.format(
hadoop_home=eggo_config.get('worker_env', 'hadoop_home'),
tmp_file=tmp_hadoop_path)
check_call(delete_tmp_cmd, shell=True)


class ConvertToADAMTask(ADAMBasicTask):
    """Convert the configured source dataset (BAM/SAM or VCF) to ADAM format.

    The ADAM subcommand is chosen from ``allowed_file_formats``:
    'transform' for BAM/SAM sources, 'vcf2adam' for VCF sources.
    """

    # file formats (lowercase extensions) this conversion accepts
    allowed_file_formats = Parameter()
    # name of the output "edition" (destination subdirectory) for the result
    edition = Parameter('basic')

    @property
    def adam_command(self):
        """Return the adam-submit command string for this conversion."""
        if 'bam' in self.allowed_file_formats:
            cmd_template = 'transform {source} {target}'
        elif 'vcf' in self.allowed_file_formats:
            cmd_template = 'vcf2adam {source} {target}'
        else:
            # Fail with a clear message instead of an UnboundLocalError when
            # the allowed formats match no known ADAM subcommand.
            raise ValueError(
                "No ADAM command known for file formats {0}".format(
                    self.allowed_file_formats))
        return cmd_template.format(
            source=self.input().path,
            target=self.output().path)

    def requires(self):
        # Validate the configured source format before downloading anything.
        format = ToastConfig().config['sources'][0]['format'].lower()
        if format not in self.allowed_file_formats:
            raise ValueError("Format '{0}' not in allowed formats {1}.".format(
                format, self.allowed_file_formats))

        return DownloadDatasetHadoopTask(
            destination=ToastConfig().raw_data_url())

    def output(self):
        # Success-flag target marking the converted edition as complete.
        return flag_target(ToastConfig().edition_url(edition=self.edition))


class ADAMFlattenTask(Task):
class ADAMFlattenTask(ADAMBasicTask):

adam_command = Parameter()
allowed_file_formats = Parameter()
source_edition = 'basic'
edition = 'flat'

def requires(self):
return ADAMBasicTask(adam_command=self.adam_command,
allowed_file_formats=self.allowed_file_formats)
@property
def adam_command(self):
return 'flatten {source} {target}'.format(
source=self.input().path, target=self.output().path)

def run(self):
adam_cmd = ('{adam_home}/bin/adam-submit --master {spark_master} flatten '
'{source} {target}').format(
adam_home=eggo_config.get('worker_env', 'adam_home'),
spark_master=eggo_config.get('worker_env', 'spark_master'),
source=ToastConfig().edition_url(
edition=self.source_edition),
target=ToastConfig().edition_url(edition=self.edition))
check_call(adam_cmd, shell=True)
def requires(self):
return ConvertToADAMTask(edition=self.source_edition,
allowed_file_formats=self.allowed_file_formats)

def output(self):
return flag_target(ToastConfig().edition_url(edition=self.edition))
Expand All @@ -381,13 +421,11 @@ def output(self):
return flag_target(ToastConfig().edition_url(edition=self.edition))


class VCF2ADAMTask(Task):
class VCF2ADAMTask(WrapperTask):

def requires(self):
basic = ADAMBasicTask(adam_command='vcf2adam',
allowed_file_formats=['vcf'])
flat = ADAMFlattenTask(adam_command='vcf2adam',
allowed_file_formats=['vcf'])
basic = ConvertToADAMTask(allowed_file_formats=['vcf'])
flat = ADAMFlattenTask(allowed_file_formats=['vcf'])
dependencies = [basic]
conf = ToastConfig().config
editions = conf['editions'] if 'editions' in conf else []
Expand All @@ -398,20 +436,12 @@ def requires(self):
dependencies.append(flat)
return dependencies

def run(self):
pass

def output(self):
pass


class BAM2ADAMTask(Task):
class BAM2ADAMTask(WrapperTask):

def requires(self):
basic = ADAMBasicTask(adam_command='transform',
allowed_file_formats=['sam', 'bam'])
flat = ADAMFlattenTask(adam_command='transform',
allowed_file_formats=['sam', 'bam'])
basic = ConvertToADAMTask(allowed_file_formats=['sam', 'bam'])
flat = ADAMFlattenTask(allowed_file_formats=['sam', 'bam'])
dependencies = [basic]
conf = ToastConfig().config
editions = conf['editions'] if 'editions' in conf else []
Expand All @@ -421,4 +451,3 @@ def requires(self):
elif edition == 'flat':
dependencies.append(flat)
return dependencies

73 changes: 73 additions & 0 deletions examples/batch_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Licensed to Big Data Genomics (BDG) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The BDG licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Luigi workflow to convert a batch of bams to ADAM"""
import os.path

import luigi
from luigi import Parameter, WrapperTask
from luigi.s3 import S3PathTask, S3Target, S3Client

from eggo.dag import ADAMBasicTask


def convert_s3n(uri):
    """Return *uri* with every 's3://' scheme prefix rewritten to 's3n://'."""
    return uri.replace('s3://', 's3n://')


class ADAMTransformTask(ADAMBasicTask):
    """Transform a single BAM/SAM file on S3 into ADAM format."""

    # S3 URI of the source .bam/.sam file
    source_uri = Parameter()
    # optional S3 URI for the output; derived from source_uri when None
    destination_uri = Parameter(None)

    @property
    def adam_command(self):
        # `adam-submit transform` reads the (possibly HDFS-staged) input and
        # writes directly to the output target.
        return 'transform {source} {target}'.format(
            source=self.input().path,
            target=self.output().path)

    def requires(self):
        return S3PathTask(path=self.source_uri)

    def output(self):
        if self.destination_uri is not None:
            return S3Target(self.destination_uri)
        # Swap the alignment extension for '.adam'. Handle '.sam' as well as
        # '.bam' — the producer (BatchTransform) yields both, and a plain
        # '.bam'-only replace would map a .sam source onto itself.
        base, ext = os.path.splitext(self.source_uri)
        if ext in ('.bam', '.sam'):
            return S3Target(base + '.adam')
        return S3Target(self.source_uri.replace('.bam', '.adam'))


class BatchTransform(WrapperTask):
    """Fan out one ADAMTransformTask per .bam/.sam file under an S3 folder."""

    # S3 folder containing the source .bam/.sam files
    source_folder = Parameter()
    # optional S3 folder for the outputs; defaults to the source folder
    destination_folder = Parameter(None)

    def requires(self):
        source_folder_s3n = convert_s3n(self.source_folder)

        if self.destination_folder is not None:
            destination_folder_s3n = convert_s3n(self.destination_folder)
        else:
            destination_folder_s3n = source_folder_s3n

        s3 = S3Client()
        for key_path in s3.list(self.source_folder):
            if key_path.endswith(('.bam', '.sam')):
                source_uri = os.path.join(source_folder_s3n, key_path)
                # Swap the alignment extension for '.adam'. Using splitext
                # handles '.sam' too — the original '.bam'-only replace left
                # .sam keys unrenamed, so destination collided with source.
                base, _ = os.path.splitext(key_path)
                destination_uri = os.path.join(destination_folder_s3n,
                                               base + '.adam')
                yield ADAMTransformTask(source_uri=source_uri,
                                        destination_uri=destination_uri)


if __name__ == '__main__':
    # Delegate to luigi's CLI entry point (task name and params from argv).
    luigi.run()
63 changes: 63 additions & 0 deletions examples/count_kmers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed to Big Data Genomics (BDG) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The BDG licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Luigi workflow to convert bams to ADAM and count kmers"""

import luigi
from luigi import Parameter
from luigi.s3 import S3Target, S3PathTask

from eggo.dag import ADAMBasicTask


class ADAMTransformTask(ADAMBasicTask):
    """Convert the BAM file at ``source_uri`` into ADAM format on S3."""

    # S3 URI of the source .bam file
    source_uri = Parameter()

    def requires(self):
        return S3PathTask(path=self.source_uri)

    def output(self):
        return S3Target(self.source_uri.replace('.bam', '.adam'))

    @property
    def adam_command(self):
        source_path = self.input().path
        target_path = self.output().path
        return 'transform {source} {target}'.format(source=source_path,
                                                    target=target_path)


class CountKmersTask(ADAMBasicTask):
    """Count k-mers in the ADAM-converted reads derived from ``source_uri``."""

    # S3 URI of the original .bam file
    source_uri = Parameter()
    # k-mer length passed through to `adam-submit count_kmers`
    kmer_length = Parameter(21)

    def requires(self):
        # The BAM must be converted to ADAM format before counting.
        return ADAMTransformTask(source_uri=self.source_uri)

    def output(self):
        return S3Target(self.source_uri.replace('.bam', '.kmer'))

    @property
    def adam_command(self):
        args = {'source': self.input().path,
                'target': self.output().path,
                'kmer_length': self.kmer_length}
        return 'count_kmers {source} {target} {kmer_length}'.format(**args)


if __name__ == '__main__':
    # Delegate to luigi's CLI entry point (task name and params from argv).
    luigi.run()
3 changes: 3 additions & 0 deletions test/jenkins/conf/eggo.jenkins.local.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ adam_home: %(work_path)s/adam
; path on worker machines where the eggo repo is checked out
eggo_home: %(work_path)s/eggo

; memory allocated to each Spark executor; passed to spark-submit as --executor-memory
executor_memory: 55G


[aws]
; These can be set/overridden by setting corresponding local env vars (in ALL_CAPS)
Expand Down
3 changes: 3 additions & 0 deletions test/jenkins/conf/eggo.jenkins.spark_ec2.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ adam_home: %(work_path)s/adam
; path on worker machines where the eggo repo is checked out
eggo_home: %(work_path)s/eggo

; memory allocated to each Spark executor; passed to spark-submit as --executor-memory
executor_memory: 55G


[aws]
; These can be set/overridden by setting corresponding local env vars (in ALL_CAPS)
Expand Down