-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path2022-12-14--lowercase-aligned-portion.py
executable file
·81 lines (66 loc) · 2.81 KB
/
2022-12-14--lowercase-aligned-portion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python3
import json
import gzip
import glob
from Bio.SeqIO.QualityIO import FastqGeneralIterator
def rc(s):
    """Return the reverse complement of the DNA sequence *s*.

    s: a sequence of upper-case bases drawn from A, C, G, T and N.

    Returns the complemented bases, in reverse order, as a str.
    Raises KeyError for any other character (lower-case bases included).
    """
    # Build the lookup table once per call; written inline in the generator
    # expression it would be rebuilt for every single base.
    complement = {'T': 'A',
                  'G': 'C',
                  'A': 'T',
                  'C': 'G',
                  'N': 'N'}
    return "".join(complement[x] for x in reversed(s))
# Map each sample name to its (forward, reverse) barcode pair, read from a
# whitespace-separated file holding one "name forward reverse" record per line.
sample_barcodes = {}
with open("laura-sample-barcodes.txt") as inf:
    for line in inf:
        fields = line.split()
        if not fields:
            # A blank line (e.g. a trailing empty line) holds no record;
            # unpacking it below would raise ValueError.
            continue
        # Still raises ValueError when a non-blank line does not have
        # exactly three fields.
        sample_name, fwd, rev = fields
        sample_barcodes[sample_name] = fwd, rev
def match_barcodes(line, barcodes, min_score=2.5):
    """Find the samples whose barcodes were detected on one .barcoding line.

    line: one tab-separated record with eight fields, in order: forward
        score, reverse score, forward is-rc flag, reverse is-rc flag,
        forward found barcode, reverse found barcode, forward alignment
        (JSON), reverse alignment (JSON).
    barcodes: mapping of sample name -> (forward barcode, reverse barcode).
    min_score: an end is only considered when its score is strictly greater
        than this value.

    Returns (sample_names, highlights): the set of matched sample names and
    the JSON-decoded alignment of every end that matched, forward end first.
    Raises ValueError when the line does not have exactly eight fields or a
    score is not a number.
    """
    (score_fwd, score_rev,
     _fwd_is_rc, _rev_is_rc,
     fwd_found, rev_found,
     fwd_alignment, rev_alignment) = line.strip().split("\t")

    sample_names = set()
    highlights = []
    # The last element is the index of this end's barcode inside each
    # (forward, reverse) pair.
    for score, found, alignment, fwdrev in (
            (score_fwd, fwd_found, fwd_alignment, 0),
            (score_rev, rev_found, rev_alignment, 1)):
        # Written as "not >" so a NaN score is rejected, exactly as a plain
        # "score > min_score" test would reject it.
        if not float(score) > min_score:
            continue
        # Only the first sample (in mapping order) with this barcode counts.
        for sample_name, pair in barcodes.items():
            if pair[fwdrev] == found:
                sample_names.add(sample_name)
                highlights.append(json.loads(alignment))
                break
    return sample_names, highlights


def lowercase_spans(sequence, highlights):
    """Return *sequence* as a str with every highlighted span lower-cased.

    highlights: iterable of two-element items whose second element is an
        iterable of (begin, end) half-open index ranges into *sequence*; the
        first element is ignored.

    Raises IndexError when a range reaches past the end of the sequence.
    """
    bases = list(sequence)
    for _, spans in highlights:
        for begin, end in spans:
            for i in range(begin, end):
                bases[i] = bases[i].lower()
    return "".join(bases)


def is_strict(sample_names, highlights):
    """Return True when both ends matched and they agree on a single sample."""
    # Count the *set* of matched names; the length of one name string says
    # nothing about whether the two ends agree.
    return len(highlights) == 2 and len(sample_names) == 1


# FASTA rows (">title samples\nsequence") for reads with at least one barcode
# hit: out_strict when both ends matched the same sample, out_generous for
# everything else.
out_generous = []
out_strict = []
for fname in glob.glob("*.barcoding"):
    # Pass 1: remember which record numbers of this file have a barcode hit.
    hits = {}
    with open(fname) as inf:
        for lineno, line in enumerate(inf):
            sample_names, highlights = match_barcodes(line, sample_barcodes)
            if sample_names:
                hits[lineno] = sample_names, highlights
    if not hits:
        # Without a hit the FASTQ file is never needed, so do not open it.
        continue

    # Pass 2: walk the gzipped FASTQ once; record N pairs with line N of
    # the .barcoding file.
    original = fname.replace(".barcoding", "")
    last_hit = max(hits)
    with gzip.open(original, mode='rt') as inf2:
        for seqno, (title, sequence, _quality) in enumerate(
                FastqGeneralIterator(inf2)):
            if seqno > last_hit:
                # Nothing left to emit; skip decompressing the rest.
                break
            if seqno not in hits:
                continue
            sample_names, highlights = hits[seqno]
            row = ">%s %s\n%s" % (
                title,
                " ".join(sorted(sample_names)),
                lowercase_spans(sequence, highlights))
            # found both ends and they match the same sample
            if is_strict(sample_names, highlights):
                out_strict.append(row)
            else:
                out_generous.append(row)
# Dump the collected rows, one newline-terminated row after another, into
# their FASTA files: the strict set first, then the generous set.
for out_path, rows in (("demultiplexed-strict.fasta", out_strict),
                       ("demultiplexed-generous.fasta", out_generous)):
    with open(out_path, "w") as outf:
        for row in rows:
            outf.write(row + "\n")