-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathModel.py
56 lines (42 loc) · 1.88 KB
/
Model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import numpy as np
from Pacer import Pacer
from blehrm.interface import BlehrmClientInterface
import logging
from PySide6.QtCore import QObject, Signal
from analysis.HrvAnalyser import HrvAnalyser
from analysis.BreathAnalyser import BreathAnalyser
class Model(QObject):
    """Application model tying a heart-rate sensor client to the analysers.

    Holds the active sensor client plus a ``Pacer``, an ``HrvAnalyser`` and a
    ``BreathAnalyser``, and forwards the sensor's IBI and accelerometer
    stream callbacks to the analysers.
    """

    # Emitted at the end of set_and_connect_sensor(), after both streams
    # have been started.
    sensor_connected = Signal()

    def __init__(self):
        """Create the analysers and pacer; no sensor is attached yet."""
        super().__init__()
        self.logger = logging.getLogger(__name__)
        # Stays None until set_and_connect_sensor() is called.
        self.sensor_client = None
        self.pacer = Pacer()
        self.hrv_analyser = HrvAnalyser()
        self.breath_analyser = BreathAnalyser()

    async def set_and_connect_sensor(self, sensor: BlehrmClientInterface) -> None:
        """Store ``sensor`` as the active client, connect, and start streaming.

        Connects to the sensor, fetches and prints its device info, starts
        the IBI and accelerometer streams with this model's handlers as
        callbacks, then emits ``sensor_connected``.

        Args:
            sensor: The sensor client to connect to.
        """
        self.sensor_client = sensor
        await self.sensor_client.connect()
        await self.sensor_client.get_device_info()
        await self.sensor_client.print_device_info()
        await self.sensor_client.start_ibi_stream(callback=self.handle_ibi_callback)
        await self.sensor_client.start_acc_stream(callback=self.handle_acc_callback)
        self.sensor_connected.emit()

    async def disconnect_sensor(self) -> None:
        """Disconnect the active sensor client, if one has been set.

        When no sensor has been set yet (``sensor_client`` is still ``None``)
        this logs a warning and returns instead of raising ``AttributeError``.
        """
        if self.sensor_client is None:
            self.logger.warning("disconnect_sensor called with no sensor set; nothing to disconnect")
            return
        await self.sensor_client.disconnect()

    def handle_ibi_callback(self, data) -> None:
        """Forward one IBI sample to the HRV analyser.

        Args:
            data: A two-item sequence unpacked as ``(t, ibi)``.
        """
        t, ibi = data
        self.hrv_analyser.update(t, ibi)

    def handle_acc_callback(self, data) -> None:
        """Handle one accelerometer reading from the sensor.

        Updates the breath_analyser, which calculates breathing rate.
        On each breath, hrv_analyser calculates metrics.

        Args:
            data: A sequence whose first item is the timestamp and whose
                remaining items are the acceleration values.
        """
        t = data[0]
        acc = data[1:]
        self.breath_analyser.update_chest_acc(t, acc)

        # Breath-by-breath analysis: only once a breath has just ended and
        # the breathing-rate history holds at least one entry.
        if self.breath_analyser.is_end_of_breath and not self.breath_analyser.br_history.is_empty():
            t_range = self.breath_analyser.get_last_breath_t_range()
            self.hrv_analyser.update_breath_by_breath_metrics(t_range)