Skip to content

Commit

Permalink
improvements to vcfparallel
Browse files Browse the repository at this point in the history
  • Loading branch information
kdm9 committed Jul 2, 2024
1 parent 04535c0 commit c59bbc6
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions blsl/vcfparallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import shutil
import subprocess
import argparse
import math
import tempfile
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
import math
import uuid
import re

Expand All @@ -37,9 +37,8 @@ def one_chunk(vcf, chunk, temp_prefix, filterarg="", pipeline="", index=True):
cmd = f"{cmd} | {pipeline}"
if index:
cmd = f"{cmd} | bcftools view -Ob0 --write-index -o {ofile}"
with open(ofile + ".log", "wb") as log:
with open(ofile, "wb") as ofh:
subprocess.run(cmd, shell=True, stdout=ofh, stderr=log, check=True)
cmd = f"({cmd}) >{ofile} 2>{ofile}.log"
subprocess.run(cmd, shell=True, check=True)
return ofile


Expand Down Expand Up @@ -70,8 +69,8 @@ def merge_one(files, prefix, threads=1, merge_type="fast"):
with open(fofn, "w") as fh:
for file in sorted(files):
print(file, file=fh)
merge = "--allow-overlaps --rm-dup all" if merge_type == "slow" else "--naive"
cmd = f"(bcftools concat --file-list {fofn} {merge} --threads {threads} -Ob0 --write-index -o {output}) &>{output.log}"
merge = "--allow-overlaps --rm-dup all" if merge_type == "slow" else ""
cmd = f"(bcftools concat --file-list {fofn} {merge} --threads {threads} -Ob0 --write-index -o {output}) >{output}.log 2>&1"
try:
subprocess.run(cmd, shell=True, check=True)
except subprocess.CalledProcessError:
Expand All @@ -88,11 +87,12 @@ def merge_results(args, filestomerge):
with open(file, "rb") as fh:
shutil.copyfileobj(fh, ofh)
return args.output
if args.threads > 4:
print(f"Grouped merge -> {args.threads} groups")
groups = [list() for _ in range(args.threads)]
for i, file in enumerate(filestomerge):
groups[i%args.threads].append(file)
filestomerge =list(filestomerge)
if len(filestomerge) > 1000:
n_groups = int(math.ceil(math.sqrt(len(filestomerge))))
print(f"Grouped merge -> {n_groups} groups")
groupsize = int(math.ceil(len(filestomerge) / n_groups))
groups = [filestomerge[i:min(i+groupsize, len(filestomerge))] for i in range(0, len(filestomerge), groupsize)]
final_merge = []
with ProcessPoolExecutor(args.threads) as exc:
jobs = []
Expand All @@ -109,8 +109,9 @@ def merge_results(args, filestomerge):
for file in sorted(final_merge):
print(file, file=fh)
index = "--write-index" if re.match(r"[zb]", args.outformat) else ""
merge = "--allow-overlaps --rm-dup all" if args.merge_type == "slow" else "--naive"
cmd = f"bcftools concat --file-list {fofn} {merge} --threads {args.threads} -O{args.outformat} -o {args.output} {index}"
merge = "--allow-overlaps --rm-dup all" if args.merge_type == "slow" else ""
print(f"Finally merging {len(final_merge)} files to {args.output}")
cmd = f"bcftools concat --file-list {fofn} {merge} --threads {args.threads} -O{args.outformat} -o {args.output} {index} >{args.output}.log 2>&1"
subprocess.run(cmd, shell=True, check=True)
return args.output

Expand All @@ -126,7 +127,7 @@ def main(argv=None):
help="Output file format (passed directly to bcftools -O, see man bcftools but z=vcf.gz and b=bcf)")
ap.add_argument("-T", "--temp-prefix", default=None,
help="Temporary output directory")
ap.add_argument("-S", "--slow-merge", default="fast", type=str, action="store_const", const="slow", dest="merge_type",
ap.add_argument("-S", "--slow-merge", default="fast", action="store_const", const="slow", dest="merge_type",
help="Use slow bcftools merging (with --allow-overlaps and --remove-duplicates)")
ap.add_argument("-f", "--filter", default="", type=str,
help="bcftools view arguments for variant filtering")
Expand Down

0 comments on commit c59bbc6

Please sign in to comment.