-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvcf_to_HyDe_data.py
161 lines (116 loc) · 3.8 KB
/
vcf_to_HyDe_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/local/bin/python
# Python 2.7.6
# vcf_to_HyDe_data.py
# 17 April 2018
# Mathias Scharmann
import argparse
import os
########################## HEAD
# this function checks if file exists and exits script if not
def extant_file(x):
"""
'Type' for argparse - checks that file exists but does not open.
"""
if not os.path.exists(x):
print "Error: {0} does not exist".format(x)
exit()
x = str(x)
return x
# breaks script if non-UNIX linebreaks in input file popmap
def linebreak_check(x):
if "\r" in open(x, "rb").readline():
print "Error: classic mac (CR) or DOS (CRLF) linebreaks in {0}".format(x)
exit()
######
# parses arguments from command line
def get_commandline_arguments ():
parser = argparse.ArgumentParser()
parser.add_argument("--vcf", required=True, type=extant_file,
help="name/path of vcf input file", metavar="FILE")
# parser.add_argument("--format", required=True, help="the format of the vcf: stacks or freebayes")
#
args = parser.parse_args()
#
# if args.format not in ["stacks","freebayes"]:
# print "--format value not accepted; must be 'stacks' or 'freebayes' "
# exit()
# finish
return args
################################## CORE
def parse_vcf(vcf_file):
with open(vcf_file, "r") as INFILE:
for line in INFILE:
if line.startswith("##"):
continue
if line.startswith("#"):
header_line = line.lstrip("#").strip("\n").split("\t")
print header_line
break
samples = sorted(header_line[9:])
samples_vcf_idx = {}
for sample in samples:
print sample
vcf_idx = header_line.index(sample)
samples_vcf_idx[sample] = vcf_idx
# print popdict_vcf_idx
# now start the main business of walking through the vcf:
out_columns = []
with open(vcf_file, "r") as INFILE:
linecnt = 0
snpcnt = 0
for line in INFILE:
linecnt += 1
if line.startswith("#"):
continue
if len(line) < 2: # empty lines or so
continue
fields = line.strip("\n").split("\t")
if len(fields[3]) > 1:
continue # exclude variants that are not SNPs; theres no way to code those things in a phylip format!
snpcnt += 1
column = []
variants = [fields[3]] + fields[4].split(",")
# print progress every 10k sites only
if (snpcnt / 10000.0).is_integer():
print linecnt, snpcnt
for sample in samples:
idxes = [int(x) for x in fields[samples_vcf_idx[sample]].split(":")[0].split("/") if not x == "." ]
if len(idxes) > 0:
alleles = [variants[x] for x in idxes ]
# get IUPAC code if necessary (heterozygotes)
column.append( get_IUPAC_amb_code ( "".join(sorted(alleles)) ) )
else:
column.append("-")
# print column
out_columns.append(column)
# columns to rows/lines:
outlines = []
snpcnt
for idx, sample in enumerate(samples):
seq = "".join( [ x[idx] for x in out_columns ] )
out_line = sample + " " + seq
outlines.append(out_line)
with open(vcf_file + ".HyDe_data.txt", "w") as OUTFILE:
OUTFILE.write("\n".join(outlines) + "\n" )
print "Done! number of sites: ", snpcnt, " number of samples: ", len(samples)
def get_IUPAC_amb_code (nucs):
# nucs must be a "".joined string of the sorted list of nucleotides
# the IUPAC ambiguity codes:
# the keys in this IUPAC ambiguity code dictionary are produced by:
# "".join(sorted(list("NUCLEOTIDES"))) -> consistent lookup possible without additional loops!
combs = {'AC':'M', 'GT':'K', 'CG':'S', 'AT':'W', 'AG':'R', 'CT':'Y',
'ACG':'V', 'CGT':'B', 'AGT':'D', 'ACT':'H', 'ACGT':'N'
}
if len(set(nucs)) > 1:
# sometimes theres already an N, so need to except that and return N instead!
try:
out_code = combs[nucs]
except KeyError:
out_code = "N"
else:
out_code = list(nucs)[0]
return out_code
################################## MAIN
args = get_commandline_arguments ()
#print args
parse_vcf(args.vcf)