forked from Lekensteyn/el4000
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathel4000.py
executable file
·199 lines (173 loc) · 7.55 KB
/
el4000.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
#!/usr/bin/env python
# Utility to interpret files from the Voltcraft Energy Logger 4000 (as sold by
# Conrad).
#
# Copyright (C) 2014 Peter Wu <[email protected]>
import os, sys
from argparse import ArgumentParser
import datetime
import logging
from defs import info, data_hdr, data, setup, SETUP_MAGIC, STARTCODE
import printers
_logger = logging.getLogger(__name__)
def process_setup(filename, printer, setup_args):
    """Show or modify the settings stored in a setup file.

    The current contents of *filename* (when present and of the expected
    size) are used as the starting values; otherwise an all-zero template is
    used. Without *setup_args* the current values are printed. With
    *setup_args*, every valid ``key=value`` item is applied, the resulting
    values are printed and the file is rewritten if its contents changed.

    Arguments:
    filename   -- path of the setup file; acts as both input and output.
    printer    -- not used by this function; kept for call compatibility.
    setup_args -- list of ``key=value`` strings, possibly empty.

    Raises RuntimeError when the file exists but its size is neither zero
    nor ``setup.size()``.
    """
    expected_size = setup.size()

    # Original setup template, default to empty
    setup_old = None
    try:
        size = os.path.getsize(filename)
        if size != 0 and size != expected_size:
            raise RuntimeError(
                ('Setup file {0} must be non-existent, empty or of '
                 'size {1}, but found {2}').format(filename, expected_size,
                                                   size))
        if size == expected_size:
            with open(filename, 'rb') as f:
                setup_old = f.read(size)
            if len(setup_old) != expected_size:
                _logger.warning('Unable to read setup file')
    except os.error as e:
        # File does not exist (or cannot be read)? Continue with an empty
        # template, but leave a trace for debugging.
        _logger.debug('Cannot use existing setup file %s: %s', filename, e)

    # If setup file is not initialized, use an empty one
    if not setup_old or len(setup_old) != expected_size:
        setup_old = expected_size * b'\x00'

    # Unpack old file, ignoring any input errors
    old_t = setup.unpack(setup_old, validate=False)

    if not setup_args:
        # No setup parameters? Just show current values then.
        printers.print_namedtuple(old_t, setup)
        return

    # Parameters were given. Create mutable dict from old tuples such that
    # the fields can be modified.
    t = dict(zip(old_t._fields, old_t))
    for arg in setup_args:
        if '=' not in arg:
            _logger.error('Option %s is missing value, skipping', arg)
            continue
        name, val = arg.split('=', 1)
        if name not in setup.names:
            _logger.error('Invalid setup key: %s', name)
            continue
        try:
            new_val = float(val)
        except ValueError:
            # Treat a non-numeric value like any other bad argument: report
            # it and carry on instead of aborting with a traceback.
            _logger.error('Invalid value for %s: %r, skipping', name, val)
            continue
        print('Changing {0} from {1} to {2}'.format(name, t[name], val))
        t[name] = new_val

    # Build new file contents
    setup_new = setup.pack(t)
    new_t = setup.unpack(setup_new, validate=False)
    print('Setup file: ', filename)
    printers.print_namedtuple(new_t, setup)

    # If there are changes, write them away
    if setup_new != setup_old:
        _logger.info('Writing new file')
        with open(filename, 'wb') as f:
            f.write(setup_new)
    else:
        _logger.info('No changes, not writing file')
