diff --git a/blsl/vcfparallel.py b/blsl/vcfparallel.py index f410565..cacaa57 100644 --- a/blsl/vcfparallel.py +++ b/blsl/vcfparallel.py @@ -10,10 +10,10 @@ import shutil import subprocess import argparse -import math import tempfile from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path +import math import uuid import re @@ -37,9 +37,8 @@ def one_chunk(vcf, chunk, temp_prefix, filterarg="", pipeline="", index=True): cmd = f"{cmd} | {pipeline}" if index: cmd = f"{cmd} | bcftools view -Ob0 --write-index -o {ofile}" - with open(ofile + ".log", "wb") as log: - with open(ofile, "wb") as ofh: - subprocess.run(cmd, shell=True, stdout=ofh, stderr=log, check=True) + cmd = f"({cmd}) >{ofile} 2>{ofile}.log" + subprocess.run(cmd, shell=True, check=True) return ofile @@ -70,8 +69,8 @@ def merge_one(files, prefix, threads=1, merge_type="fast"): with open(fofn, "w") as fh: for file in sorted(files): print(file, file=fh) - merge = "--allow-overlaps --rm-dup all" if merge_type == "slow" else "--naive" - cmd = f"(bcftools concat --file-list {fofn} {merge} --threads {threads} -Ob0 --write-index -o {output}) &>{output.log}" + merge = "--allow-overlaps --rm-dup all" if merge_type == "slow" else "" + cmd = f"(bcftools concat --file-list {fofn} {merge} --threads {threads} -Ob0 --write-index -o {output}) >{output}.log 2>&1" try: subprocess.run(cmd, shell=True, check=True) except subprocess.CalledProcessError: @@ -88,11 +87,12 @@ def merge_results(args, filestomerge): with open(file, "rb") as fh: shutil.copyfileobj(fh, ofh) return args.output - if args.threads > 4: - print(f"Grouped merge -> {args.threads} groups") - groups = [list() for _ in range(args.threads)] - for i, file in enumerate(filestomerge): - groups[i%args.threads].append(file) + filestomerge =list(filestomerge) + if len(filestomerge) > 1000: + n_groups = int(math.ceil(math.sqrt(len(filestomerge)))) + print(f"Grouped merge -> {n_groups} groups") + groupsize = int(math.ceil(len(filestomerge) / n_groups)) + groups = [filestomerge[i:min(i+groupsize, len(filestomerge))] for i in range(0, len(filestomerge), groupsize)] final_merge = [] with ProcessPoolExecutor(args.threads) as exc: jobs = [] @@ -109,8 +109,9 @@ def merge_results(args, filestomerge): for file in sorted(final_merge): print(file, file=fh) index = "--write-index" if re.match(r"[zb]", args.outformat) else "" - merge = "--allow-overlaps --rm-dup all" if args.merge_type == "slow" else "--naive" - cmd = f"bcftools concat --file-list {fofn} {merge} --threads {args.threads} -O{args.outformat} -o {args.output} {index}" + merge = "--allow-overlaps --rm-dup all" if args.merge_type == "slow" else "" + print(f"Finally merging {len(final_merge)} files to {args.output}") + cmd = f"bcftools concat --file-list {fofn} {merge} --threads {args.threads} -O{args.outformat} -o {args.output} {index} >{args.output}.log 2>&1" subprocess.run(cmd, shell=True, check=True) return args.output @@ -126,7 +127,7 @@ def main(argv=None): help="Output file format (passed directly to bcftools -O, see man bcftools but z=vcf.gz and b=bcf)") ap.add_argument("-T", "--temp-prefix", default=None, help="Temporary output directory") - ap.add_argument("-S", "--slow-merge", default="fast", type=str, action="store_const", const="slow", dest="merge_type", + ap.add_argument("-S", "--slow-merge", default="fast", action="store_const", const="slow", dest="merge_type", help="Use slow bcftools merging (with --allow-overlaps and --remove-duplicates)") ap.add_argument("-f", "--filter", default="", type=str, help="bcftools view arguments for variant filtering")