From a587ef15dd896689427bd301a679a7ec9bcd85bd Mon Sep 17 00:00:00 2001 From: golobor Date: Tue, 1 Jan 2019 23:19:20 -0500 Subject: [PATCH] update n_threads requirements for all processes --- distiller.nf | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/distiller.nf b/distiller.nf index db9c47d..7d3ba56 100644 --- a/distiller.nf +++ b/distiller.nf @@ -129,13 +129,14 @@ LIB_RUN_SOURCES_LOCAL_TRUNCATE_CHUNK def fastqDumpCmd(file_or_srr, library, run, srr_start=0, srr_end=-1, threads=1) { def srr_start_flag = (srr_start == 0) ? '' : (' --minSpotId ' + srr_start) def srr_end_flag = (srr_end == -1) ? '' : (' --maxSpotId ' + srr_end) + def bgzip_threads = Math.max(1,((threads as int)-2).intdiv(2)) def cmd = """ HOME=`readlink -e ./` fastq-dump ${file_or_srr} -Z --split-spot ${srr_start_flag} ${srr_end_flag} \ | pyfilesplit --lines 4 \ - >(bgzip -c -@{threads} > ${library}.${run}.1.fastq.gz) \ - >(bgzip -c -@{threads} > ${library}.${run}.2.fastq.gz) \ + >(bgzip -c -@{bgzip_threads} > ${library}.${run}.1.fastq.gz) \ + >(bgzip -c -@{bgzip_threads} > ${library}.${run}.2.fastq.gz) \ | cat """ return cmd @@ -164,25 +165,26 @@ def sraDownloadTruncateCmd(sra_query, library, run, truncate_fastq_reads=0, if ((srr_start > 0) || (srr_end != -1)) { cmd = """ - ${fastqDumpCmd(srr, library, run, srr_start, srr_end)} + ${fastqDumpCmd(srr, library, run, srr_start, srr_end, threads)} if [ -d ./ncbi ]; then rm -Rf ./ncbi; fi """ } else { cmd = """ wget ftp://ftp-trace.ncbi.nlm.nih.gov/sra/sra-instant/reads/ByRun/sra/SRR/SRR${srrnum.take(3)}/${srr}/${srr}.sra -qO ${srr}.sra - ${fastqDumpCmd(srr+'.sra', library, run, 0, -1)} + ${fastqDumpCmd(srr+'.sra', library, run, 0, -1, threads)} rm ${srr}.sra """ } def chunk_lines = 4 * chunksize + def split_bgzip_threads = Math.max(1, (threads as int)-1) if ( (truncate_fastq_reads == 0) && (chunk_lines > 0) ) { for (side in 1..2) { cmd += """ zcat ${library}.${run}.${side}.fastq.gz | \ split -l ${chunk_lines} --numeric-suffixes=1 \ - --filter 'bgzip -c -@ ${threads} > \$FILE.${side}.fastq.gz' - \ + --filter 'bgzip -c -@ ${split_bgzip_threads} > \$FILE.${side}.fastq.gz' - \ ${library}.${run}. rm ${library}.${run}.${side}.fastq.gz """ @@ -200,22 +202,23 @@ def sraDownloadTruncateCmd(sra_query, library, run, truncate_fastq_reads=0, String fastqDownloadTruncateCmd(query, library, run, side, - truncate_fastq_reads=0, chunksize=0, threads=1) { + truncate_fastq_reads=0, chunksize=0, threads=2) { def cmd = '' def truncate_lines = 4 * truncate_fastq_reads def chunk_lines = 4 * chunksize + def bgzip_threads = Math.max(1, (threads as int)-1) if (truncate_lines > 0) { cmd = """head -n ${truncate_lines} < <(wget ${query} -O - | gunzip -cd )\ - | bgzip -c -@ ${threads} \ + | bgzip -c -@ ${bgzip_threads} \ > ${library}.${run}.0.${side}.fastq.gz """ } else if (chunk_lines > 0) { cmd = """wget ${query} -O - \ | gunzip -cd \ | split -l ${chunk_lines} --numeric-suffixes=1 \ - --filter 'bgzip -c -@ ${threads} > \$FILE.${side}.fastq.gz' - \ + --filter 'bgzip -c -@ ${bgzip_threads} > \$FILE.${side}.fastq.gz' - \ ${library}.${run}. """ } else { @@ -232,18 +235,18 @@ String fastqLocalTruncateChunkCmd(path, library, run, side, def truncate_lines = 4 * truncate_fastq_reads def chunk_lines = 4 * chunksize - + def bgzip_threads = Math.max(1, (threads as int)-1) if (truncate_lines > 0) { cmd = """head -n ${truncate_lines} < <( zcat ${path} ) \ - | bgzip -c -@ ${threads} \ + | bgzip -c -@ ${bgzip_threads} \ > ${library}.${run}.0.${side}.fastq.gz """ } else if (chunk_lines > 0) { cmd = """ zcat ${path} | \ split -l ${chunk_lines} --numeric-suffixes=1 \ - --filter 'bgzip -c -@ ${threads} > \$FILE.${side}.fastq.gz' - \ + --filter 'bgzip -c -@ ${bgzip_threads} > \$FILE.${side}.fastq.gz' - \ ${library}.${run}. """ } else { @@ -447,15 +450,19 @@ process map_parse_sort_chunks { params['parse'].get('keep_unparsed_bams','false').toBoolean() ? "| tee >(samtools view -bS > ${library}.${run}.${ASSEMBLY_NAME}.${chunk}.bam)" : "" ) def parsing_options = params['parse'].get('parsing_options','') + def bwa_threads = Math.max(1, + ((((task.cpus as int)*0.6).round()) as int)) + def sorting_threads = Math.max(1, (task.cpus as int)-bwa_threads) + """ TASK_TMP_DIR=\$(mktemp -d -p ${task.distillerTmpDir} distiller.tmp.XXXXXXXXXX) touch ${library}.${run}.${ASSEMBLY_NAME}.${chunk}.bam - bwa mem -t ${task.cpus} ${mapping_options} -SP ${bwa_index_base} ${fastq1} ${fastq2} \ + bwa mem -t ${bwa_threads} ${mapping_options} -SP ${bwa_index_base} ${fastq1} ${fastq2} \ ${keep_unparsed_bams_command} \ | pairtools parse ${dropsam_flag} ${dropreadid_flag} ${dropseq_flag} \ ${parsing_options} \ -c ${chrom_sizes} \ - | pairtools sort --nproc ${task.cpus} \ + | pairtools sort --nproc ${sorting_threads} \ -o ${library}.${run}.${ASSEMBLY_NAME}.${chunk}.pairsam.${suffix} \ --tmpdir \$TASK_TMP_DIR \ | cat