Skip to content

Commit

Permalink
implement gffcsqify, improve vcfparallel for query support etc
Browse files Browse the repository at this point in the history
  • Loading branch information
kdm9 committed Nov 7, 2023
1 parent 22914ef commit dd22a87
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 19 deletions.
14 changes: 9 additions & 5 deletions blsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@
from .genigvjs import genigvjs_main
cmds["genigvjs"] = genigvjs_main

from .liftoff_gff3 import liftoff_gff3_main
cmds["liftoff-gff3"] = liftoff_gff3_main

from .gfftagsane import main as gfftagsane_main
cmds["gfftagsane"] = gfftagsane_main

from .pansn_rename import main as pansn_rename_main
cmds["pansn-rename"] = pansn_rename_main
Expand Down Expand Up @@ -76,6 +71,15 @@
from .gffcat import gffcat_main
cmds["gffcat"] = gffcat_main

from .gffcsqify import main as gffcsqify_main
cmds["gffcsqify"] = gffcsqify_main

from .liftoff_gff3 import liftoff_gff3_main
cmds["liftoff-gff3"] = liftoff_gff3_main

from .gfftagsane import main as gfftagsane_main
cmds["gfftagsane"] = gfftagsane_main


from .pairslash import main as pairslash_main
cmds["pairslash"] = pairslash_main
Expand Down
37 changes: 37 additions & 0 deletions blsl/gffcsqify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python3
from blsl.gffparse import parseGFF3, write_gene, gffInfoFields, write_gff_line
import argparse


