Skip to content

Commit

Permalink
update n_threads requirements for all processes
Browse files Browse the repository at this point in the history
  • Loading branch information
golobor committed Jan 2, 2019
1 parent 941c447 commit a587ef1
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions distiller.nf
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,14 @@ LIB_RUN_SOURCES_LOCAL_TRUNCATE_CHUNK
def fastqDumpCmd(file_or_srr, library, run, srr_start=0, srr_end=-1, threads=1) {
def srr_start_flag = (srr_start == 0) ? '' : (' --minSpotId ' + srr_start)
def srr_end_flag = (srr_end == -1) ? '' : (' --maxSpotId ' + srr_end)
def bgzip_threads = Math.max(1,((threads as int)-2).intdiv(2))

def cmd = """
HOME=`readlink -e ./`
fastq-dump ${file_or_srr} -Z --split-spot ${srr_start_flag} ${srr_end_flag} \
| pyfilesplit --lines 4 \
>(bgzip -c -@{threads} > ${library}.${run}.1.fastq.gz) \
>(bgzip -c -@{threads} > ${library}.${run}.2.fastq.gz) \
>(bgzip -c -@{bgzip_threads} > ${library}.${run}.1.fastq.gz) \
>(bgzip -c -@{bgzip_threads} > ${library}.${run}.2.fastq.gz) \
| cat """

return cmd
Expand Down Expand Up @@ -164,25 +165,26 @@ def sraDownloadTruncateCmd(sra_query, library, run, truncate_fastq_reads=0,

if ((srr_start > 0) || (srr_end != -1)) {
cmd = """
${fastqDumpCmd(srr, library, run, srr_start, srr_end)}
${fastqDumpCmd(srr, library, run, srr_start, srr_end, threads)}
if [ -d ./ncbi ]; then rm -Rf ./ncbi; fi
"""
}
else {
cmd = """
wget ftp://ftp-trace.ncbi.nlm.nih.gov/sra/sra-instant/reads/ByRun/sra/SRR/SRR${srrnum.take(3)}/${srr}/${srr}.sra -qO ${srr}.sra
${fastqDumpCmd(srr+'.sra', library, run, 0, -1)}
${fastqDumpCmd(srr+'.sra', library, run, 0, -1, threads)}
rm ${srr}.sra
"""
}

def chunk_lines = 4 * chunksize
def split_bgzip_threads = Math.max(1, (threads as int)-1)
if ( (truncate_fastq_reads == 0) && (chunk_lines > 0) ) {
for (side in 1..2) {
cmd += """
zcat ${library}.${run}.${side}.fastq.gz | \
split -l ${chunk_lines} --numeric-suffixes=1 \
--filter 'bgzip -c -@ ${threads} > \$FILE.${side}.fastq.gz' - \
--filter 'bgzip -c -@ ${split_bgzip_threads} > \$FILE.${side}.fastq.gz' - \
${library}.${run}.
rm ${library}.${run}.${side}.fastq.gz
"""
Expand All @@ -200,22 +202,23 @@ def sraDownloadTruncateCmd(sra_query, library, run, truncate_fastq_reads=0,


String fastqDownloadTruncateCmd(query, library, run, side,
truncate_fastq_reads=0, chunksize=0, threads=1) {
truncate_fastq_reads=0, chunksize=0, threads=2) {
def cmd = ''

def truncate_lines = 4 * truncate_fastq_reads
def chunk_lines = 4 * chunksize
def bgzip_threads = Math.max(1, (threads as int)-1)

if (truncate_lines > 0) {
cmd = """head -n ${truncate_lines} < <(wget ${query} -O - | gunzip -cd )\
| bgzip -c -@ ${threads} \
| bgzip -c -@ ${bgzip_threads} \
> ${library}.${run}.0.${side}.fastq.gz
"""
} else if (chunk_lines > 0) {
cmd = """wget ${query} -O - \
| gunzip -cd \
| split -l ${chunk_lines} --numeric-suffixes=1 \
--filter 'bgzip -c -@ ${threads} > \$FILE.${side}.fastq.gz' - \
--filter 'bgzip -c -@ ${bgzip_threads} > \$FILE.${side}.fastq.gz' - \
${library}.${run}.
"""
} else {
Expand All @@ -232,18 +235,18 @@ String fastqLocalTruncateChunkCmd(path, library, run, side,

def truncate_lines = 4 * truncate_fastq_reads
def chunk_lines = 4 * chunksize

def bgzip_threads = Math.max(1, (threads as int)-1)

if (truncate_lines > 0) {
cmd = """head -n ${truncate_lines} < <( zcat ${path} ) \
| bgzip -c -@ ${threads} \
| bgzip -c -@ ${bgzip_threads} \
> ${library}.${run}.0.${side}.fastq.gz
"""
} else if (chunk_lines > 0) {
cmd = """
zcat ${path} | \
split -l ${chunk_lines} --numeric-suffixes=1 \
--filter 'bgzip -c -@ ${threads} > \$FILE.${side}.fastq.gz' - \
--filter 'bgzip -c -@ ${bgzip_threads} > \$FILE.${side}.fastq.gz' - \
${library}.${run}.
"""
} else {
Expand Down Expand Up @@ -447,15 +450,19 @@ process map_parse_sort_chunks {
params['parse'].get('keep_unparsed_bams','false').toBoolean() ?
"| tee >(samtools view -bS > ${library}.${run}.${ASSEMBLY_NAME}.${chunk}.bam)" : "" )
def parsing_options = params['parse'].get('parsing_options','')
def bwa_threads = Math.max(1,
((((task.cpus as int)*0.6).round()) as int))
def sorting_threads = Math.max(1, (task.cpus as int)-bwa_threads)

"""
TASK_TMP_DIR=\$(mktemp -d -p ${task.distillerTmpDir} distiller.tmp.XXXXXXXXXX)
touch ${library}.${run}.${ASSEMBLY_NAME}.${chunk}.bam
bwa mem -t ${task.cpus} ${mapping_options} -SP ${bwa_index_base} ${fastq1} ${fastq2} \
bwa mem -t ${bwa_threads} ${mapping_options} -SP ${bwa_index_base} ${fastq1} ${fastq2} \
${keep_unparsed_bams_command} \
| pairtools parse ${dropsam_flag} ${dropreadid_flag} ${dropseq_flag} \
${parsing_options} \
-c ${chrom_sizes} \
| pairtools sort --nproc ${task.cpus} \
| pairtools sort --nproc ${sorting_threads} \
-o ${library}.${run}.${ASSEMBLY_NAME}.${chunk}.pairsam.${suffix} \
--tmpdir \$TASK_TMP_DIR \
| cat
Expand Down

0 comments on commit a587ef1

Please sign in to comment.