# main_gui.py
#
# Tkinter front end that parses the gps_dump messages of a PX4 ULog file
# and exports the decoded UBX messages to CSV files.
import csv
import os
import struct
import sys
import tkinter as tk
import tkinter.ttk as ttk
from tkinter import filedialog
from pyubx2 import UBXReader
from pyulog.core import ULog
from ubx_interface import CLIDPAIR_INV, MSGATTR, SYNC1, SYNC2, UbxParserState
class tkinterGUI:
    """Small Tk window that converts the ``gps_dump`` topic of a ULog to CSV.

    The user selects a log file ("ULog Select"), presses "GO", and one CSV
    file per recognised UBX message type is written. Progress and errors are
    shown in the "OUTPUT" text box.
    """

    def __init__(self) -> None:
        """Build all widgets and enter the Tk main loop.

        This constructor blocks until the window is closed because it ends
        with ``root.mainloop()``.
        """
        root = tk.Tk()
        root.attributes("-topmost", True)
        root.title("PX4 gps_dump parser")
        root.geometry("400x400+50+100")

        # --- file selection row -------------------------------------------
        frame_grid = tk.Frame(root, relief=tk.GROOVE, borderwidth=2)
        frame_grid.pack(side=tk.TOP)

        frame_file = tk.Frame(frame_grid)
        frame_file.grid(column=0, row=0, padx=5, pady=5)

        butn_select = tk.Button(
            frame_file,
            text="ULog Select",
            command=self.select_dialog,
            bg="#2196f3",
        )
        butn_select.pack(side=tk.LEFT, padx=10)

        # Shows the base name of the currently selected file.
        self.cap_filename = tk.StringVar()
        textbox_cap = tk.Entry(
            frame_file,
            textvariable=self.cap_filename,
            width=30,
        )
        textbox_cap.pack(side=tk.LEFT, padx=10)

        # Absolute path of the selected log; None until a file is chosen.
        self.ulog_abs_path = None

        separator = ttk.Separator(frame_grid, orient="horizontal")
        separator.grid(column=0, row=1, sticky="ew")

        # --- action button and output box ---------------------------------
        frame_pack = tk.Frame(root, relief=tk.GROOVE, borderwidth=2)
        frame_pack.pack(side=tk.TOP, pady=5)

        butn_parser = tk.Button(
            frame_pack, text="GO", command=self.start_parser, bg="#2196f3"
        )
        butn_parser.pack(side=tk.LEFT, padx=10)

        log_label = tk.Label(frame_pack, text="OUTPUT")
        log_label.pack(side=tk.LEFT, padx=5)

        self.logbox = tk.Text(frame_pack, width=40, height=35)
        self.logbox.pack(side=tk.TOP)

        root.mainloop()

    def parsing(self):
        """Parse the gps_dump frames of the selected log and write CSV files.

        One file named ``<log name>_<message id>.csv`` is written per message
        type that has an entry in ``MSGATTR``. The file names are relative,
        so they are created in the current working directory.

        Returns:
            dict mapping each recognised message identity to the number of
            frames of that type that were successfully decoded.

        Raises:
            ValueError: propagated from ``extract_gps_dump`` when the log has
                no usable ``gps_dump`` data.
        """
        output_file_prefix = os.path.basename(self.ulog_abs_path)
        # strip the ".ulg" extension
        if output_file_prefix.lower().endswith(".ulg"):
            output_file_prefix = output_file_prefix[:-4]

        raw_msgs = self.extract_gps_dump(self.ulog_abs_path)

        count_msgs = {}
        headers = {}  # message identity -> widest CSV header seen
        rows = {}  # message identity -> list of CSV rows

        for data, timestamp in raw_msgs.values():
            if len(data) < 6:
                # not enough bytes to unpack class, id and payload length
                # (this also covers frames holding only the two sync bytes)
                continue
            if data[0] != SYNC1 or data[1] != SYNC2:
                continue

            try:
                cls, msg_id, payload_len = struct.unpack("<BBH", bytes(data[2:6]))
            except (struct.error, TypeError, ValueError) as e:
                # ignore unexpected data
                print(e)
                continue
            clid = CLIDPAIR_INV.get((cls, msg_id))
            if not clid:
                continue

            try:
                # 6 bytes (sync1, sync2, class, id, len1, len2)
                # 2 bytes (checksum)
                parsed_msg = UBXReader.parse(bytes(data[: 6 + payload_len + 2]))
            except Exception as e:
                # the decoder's own exception types are not imported here;
                # report the frame and move on
                print(e)
                continue

            count_msgs[clid] = count_msgs.get(clid, 0) + 1

            try:
                header, entries = self._collect_fields(parsed_msg, MSGATTR[clid])
            except (KeyError, AttributeError, TypeError, ValueError):
                # no export layout for this message type, or the decoded
                # message lacks an expected attribute: counted, not exported
                continue

            # Repeat groups make the column count vary between messages of
            # the same type, so keep the widest header.
            if clid not in headers or len(header) > len(headers[clid]):
                headers[clid] = header
            # A list keeps every message, including ones that share a
            # timestamp.
            rows.setdefault(clid, []).append([timestamp] + entries)

        print(f"gps_dump contains following messages \n{count_msgs}")

        for clid, header in headers.items():
            output_filename = output_file_prefix + f"_{clid}.csv"
            with open(
                output_filename, mode="w", newline="", encoding="utf-8"
            ) as csv_entry:
                csv_writer = csv.writer(csv_entry, delimiter=",")
                csv_writer.writerow(["timestamp"] + header)
                csv_writer.writerows(rows[clid])

        return count_msgs

    @staticmethod
    def _collect_fields(parsed_msg, layout):
        """Flatten one decoded message into parallel header / value lists.

        Args:
            parsed_msg: object whose attributes hold the decoded fields.
            layout: sequence whose items are either an attribute name or a
                dict ``{"rgroup": (count_attr, names)}`` describing a repeated
                group; repeated attributes are read as ``<name>_<NN>`` with a
                1-based, two-digit index.

        Returns:
            ``(header, entries)`` — column names and the matching values.

        Raises:
            AttributeError: if ``parsed_msg`` lacks a requested attribute.
        """
        header = []
        entries = []
        for item in layout:
            if isinstance(item, dict) and "rgroup" in item:
                rep_attr, rep_entries = item["rgroup"]
                n_rep = int(getattr(parsed_msg, rep_attr))
                for rep in range(n_rep):
                    for rep_entry in rep_entries:
                        attr_name = f"{rep_entry}_{rep + 1:02}"
                        entries.append(getattr(parsed_msg, attr_name))
                        header.append(attr_name)
            else:
                entries.append(getattr(parsed_msg, item))
                header.append(item)
        return header, entries

    def extract_gps_dump(self, ulog_filename):
        """Read the gps_dump topic of a ULog and split it into UBX frames.

        Args:
            ulog_filename: path of the ULog file to read.

        Returns:
            dict mapping a running frame number (0, 1, 2, ...) to a tuple
            ``(frame_bytes, timestamp)``. ``frame_bytes`` is a list that
            starts with the two sync bytes; ``timestamp`` is the timestamp of
            the log row in which the frame ended.

        Raises:
            ValueError: if the log holds no gps_dump data or the topic lacks
                the ``len`` / ``data[0]`` fields.
        """
        # looking into gps_dump
        msg_filter = ["gps_dump"]
        ulog = ULog(ulog_filename, msg_filter, disable_str_exceptions=False)
        data = ulog.data_list

        if not data:
            # Raise instead of exiting: this runs inside a GUI callback and
            # must not terminate the application.
            raise ValueError(f"File {ulog_filename} does not have necessary msgs")

        if ulog.dropouts:
            print(f"File has {len(ulog.dropouts)} dropouts")

        for d in data:
            # one timestamp per logged message
            print(f"Found {len(d.data['timestamp'])} msgs in {d.name}")

        # Only the first entry of the data list is parsed.
        gps_dump_data = data[0]

        field_names = {f.field_name for f in gps_dump_data.field_data}
        if "len" not in field_names or "data[0]" not in field_names:
            raise ValueError("gps dump: msgs are not correct")

        msg_lens = gps_dump_data.data["len"]
        print(f"gps_dump msg lens {len(msg_lens)}")
        # NOTE(review): the per-message ``len`` value is not used to bound the
        # bytes read from each row or to tell message direction apart —
        # confirm against the gps_dump message definition.

        # Look every byte column up once instead of once per byte, and take
        # as many consecutive ``data[i]`` columns as the log provides.
        columns = []
        while f"data[{len(columns)}]" in field_names:
            columns.append(gps_dump_data.data[f"data[{len(columns)}]"])

        return self._split_ubx_frames(
            columns, gps_dump_data.data["timestamp"], SYNC1, SYNC2
        )

    @staticmethod
    def _split_ubx_frames(columns, timestamps, sync1, sync2):
        """Cut a row-wise byte stream into frames at each sync byte pair.

        The stream is read row by row (``columns[0][row]``,
        ``columns[1][row]``, ...), continuing seamlessly into the next row. A
        frame starts at ``sync1`` immediately followed by ``sync2`` and ends
        right before the next such pair, or at the end of the stream.

        Args:
            columns: sequence of indexable byte columns, all as long as
                ``timestamps``.
            timestamps: one timestamp per row.
            sync1: first sync byte value.
            sync2: second sync byte value.

        Returns:
            dict ``{frame_number: (frame_bytes, timestamp)}`` in stream order.
        """
        frames = {}
        frame = None  # bytes collected so far; None while hunting for sync
        armed = False  # hunting, and the previous byte was sync1
        held = None  # (byte, timestamp) of a sync1 seen inside a frame
        stamp = None

        for row, stamp in enumerate(timestamps):
            for column in columns:
                byte = column[row]

                if frame is None:
                    if armed and byte == sync2:
                        frame = [sync1, sync2]
                        armed = False
                    else:
                        # sync2 must follow sync1 directly, otherwise the
                        # hunt starts over
                        armed = byte == sync1
                    continue

                if held is not None:
                    held_byte, held_stamp = held
                    held = None
                    if byte == sync2:
                        # the held sync1 opened the next frame: close this one
                        frames[len(frames)] = (frame, held_stamp)
                        frame = [sync1, sync2]
                        continue
                    # it was ordinary payload after all
                    frame.append(held_byte)

                if byte == sync1:
                    # decide once the following byte is known
                    held = (byte, stamp)
                else:
                    frame.append(byte)

        if frame is not None:
            # The final frame has no following sync pair to close it.
            if held is not None:
                frame.append(held[0])
            frames[len(frames)] = (frame, stamp)

        return frames

    def start_parser(self):
        """Handle the "GO" button: parse the selected file and report the result."""
        if not self.ulog_abs_path:
            self.pprint("File not valid!")
            return

        self.pprint(f"Parsing {os.path.basename(self.ulog_abs_path)} file")
        try:
            counts = self.parsing()
        except Exception as err:
            # GUI boundary: show the failure instead of losing it on stderr
            self.pprint(f"Parsing failed: {err}")
            return
        self.pprint(f"gps_dump contains following messages \n{counts}")

    def select_dialog(self):
        """Handle the "ULog Select" button: let the user pick a log file.

        The dialog opens in the directory of this script. Cancelling the
        dialog leaves the current selection unchanged.
        """
        path = os.path.abspath(os.path.dirname(__file__))
        cap_filepath = filedialog.askopenfilename(initialdir=path)
        if not cap_filepath:
            return

        self.ulog_abs_path = cap_filepath
        self.cap_filename.set(os.path.basename(cap_filepath))

    def pprint(self, str):
        """Append one line of text to the output box and scroll to it.

        Args:
            str: text to show (the name shadows the builtin; it is kept so
                existing keyword callers continue to work).
        """
        self.logbox.insert(tk.END, str + "\n")
        self.logbox.see(tk.END)
        # Redraw now so the line is visible while a long parse is running.
        self.logbox.update_idletasks()
if __name__ == "__main__":
    # Creating the GUI object builds the window and runs the Tk main loop,
    # so this statement returns only after the window has been closed.
    gps_dump_parser = tkinterGUI()