-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathdexcom_receiver.py
125 lines (106 loc) · 4.21 KB
/
dexcom_receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import time
from glucose import GlucoseValue
import threading
import logging
from usbreceiver import constants
from usbreceiver.readdata import Dexcom
class DexcomReceiverSession():
    """Poll a Dexcom receiver over USB and hand glucose values to a callback.

    A self re-arming ``threading.Timer`` drives the polling loop: every tick
    makes sure the device is connected, reads the EGV records and schedules
    the next tick.  When no new value has been seen for a while and a
    ``usb_reset_cmd`` was supplied, that command is run through
    ``os.system`` in an attempt to revive the USB connection.
    """

    # Delay (seconds) before the next poll, depending on the last outcome.
    _RETRY_CONNECT_SECONDS = 15
    _POLL_AFTER_NEW_VALUE_SECONDS = 30
    _POLL_AFTER_NO_VALUE_SECONDS = 10
    # Time (seconds) without a new value before the USB reset command is run.
    _USB_RESET_GRACE_SECONDS = 360
    # How far back (seconds) records are forwarded on the first successful
    # read and on every later one.
    _INITIAL_BACKFILL_SECONDS = 24 * 60 * 60
    _REGULAR_BACKFILL_SECONDS = 3 * 60 * 60

    def __init__(self, callback, usb_reset_cmd = None):
        """Create a session; nothing is polled until ``start_monitoring``.

        Args:
            callback: Callable invoked with a one-element list holding a
                ``GlucoseValue`` for every reading that is forwarded.
            usb_reset_cmd: Optional shell command string passed to
                ``os.system`` when no new value arrived within the grace
                period.  It is executed verbatim by the shell, so it must
                come from trusted configuration only.
        """
        self.logger = logging.getLogger('DEXPY')
        self.callback = callback
        self.device = None
        self.timer = None
        # Re-entrant lock serialising timer ticks and stop_monitoring().
        self.lock = threading.RLock()
        self.initial_backfill_executed = False
        self.last_gv = None
        self.system_time_offset = None
        self.usb_reset_cmd = usb_reset_cmd
        self.ts_usb_reset = time.time() + self._USB_RESET_GRACE_SECONDS
        # Set by stop_monitoring() so a tick that already fired and is
        # waiting for the lock does not re-arm the timer afterwards.
        self._stopped = False

    def start_monitoring(self):
        """Run the first poll immediately; later polls are timer driven."""
        with self.lock:
            self._stopped = False
        self.on_timer()

    def on_timer(self):
        """Execute one polling cycle and schedule the next one."""
        with self.lock:
            if self._stopped:
                return
            if not self.ensure_connected():
                self.set_timer(self._RETRY_CONNECT_SECONDS)
            elif self.read_glucose_values():
                # A fresh value proves the link works: push the reset
                # deadline back.
                self.ts_usb_reset = time.time() + self._USB_RESET_GRACE_SECONDS
                self.set_timer(self._POLL_AFTER_NEW_VALUE_SECONDS)
            else:
                self._reset_usb_if_due()
                self.set_timer(self._POLL_AFTER_NO_VALUE_SECONDS)

    def _reset_usb_if_due(self):
        """Run the USB reset command once the grace period has elapsed."""
        if self.usb_reset_cmd is None:
            return
        if time.time() > self.ts_usb_reset:
            self.logger.debug('performing usb reset')
            os.system(self.usb_reset_cmd)
            # Restart the grace period from the moment the command returned.
            self.ts_usb_reset = time.time() + self._USB_RESET_GRACE_SECONDS

    def ensure_connected(self):
        """Open the device if necessary and refresh the clock offset.

        Returns:
            True when a device is open and its system time could be read,
            False when no receiver was found or any error occurred (in which
            case the device handle is dropped so the next call reconnects).
        """
        try:
            if self.device is None:
                port = Dexcom.FindDevice()
                if port is None:
                    self.logger.warning("Dexcom receiver not found")
                    return False
                self.device = Dexcom(port)
            # Reading the device clock on every call doubles as a liveness
            # check: a failure lands in the handler below and forces a
            # reconnect.
            self.system_time_offset = self.get_device_time_offset()
            return True
        except Exception as e:
            self.logger.warning("Error reading from usb device\n" + str(e))
            self.device = None
            self.system_time_offset = None
            return False

    def set_timer(self, seconds):
        """Arm a daemon timer that calls ``on_timer`` after ``seconds``."""
        self.timer = threading.Timer(seconds, self.on_timer)
        # Daemon thread so a pending poll never keeps the process alive.
        self.timer.daemon = True
        self.logger.debug("timer set to %d seconds" % seconds)
        self.timer.start()

    def stop_monitoring(self):
        """Cancel the pending poll, if any; safe to call before starting."""
        with self.lock:
            self._stopped = True
            if self.timer is not None:
                self.timer.cancel()
                self.timer = None

    def read_glucose_values(self, ts_cut_off: float = None):
        """Read EGV records and forward new ones to the callback.

        The first non display-only record is compared with the last value
        seen.  Only when it differs are the remaining ``EGV_DATA`` records
        and the ``BACKFILLED_EGV`` records forwarded as well, each loop
        stopping at the first record older than ``ts_cut_off``.
        NOTE(review): this presumes ``iter_records`` yields newest first --
        confirm against the usbreceiver implementation.

        Args:
            ts_cut_off: Oldest timestamp (same scale as ``time.time()``) to
                forward.  Defaults to 24 hours back until the first backfill
                has run and 3 hours back afterwards.

        Returns:
            True if the newest record differs from the previously seen one,
            False otherwise or when any error occurred while reading.
        """
        try:
            if ts_cut_off is None:
                if self.initial_backfill_executed:
                    ts_cut_off = time.time() - self._REGULAR_BACKFILL_SECONDS
                else:
                    ts_cut_off = time.time() - self._INITIAL_BACKFILL_SECONDS

            # iter() guarantees a one-shot iterator, so the second loop
            # below resumes after the newest record instead of replaying it,
            # whatever kind of iterable iter_records returns.
            records = iter(self.device.iter_records('EGV_DATA'))
            new_value_received = False
            for rec in records:
                if not rec.display_only:
                    gv = self._as_gv(rec)
                    if self.last_gv is None or self.last_gv.st != gv.st:
                        self.last_gv = gv
                        new_value_received = True
                        self.callback([gv])
                    # Only the newest real record decides whether there is
                    # anything new.
                    break

            if new_value_received:
                self._forward_until_cut_off(records, ts_cut_off)
                self._forward_until_cut_off(
                    self.device.iter_records('BACKFILLED_EGV'), ts_cut_off)
                self.initial_backfill_executed = True
            return new_value_received
        except Exception as e:
            self.logger.warning("Error reading from usb device\n" + str(e))
            return False

    def _forward_until_cut_off(self, records, ts_cut_off):
        """Forward records to the callback until one predates the cut-off."""
        for rec in records:
            if rec.display_only:
                continue
            gv = self._as_gv(rec)
            if gv.st < ts_cut_off:
                break
            self.callback([gv])

    def get_device_time_offset(self):
        """Return local ``time.time()`` minus the device's system time."""
        now_time = time.time()
        device_time = self.device.ReadSystemTime()
        return now_time - device_time

    def _as_gv(self, record):
        """Convert a receiver record into a ``GlucoseValue``.

        The record's meter time is shifted by the stored clock offset, and
        the trend is ``full_trend`` masked with ``EGV_TREND_ARROW_MASK``.
        The first two ``GlucoseValue`` arguments are passed as ``None``.
        """
        st = record.meter_time + self.system_time_offset
        direction = record.full_trend & constants.EGV_TREND_ARROW_MASK
        return GlucoseValue(None, None, st, record.glucose, direction)