def main(argv=None):
    """Format a reasonably compliant GFF for use with bcftools csq.

    bcftools csq expects Ensembl-style GFF3: gene records with
    ``ID=gene:...`` and a ``biotype`` attribute, transcripts with
    ``ID=transcript:...`` and ``Parent=gene:...``, and child features
    (exon/CDS/UTR) whose ``Parent`` points at ``transcript:...``.  This
    tool rewrites the relevant attributes accordingly and drops every
    record type it does not recognise.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("-o", "--output", default="/dev/stdout",
            help="Output GFF compatible with bcftools csq")
    ap.add_argument("input", help="Input GFF")
    args = ap.parse_args(argv)

    with open(args.output, "w") as fh:
        for entry in parseGFF3(args.input):
            attrs = entry["attributes"]
            ftype = entry["type"]
            if ftype == "gene":
                attrs["ID"] = f"gene:{attrs['ID']}"
                attrs["biotype"] = "protein_coding"  # FIXME properly parse this
            elif ftype == "mRNA":
                attrs["ID"] = f"transcript:{attrs['ID']}"
                attrs["Parent"] = f"gene:{attrs['Parent']}"
                attrs["biotype"] = "protein_coding"  # FIXME properly parse this
            elif ftype in ("exon", "CDS", "five_prime_UTR", "three_prime_UTR"):
                attrs["Parent"] = f"transcript:{attrs['Parent']}"
                # NOTE(review): disabled workaround for Liftoff CDS records
                # lacking a phase -- confirm before enabling.
                #if ftype == "CDS" and attrs["original_annotator"] == "Liftoff" and entry["phase"] is None:
                #    entry["phase"] = 0
            else:
                # Anything else (chromosome/region lines, ncRNA, ...) is
                # irrelevant to csq and is skipped entirely.
                continue
            write_gff_line(entry, file=fh)


if __name__ == "__main__":
    main()
6 changes: 6 additions & 0 deletions blsl/gffparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ def prefix_name(entry, sid):
#print("H", gcid, gchild["attributes"]["Name"])


def write_gff_line(line, file=None):
    """Serialise one parsed GFF3 record as a single tab-separated line.

    ``line`` is a dict keyed by ``gffInfoFields``; the final field
    (attributes) is re-encoded as ``key=value`` pairs joined with ``;``.
    Missing or ``None`` columns are written as ``.`` per the GFF3 spec.
    ``file`` defaults to stdout (the ``print`` default).
    """
    columns = []
    for field in gffInfoFields[:-1]:
        value = line.get(field, ".")
        columns.append("." if value is None else value)
    pairs = (f"{k}={v}" for k, v in line["attributes"].items())
    columns.append(";".join(pairs))
    print(*columns, sep="\t", file=file)

def write_line(entry, file):
x = [entry[field] if entry[field] is not None else "." for i, field in enumerate(gffInfoFields)]
x[-1] = attr2line(x[-1])
Expand Down
75 changes: 61 additions & 14 deletions blsl/vcfparallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from tqdm import tqdm
from cyvcf2 import VCF

import sys
from sys import stdin, stdout, stderr
import shutil
import subprocess
import argparse
import math
Expand All @@ -17,10 +17,10 @@
import uuid


def parallel_regions(vcf, cores=1):
def parallel_regions(vcf, cores=1, npercore=10):
V = VCF(vcf)
# 10 chunks per chrom per core
chunks = len(V.seqlens)*10*cores
chunks = len(V.seqlens)*npercore*cores
for cname, clen in zip(V.seqnames, V.seqlens):
chunk = int(max(min(clen, 1000), math.ceil(clen/chunks)))
for start in range(0, clen, chunk):
Expand All @@ -29,42 +29,87 @@ def parallel_regions(vcf, cores=1):
yield f"{cname}:{s:09d}-{e:09d}"


def one_chunk(vcf, chunk, temp_prefix, filter="", pipeline="", index=True):
    """Run the filter/pipeline over one genomic region of ``vcf``.

    Parameters
    ----------
    vcf : path to the indexed input V/BCF.
    chunk : region string (``chrom:start-end``) given to ``bcftools view -r``.
    temp_prefix : prefix for the per-chunk output (and ``.log``) files.
    filter : extra ``bcftools view`` arguments for variant filtering.
    pipeline : optional shell pipeline the region's BCF stream is piped through.
    index : index the result with ``bcftools index`` afterwards (disable when
        the pipeline output is not BCF, e.g. with --merge-with-cat).

    Returns the path of the per-chunk output file; raises
    ``subprocess.CalledProcessError`` if any stage fails (check=True).

    NOTE(review): ``filter`` and ``pipeline`` are interpolated into a shell
    command on purpose (shell=True, so ``|`` etc. work); callers must only
    pass trusted strings.
    """
    cmd = f"bcftools view -r {chunk} {filter} -Ou {vcf}"
    if pipeline:
        cmd = f"{cmd} | {pipeline}"
    ofile = f"{temp_prefix}{chunk}.bcf"
    with open(ofile + ".log", "wb") as log:
        # The output handle must be closed before indexing, hence the
        # nested context managers rather than one combined `with`.
        with open(ofile, "wb") as ofh:
            subprocess.run(cmd, shell=True, stdout=ofh, stderr=log, check=True)
        if index:
            subprocess.run(f"bcftools index -f {ofile}", shell=True, stderr=log, check=True)
    return ofile


def chunkwise_pipeline(args):
    """Fan the VCF out over regions, running ``one_chunk`` on each in parallel.

    Regions come either from the -R/--regions BED file (converted from
    0-based half-open BED coordinates to the 1-based inclusive
    ``chrom:start-end`` strings bcftools expects) or are computed
    automatically by ``parallel_regions``.

    Returns the list of per-chunk output files, in completion order.
    """
    filestomerge = []
    with ProcessPoolExecutor(args.threads) as exc:
        jobs = []
        if args.regions is None:
            # NOTE(review): parallel_regions is called with its default
            # cores=1 even though args.threads workers are used -- possibly
            # intended (fewer, larger chunks), but worth confirming.
            regions = parallel_regions(args.vcf)
        else:
            regions = set()
            for line in args.regions:
                chrom, start0, end, *_ = line.rstrip("\n").split("\t")
                start0 = int(start0)
                # BED start is 0-based; bcftools regions are 1-based inclusive.
                regions.add(f"{chrom}:{start0+1}-{end}")
            regions = sorted(regions)  # sorted() already returns a list
        for region in regions:
            jobs.append(exc.submit(one_chunk, args.vcf, region, args.temp_prefix,
                                   filter=args.filter, pipeline=args.commands,
                                   index=not args.merge_with_cat))
        for job in tqdm(as_completed(jobs), total=len(jobs), unit="chunk",
                        desc="chunkwise variant pipeline"):
            filestomerge.append(job.result())
    return filestomerge


def merge_one(files, prefix, threads=1):
    """Concatenate one group of per-chunk BCFs into a single indexed BCF.

    Writes the (sorted) file list to ``{prefix}fofn.txt``, then runs
    ``bcftools concat`` -- dropping duplicate records from overlapping
    chunk boundaries -- followed by ``bcftools index``.

    Returns the merged file's path (``{prefix}output.bcf``); raises
    ``subprocess.CalledProcessError`` on failure (check=True).
    """
    fofn = f"{prefix}fofn.txt"
    output = f"{prefix}output.bcf"
    with open(fofn, "w") as fh:
        for file in sorted(files):
            print(file, file=fh)
    cmd = f"bcftools concat --file-list {fofn} --rm-dup all --allow-overlaps --threads {threads} -Ob0 -o {output} && bcftools index -f {output}"
    # The CompletedProcess result was unused; check=True already raises on error.
    subprocess.run(cmd, shell=True, check=True)
    return output

def merge_results(args, filestomerge):
    """Merge the per-chunk outputs into the final -o/--output file.

    Three strategies:
    - --merge-with-cat: chunk outputs are not BCF, so just byte-concatenate
      them in order.
    - more than 4 threads: first concat the chunks in parallel groups (one
      group per thread), then concat the group results, avoiding one huge
      single ``bcftools concat`` over thousands of chunk files.
    - otherwise: one direct ``bcftools concat`` over all chunk files.

    Returns the output path.
    """
    if args.merge_with_cat:
        with open(args.output, "wb") as ofh:
            for file in tqdm(filestomerge, desc="merge with cat", unit="file"):
                with open(file, "rb") as fh:
                    shutil.copyfileobj(fh, ofh)
        return args.output
    if args.threads > 4:
        # BUG FIX: the f-prefix was missing, printing the literal
        # "{args.threads}" instead of the thread count.
        print(f"Grouped merge -> {args.threads} groups")
        groups = [list() for _ in range(args.threads)]
        for i, file in enumerate(filestomerge):
            groups[i % args.threads].append(file)
        final_merge = []
        with ProcessPoolExecutor(args.threads) as exc:
            jobs = []
            for i, files in enumerate(groups):
                # With more threads than chunk files a group can be empty;
                # an empty fofn would make bcftools concat fail, so skip it.
                if not files:
                    continue
                jobs.append(exc.submit(merge_one, files, f"{args.temp_prefix}merge_group_{i}_"))
            for job in tqdm(as_completed(jobs), total=len(jobs), unit="group"):
                final_merge.append(job.result())
    else:
        final_merge = filestomerge

    fofn = f"{args.temp_prefix}final_fofn.txt"
    with open(fofn, "w") as fh:
        for file in sorted(final_merge):
            print(file, file=fh)
    cmd = f"bcftools concat --file-list {fofn} --rm-dup all --allow-overlaps --threads {args.threads} -O{args.outformat} -o {args.output}"
    # check=True raises on failure, so the return code needs no inspection.
    subprocess.run(cmd, shell=True, check=True)
    return args.output


def main(argv=None):
"""Use bcftools to calculate various statistics, outputing an R-ready table"""
ap = argparse.ArgumentParser()
ap.add_argument("-R", "--regions", type=argparse.FileType("rt"),
help="Use regions defined in this BED file. NB! Will only do these regions and will not check for overlaps. Be careful. Cols should be chrom, start0, end, ... and tab separated (i.e. a BED file).")
ap.add_argument("-o", "--output", required=True,
help="Output V/BCF file")
ap.add_argument("-O", "--outformat", default="z8",
Expand All @@ -74,7 +119,9 @@ def main(argv=None):
ap.add_argument("-f", "--filter", default="", type=str,
help="bcftools view arguments for variant filtering")
ap.add_argument("-c", "--commands", default="", type=str,
help="command(s) to operate. Must take uncompressed bcf on stdin and yield uncompresed bcf on stdout. Can use | and other shell features.")
help="command(s) to operate. Must take uncompressed bcf on stdin and yield bcf (i.e -Ob0) on stdout. Can use | and other shell features.")
ap.add_argument("-M", "--merge-with-cat", action="store_true",
help="command(s) produce a non-BCF file, so merge with 'cat'.")
ap.add_argument("-t", "--threads", default=1, type=int,
help="Number of parallel threads")
ap.add_argument("vcf")
Expand Down

0 comments on commit dd22a87

Please sign in to comment.