diff --git a/conf/eggo/eggo-director.cfg b/conf/eggo/eggo-director.cfg index 3691cc8..175e8d1 100644 --- a/conf/eggo/eggo-director.cfg +++ b/conf/eggo/eggo-director.cfg @@ -77,6 +77,9 @@ adam_home: %(work_path)s/adam ; path on worker machines where the eggo repo is checked out eggo_home: %(work_path)s/eggo +; memory allocated to spark on each slave. This is a configuration parameter for spark-submit +executor_memory: 55G + [aws] ; These can be set/overridden by setting corresponding local env vars (in diff --git a/conf/eggo/eggo.cfg b/conf/eggo/eggo.cfg index af276fa..50ec8aa 100644 --- a/conf/eggo/eggo.cfg +++ b/conf/eggo/eggo.cfg @@ -79,6 +79,9 @@ adam_home: %(work_path)s/adam ; last component of the path must be 'eggo' eggo_home: %(work_path)s/eggo +; memory allocated to spark on each slave. This is a configuration parameter for spark-submit +executor_memory: 55G + [aws] ; These can be set/overridden by setting corresponding local env vars (in diff --git a/eggo/dag.py b/eggo/dag.py index d0a62b2..654af93 100644 --- a/eggo/dag.py +++ b/eggo/dag.py @@ -23,12 +23,12 @@ from tempfile import mkdtemp from subprocess import call, check_call -from luigi import Task, Config +from luigi import Task, Config, WrapperTask from luigi.s3 import S3Target, S3FlagTarget, S3Client from luigi.hdfs import HdfsClient, HdfsTarget from luigi.file import LocalTarget from luigi.hadoop import JobTask, HadoopJobRunner -from luigi.parameter import Parameter +from luigi.parameter import Parameter, BoolParameter from eggo.config import eggo_config, validate_toast_config from eggo.util import random_id, build_dest_filename @@ -315,61 +315,101 @@ def run(self): class ADAMBasicTask(Task): - adam_command = Parameter() - allowed_file_formats = Parameter() - edition = 'basic' + copy_to_hdfs = BoolParameter(True, significant=False) - def requires(self): - return DownloadDatasetHadoopTask( - destination=ToastConfig().raw_data_url()) + @property + def adam_command(self): + '''Override with specific 
ADAM command and additional args. - def run(self): - format = ToastConfig().config['sources'][0]['format'].lower() - if format not in self.allowed_file_formats: - raise ValueError("Format '{0}' not in allowed formats {1}.".format( - format, self.allowed_file_formats)) + Return the string following `adam-submit`. Use self.input().path + to access the local HDFS source data. - # 1. Copy the data from source (e.g. S3) to Hadoop's default filesystem - tmp_hadoop_path = '/tmp/{rand_id}.{format}'.format(rand_id=random_id(), - format=format) - distcp_cmd = '{hadoop_home}/bin/hadoop distcp {source} {target}'.format( - hadoop_home=eggo_config.get('worker_env', 'hadoop_home'), - source=ToastConfig().raw_data_url(), target=tmp_hadoop_path) - check_call(distcp_cmd, shell=True) + Example: + + @property + def adam_command(self): + return 'transform {source} {target}'.format( + source=self.input().path, target=self.output().path) + ''' + raise NotImplementedError('Subclass must override self.adam_command to ' + 'return the command string for adam-submit') + + def run(self): + if self.copy_to_hdfs: + # 1. Copy the data from source (e.g. S3) to Hadoop's default filesystem + base, ext = os.path.splitext(self.input().path) + tmp_hadoop_path = '/tmp/{rand_id}{ext}'.format(rand_id=random_id(), + ext=ext) + distcp_cmd = '{hadoop_home}/bin/hadoop distcp {source} {target}'.format( + hadoop_home=eggo_config.get('worker_env', 'hadoop_home'), + source=self.input().path, + target=tmp_hadoop_path) + check_call(distcp_cmd, shell=True) + + # swap the tmp filepath into the original command + adam_command = self.adam_command.replace(self.input().path, tmp_hadoop_path) + else: + adam_command = self.adam_command # 2. 
Run the adam-submit job - adam_cmd = ('{adam_home}/bin/adam-submit --master {spark_master} {adam_command} ' - '{source} {target}').format( + adam_cmd = ('{adam_home}/bin/adam-submit --master {spark_master} ' + '--executor-memory {executor_memory} ' + '{adam_command}').format( adam_home=eggo_config.get('worker_env', 'adam_home'), spark_master=eggo_config.get('worker_env', 'spark_master'), - adam_command=self.adam_command, source=tmp_hadoop_path, - target=ToastConfig().edition_url(edition=self.edition)) + executor_memory=eggo_config.get('worker_env', 'executor_memory'), + adam_command=adam_command) check_call(adam_cmd, shell=True) + if self.copy_to_hdfs: + delete_tmp_cmd = '{hadoop_home}/bin/hadoop fs -rmr {tmp_file}'.format( + hadoop_home=eggo_config.get('worker_env', 'hadoop_home'), + tmp_file=tmp_hadoop_path) + check_call(delete_tmp_cmd, shell=True) + + +class ConvertToADAMTask(ADAMBasicTask): + + allowed_file_formats = Parameter() + edition = Parameter('basic') + + @property + def adam_command(self): + if 'bam' in self.allowed_file_formats: + cmd_template = 'transform {source} {target}' + elif 'vcf' in self.allowed_file_formats: + cmd_template = 'vcf2adam {source} {target}' + return cmd_template.format( + source=self.input().path, + target=self.output().path) + + def requires(self): + format = ToastConfig().config['sources'][0]['format'].lower() + if format not in self.allowed_file_formats: + raise ValueError("Format '{0}' not in allowed formats {1}.".format( + format, self.allowed_file_formats)) + + return DownloadDatasetHadoopTask( + destination=ToastConfig().raw_data_url()) + def output(self): return flag_target(ToastConfig().edition_url(edition=self.edition)) -class ADAMFlattenTask(Task): +class ADAMFlattenTask(ADAMBasicTask): - adam_command = Parameter() allowed_file_formats = Parameter() source_edition = 'basic' edition = 'flat' - def requires(self): - return ADAMBasicTask(adam_command=self.adam_command, - allowed_file_formats=self.allowed_file_formats) + 
@property + def adam_command(self): + return 'flatten {source} {target}'.format( + source=self.input().path, target=self.output().path) - def run(self): - adam_cmd = ('{adam_home}/bin/adam-submit --master {spark_master} flatten ' - '{source} {target}').format( - adam_home=eggo_config.get('worker_env', 'adam_home'), - spark_master=eggo_config.get('worker_env', 'spark_master'), - source=ToastConfig().edition_url( - edition=self.source_edition), - target=ToastConfig().edition_url(edition=self.edition)) - check_call(adam_cmd, shell=True) + def requires(self): + return ConvertToADAMTask(edition=self.source_edition, + allowed_file_formats=self.allowed_file_formats) def output(self): return flag_target(ToastConfig().edition_url(edition=self.edition)) @@ -381,13 +421,11 @@ def output(self): return flag_target(ToastConfig().edition_url(edition=self.edition)) -class VCF2ADAMTask(Task): +class VCF2ADAMTask(WrapperTask): def requires(self): - basic = ADAMBasicTask(adam_command='vcf2adam', - allowed_file_formats=['vcf']) - flat = ADAMFlattenTask(adam_command='vcf2adam', - allowed_file_formats=['vcf']) + basic = ConvertToADAMTask(allowed_file_formats=['vcf']) + flat = ADAMFlattenTask(allowed_file_formats=['vcf']) dependencies = [basic] conf = ToastConfig().config editions = conf['editions'] if 'editions' in conf else [] @@ -398,20 +436,12 @@ def requires(self): dependencies.append(flat) return dependencies - def run(self): - pass - - def output(self): - pass - -class BAM2ADAMTask(Task): +class BAM2ADAMTask(WrapperTask): def requires(self): - basic = ADAMBasicTask(adam_command='transform', - allowed_file_formats=['sam', 'bam']) - flat = ADAMFlattenTask(adam_command='transform', - allowed_file_formats=['sam', 'bam']) + basic = ConvertToADAMTask(allowed_file_formats=['sam', 'bam']) + flat = ADAMFlattenTask(allowed_file_formats=['sam', 'bam']) dependencies = [basic] conf = ToastConfig().config editions = conf['editions'] if 'editions' in conf else [] @@ -421,4 +451,3 @@ def 
requires(self): elif edition == 'flat': dependencies.append(flat) return dependencies - diff --git a/examples/batch_transform.py b/examples/batch_transform.py new file mode 100644 index 0000000..0264317 --- /dev/null +++ b/examples/batch_transform.py @@ -0,0 +1,73 @@ +# Licensed to Big Data Genomics (BDG) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The BDG licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Luigi workflow to convert a batch of bams to ADAM""" +import os.path + +import luigi +from luigi import Parameter, WrapperTask +from luigi.s3 import S3PathTask, S3Target, S3Client + +from eggo.dag import ADAMBasicTask + + +def convert_s3n(s): + return s.replace('s3://', 's3n://') + + +class ADAMTransformTask(ADAMBasicTask): + + source_uri = Parameter() + destination_uri = Parameter(None) + + @property + def adam_command(self): + return 'transform {source} {target}'.format( + source=self.input().path, + target=self.output().path) + + def requires(self): + return S3PathTask(path=self.source_uri) + + def output(self): + if self.destination_uri is not None: + return S3Target(self.destination_uri) + return S3Target(self.source_uri.replace('.bam', '.adam')) + + +class BatchTransform(WrapperTask): + + source_folder = Parameter() + destination_folder = Parameter(None) + + def requires(self): + source_folder_s3n = convert_s3n(self.source_folder) + + if self.destination_folder is not None: + destination_folder_s3n = convert_s3n(self.destination_folder) + else: + destination_folder_s3n = source_folder_s3n + + s3 = S3Client() + for key_path in s3.list(self.source_folder): + if key_path.endswith('.bam') or key_path.endswith('.sam'): + source_uri = os.path.join(source_folder_s3n, key_path) + destination_uri = os.path.join(destination_folder_s3n, key_path.replace('.bam', '.adam')) + yield ADAMTransformTask(source_uri=source_uri, destination_uri=destination_uri) + + +if __name__ == '__main__': + luigi.run() diff --git a/examples/count_kmers.py b/examples/count_kmers.py new file mode 100644 index 0000000..7a153dc --- /dev/null +++ b/examples/count_kmers.py @@ -0,0 +1,63 @@ +# Licensed to Big Data Genomics (BDG) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The BDG licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Luigi workflow to convert bams to ADAM and count kmers""" + +import luigi +from luigi import Parameter +from luigi.s3 import S3Target, S3PathTask + +from eggo.dag import ADAMBasicTask + + +class ADAMTransformTask(ADAMBasicTask): + + source_uri = Parameter() + + @property + def adam_command(self): + return 'transform {source} {target}'.format( + source=self.input().path, + target=self.output().path) + + def requires(self): + return S3PathTask(path=self.source_uri) + + def output(self): + return S3Target(self.source_uri.replace('.bam', '.adam')) + + +class CountKmersTask(ADAMBasicTask): + + source_uri = Parameter() + kmer_length = Parameter(21) + + @property + def adam_command(self): + return 'count_kmers {source} {target} {kmer_length}'.format( + source=self.input().path, + target=self.output().path, + kmer_length=self.kmer_length) + + def requires(self): + return ADAMTransformTask(source_uri=self.source_uri) + + def output(self): + return S3Target(self.source_uri.replace('.bam', '.kmer')) + + +if __name__ == '__main__': + luigi.run() diff --git a/test/jenkins/conf/eggo.jenkins.local.cfg b/test/jenkins/conf/eggo.jenkins.local.cfg index 306ede1..1635aec 100644 --- a/test/jenkins/conf/eggo.jenkins.local.cfg +++ b/test/jenkins/conf/eggo.jenkins.local.cfg @@ -71,6 +71,9 @@ adam_home: %(work_path)s/adam ; path on worker machines where the eggo repo is checked out eggo_home: %(work_path)s/eggo +; 
memory allocated to spark on each slave. This is a configuration parameter for spark-submit +executor_memory: 55G + [aws] ; These can be set/overridden by setting corresponding local env vars (in ALL_CAPS) diff --git a/test/jenkins/conf/eggo.jenkins.spark_ec2.cfg b/test/jenkins/conf/eggo.jenkins.spark_ec2.cfg index f035567..7051206 100644 --- a/test/jenkins/conf/eggo.jenkins.spark_ec2.cfg +++ b/test/jenkins/conf/eggo.jenkins.spark_ec2.cfg @@ -72,6 +72,9 @@ adam_home: %(work_path)s/adam ; path on worker machines where the eggo repo is checked out eggo_home: %(work_path)s/eggo +; memory allocated to spark on each slave. This is a configuration parameter for spark-submit +executor_memory: 55G + [aws] ; These can be set/overridden by setting corresponding local env vars (in ALL_CAPS)