def process_file(filename, printer, dt, data_only):
    """Parse one info or data file and pass its contents to *printer*.

    A file whose size equals ``info.size()`` is treated as an info file;
    anything else is read as a data file made of headers and data records.
    Files starting with SETUP_MAGIC are skipped with a warning.

    Arguments:
    filename  -- path of the binary file to read.
    printer   -- object providing print_info, print_data_header and
                 print_data.
    dt        -- one-element list holding the current timestamp; dt[0] is
                 updated in place so the time reference carries over to the
                 next file.
    data_only -- when true, info files only update dt[0] and are not
                 printed.
    """
    eof = 4 * b'\xff'

    with open(filename, 'rb') as f:
        size = os.fstat(f.fileno()).st_size
        if size == info.size():
            # Info files
            t = info.parse_from_file(f)
            # Initialize time from info file
            dt[0] = datetime.datetime(2000 + t.init_date_year,
                                      t.init_date_month, t.init_date_day,
                                      t.init_time_hour, t.init_time_minute)
            if not data_only:
                printer.print_info(t)
            return

        # Data files.
        # First, test whether this file is not a setup file
        buf = f.read(len(SETUP_MAGIC))
        if buf == SETUP_MAGIC:
            _logger.warning(
                'Setup file is ignored. Use --setup option instead')
            return

        while True:
            # Make sure enough bytes are buffered to recognise the EOF code
            if len(buf) < len(eof):
                buf += f.read(len(eof) - len(buf))
            # Stop when nothing is left, or at the end of file code (or a
            # short read of it at the end)
            if not buf or eof.startswith(buf):
                break

            if buf.startswith(STARTCODE):
                # Not data, but header before data
                buf += f.read(data_hdr.size() - len(buf))
                t = data_hdr.unpack(buf)
                # New time reference!
                dt[0] = datetime.datetime(2000 + t.record_year,
                                          t.record_month, t.record_day,
                                          t.record_hour, t.record_minute)
                printer.print_data_header(t)
            else:
                buf += f.read(data.size() - len(buf))
                t = data.unpack(buf)
                # For time reference
                date_str = dt[0].strftime('%Y-%m-%d %H:%M')
                printer.print_data(t, date=date_str)
                # Assume that this is called for every minute
                dt[0] += datetime.timedelta(minutes=1)

            # Clear buffer since it is processed
            buf = b''
# Logging levels ordered from quietest to most verbose. The number of -v
# flags (clamped to the last entry) is used as an index into this list.
verbosities = [
    logging.CRITICAL,
    logging.ERROR,
    logging.WARNING,  # Default
    logging.INFO,
    logging.DEBUG,
]
# Maps the value accepted by the -p/--printer option to the printer class
# that is instantiated for every input file.
available_printers = {
    'base': printers.BasePrinter,
    'raw': printers.RawPrinter,
    'csv': printers.CSVPrinter,
    'watt': printers.EffectivePowerPrinter,
    'va': printers.ApparentPowerPrinter,
}
# Command-line interface definition. The long help texts are kept in named
# variables so that the add_argument calls stay readable.
_setup_help = (
    'Process a setupel3.bin file. Optional parameters '
    'can be given to set a field (-s unit_id=1 for example). '
    'If no parameters are given, the current values are printed'
)
_data_only_help = (
    'Use info files only for updating the initial '
    'timestamp for data files, do not print their contents'
)
_files_help = (
    'info or data files (.bin) from SD card. If --setup '
    'is given, then this is the output file (and input for '
    'defaults). The order of files are significant when a '
    'timestamp is involved'
)

parser = ArgumentParser(description='Energy Logger 4000 utility')
parser.add_argument(
    '-p', '--printer',
    choices=available_printers.keys(),
    default='base',
    help="Output formatter (default '%(default)s')",
)
parser.add_argument(
    '-d', '--delimiter',
    default=',',
    help="Output delimiter for CSV output (default '%(default)s')",
)
parser.add_argument(
    '-v', '--verbose',
    action='count',
    # Counting starts at the WARNING entry; every -v moves one level further
    default=verbosities.index(logging.WARNING),
    help='Increase logging level (twice for extra verbose)',
)
parser.add_argument(
    '-s', '--setup',
    metavar='key=value',
    nargs='*',
    help=_setup_help,
)
parser.add_argument(
    '-o', '--data-only',
    action='store_true',
    help=_data_only_help,
)
parser.add_argument(
    'files',
    metavar='binfile',
    nargs='+',
    help=_files_help,
)
if __name__ == '__main__':
    args = parser.parse_args()

    # Set log level on root, clamping the -v count to the last known level
    level_index = min(args.verbose, len(verbosities) - 1)
    args.verbose = level_index
    logging.basicConfig(level=verbosities[level_index])

    myprinter = available_printers[args.printer]
    files_count = len(args.files)
    setup_mode = args.setup is not None

    # A setup file is both input and output, so exactly one is allowed
    if setup_mode and files_count != 1:
        _logger.error('Only one file can be specified for set-up')
        sys.exit(1)

    # Unknown date and time, initialize with something low. The value is
    # wrapped in a list so that process_file can update it in place.
    dt = [datetime.datetime(1970, 1, 1)]
    for filename in args.files:
        # First try to pass the delimiter; on TypeError construct the printer
        # without it (presumably not every printer accepts a separator).
        try:
            printer = myprinter(filename, separator=args.delimiter)
        except TypeError:
            printer = myprinter(filename)

        # Treat setup specially, it acts as input and output file
        if setup_mode:
            process_setup(args.files[0], myprinter, args.setup)
            continue

        # Display current filename for multiple files
        if files_count > 1 and not args.data_only:
            print('# ' + filename)
        process_file(filename, printer, dt, args.data_only)