-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfilters.py
105 lines (92 loc) · 3.05 KB
/
filters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import scipy
import numpy as np
from scipy import signal
# Sampling rates handed to the filters below as their `fs` argument
# (presumably in Hz -- confirm against the device documentation).
# ACC_FS and PPG_FS_125 are not referenced in the visible part of this file.
ACC_FS = 20
ECG_FS = 512
PPG_FS_125 = 125 # don't skip any received data, ambient data has been skipped by the watch
PPG_FS_512 = 512 # don't skip any received data, ambient data has been skipped by the watch
# Cutoff frequencies passed to low_pass_filter / high_pass_filter, where they
# are divided by fs / 2; they therefore share the unit of the rates above.
LOW_PASS_CUTOFF = 35
HIGH_PASS_CUTOFF = 0.5
def power_line_noise_filter(data, fs, f0=65.0, Q=30.0):
    """Suppress a narrow frequency band of ``data`` with a zero-phase IIR notch.

    The notch coefficients come from ``scipy.signal.iirnotch`` and are applied
    forwards and backwards with ``filtfilt``.

    Input:
        data: 1-D sequence of samples
        fs:   sampling rate of ``data``; ``f0`` must lie below ``fs / 2``
        f0:   centre frequency of the notch, same unit as ``fs``
              (default 65.0, the value previously hard-coded here)
        Q:    quality factor of the notch (default 30.0, as before)
    Output: numpy array holding the filtered samples
    """
    # NOTE(review): mains interference normally sits at 50 Hz or 60 Hz; the
    # 65 Hz default is kept only for backward compatibility -- confirm intent.
    # Float division keeps odd integer rates (e.g. 125) from being truncated
    # on Python 2, matching high_pass_filter / low_pass_filter.
    nyq = fs / 2.0
    w0 = f0 / nyq
    b, a = signal.iirnotch(w0, Q)
    return scipy.signal.filtfilt(b, a, data)
def high_pass_filter(data, fs, cutoff):
    """Apply a 3rd-order Butterworth high-pass to ``data`` with zero phase shift.

    Input:
        data:   1-D sequence of samples
        fs:     sampling rate of ``data``
        cutoff: cutoff frequency, same unit as ``fs``
    Output: numpy array holding the filtered samples
    """
    # butter() expects the cutoff as a fraction of the Nyquist rate (fs / 2).
    numerator, denominator = signal.butter(
        N=3, Wn=cutoff / (fs / 2.0), btype="highpass", analog=False
    )
    return scipy.signal.filtfilt(numerator, denominator, data)
def low_pass_filter(data, fs, cutoff):
    """Apply a 3rd-order Butterworth low-pass to ``data`` with zero phase shift.

    Input:
        data:   1-D sequence of samples
        fs:     sampling rate of ``data``
        cutoff: cutoff frequency, same unit as ``fs``
    Output: numpy array holding the filtered samples
    """
    # butter() expects the cutoff as a fraction of the Nyquist rate (fs / 2).
    numerator, denominator = signal.butter(
        N=3, Wn=cutoff / (fs / 2.0), btype="lowpass", analog=False
    )
    return scipy.signal.filtfilt(numerator, denominator, data)
def ppg512_pl_filter(x):
    """ Map function: run the power line noise filter over ppg512 data
    Input: 2-D numpy array; column 1 holds the values to filter
    Output: 2-D numpy array; input column 0 beside the filtered column 1
    """
    denoised = power_line_noise_filter(x[:, 1], PPG_FS_512)
    # Re-attach the untouched first column to the filtered values.
    return np.column_stack((x[:, 0], denoised))
def ppg512_hp_filter(x):
    """ Map function: run the high pass filter over ppg512 data
    Input: 2-D numpy array; column 1 holds the values to filter
    Output: 2-D numpy array; input column 0 beside the filtered column 1
    """
    detrended = high_pass_filter(x[:, 1], PPG_FS_512, HIGH_PASS_CUTOFF)
    # Re-attach the untouched first column to the filtered values.
    return np.column_stack((x[:, 0], detrended))
def ppg512_lp_filter(x):
    """ Map function: run the low pass filter over ppg512 data
    Input: 2-D numpy array; column 1 holds the values to filter
    Output: 2-D numpy array; input column 0 beside the filtered column 1
    """
    smoothed = low_pass_filter(x[:, 1], PPG_FS_512, LOW_PASS_CUTOFF)
    # Re-attach the untouched first column to the filtered values.
    return np.column_stack((x[:, 0], smoothed))
def ecg_pl_filter(x):
    """ Map function: run the power line noise filter over ecg data
    Input: 2-D numpy array; column 1 holds the values to filter
    Output: 2-D numpy array; input column 0 beside the filtered column 1
    """
    denoised = power_line_noise_filter(x[:, 1], ECG_FS)
    # Re-attach the untouched first column to the filtered values.
    return np.column_stack((x[:, 0], denoised))
def ecg_hp_filter(x):
    """ A map function to perform high pass filter against ecg data
    Input: 2-D numpy array; column 1 holds the values to filter
    Output: 2-D numpy array; input column 0 beside the filtered column 1
    """
    # Use the ECG sampling rate, as ecg_pl_filter does. The PPG constant that
    # was passed here only worked because both rates are currently 512.
    filtered = high_pass_filter(x[:, 1], ECG_FS, HIGH_PASS_CUTOFF)
    return np.column_stack((x[:, 0], filtered))
def ecg_lp_filter(x):
    """ A map function to perform low pass filter against ecg data
    Input: 2-D numpy array; column 1 holds the values to filter
    Output: 2-D numpy array; input column 0 beside the filtered column 1
    """
    # Use the ECG sampling rate, as ecg_pl_filter does. The PPG constant that
    # was passed here only worked because both rates are currently 512.
    filtered = low_pass_filter(x[:, 1], ECG_FS, LOW_PASS_CUTOFF)
    return np.column_stack((x[:, 0], filtered))
def acc_mag_filter(x):
    """ A map function that prints column 1 of the acc data and passes it on
    Input: 2-D numpy array (anything supporting ``x[:, 1]``)
    Output: the same object ``x``, unmodified
    """
    # NOTE(review): despite the name, no magnitude is computed and nothing is
    # filtered -- this looks like an unfinished stub; confirm before relying on it.
    filtered = x[:, 1]
    # Call form of print: same output on Python 2, valid syntax on Python 3.
    print(filtered)
    return x
def acc_flat(x):
    """ A map function that flattens the nested acc data format
    Input: list, e.g. [(timestamp, (x, y, z))]
    Output: list, e.g. [[timestamp, x, y, z]]
    """
    # Index rather than unpack so longer records are tolerated.
    return [
        [sample[0], sample[1][0], sample[1][1], sample[1][2]]
        for sample in x
    ]