Adding Completeness Module #141

Open · wants to merge 14 commits into base: development
2 changes: 2 additions & 0 deletions requirements.txt
@@ -8,3 +8,5 @@ h5py
matplotlib
packaging
pyyaml
plotly
avro
2 changes: 2 additions & 0 deletions src/skdh/__init__.py
@@ -38,6 +38,7 @@
from skdh import sit2stand
from skdh import features
from skdh import context
from skdh import completeness

__skdh_version__ = __version__

@@ -55,5 +56,6 @@
"features",
"utility",
"context",
"completeness",
"__skdh_version__",
]
25 changes: 25 additions & 0 deletions src/skdh/completeness/__init__.py
@@ -0,0 +1,25 @@
from skdh.completeness.complete import *
from skdh.completeness.helpers import *
from skdh.completeness.parse import *
from skdh.completeness.utils import *
from skdh.completeness.visualizations import *

__all__ = (
"completeness_pipe",
"convert_sfreq_to_sampling_interval",
"from_unix",
"to_unix",
"find_nan_regions",
"vivalink_parse_ecg_data",
"vivalink_parse_acc_data",
"empatica_parse_acc_data",
"compute_summary_metrics",
"dic_to_str",
"clean_df",
"compute_completeness_master",
"find_charging_periods",
"find_wear_periods",
"calculate_completeness_timescale",
"compute_completeness",
"truncate_data_dic"
)
127 changes: 127 additions & 0 deletions src/skdh/completeness/complete.py
@@ -0,0 +1,127 @@
import os
import pickle
import numpy as np
from skdh.base import BaseProcess
from skdh.completeness.utils import load_subject_data, compute_completeness_master, compute_summary_metrics, check_hyperparameters
from skdh.completeness.visualizations import visualize_overview_plot, plot_completeness, plot_data_gaps, plot_timescale_completeness


class completeness_pipe(BaseProcess):
r"""
Pipeline for assessing signal completeness.
"""

def __init__(
self,
subject,
subject_folder,
device_name,
fpath_output,
measures,
resample_width_mins=5,
gap_size_mins=30,
ranges=None,
data_gaps=None,
time_periods=None,
timescales=None):

check_hyperparameters(subject,
subject_folder,
device_name,
fpath_output,
measures,
resample_width_mins,
gap_size_mins,
ranges,
data_gaps,
time_periods,
timescales)

super().__init__(
subject=subject,
subject_folder=subject_folder,
device_name=device_name,
fpath_output=fpath_output,
measures=measures,
resample_width_mins=resample_width_mins,
gap_size_mins=gap_size_mins,
ranges=ranges,
data_gaps=data_gaps,
time_periods=time_periods,
timescales=timescales)

self.subject = subject
self.subject_folder = subject_folder
self.device_name = device_name
self.fpath_output = fpath_output
self.measures = measures
self.resample_width_mins = resample_width_mins
self.gap_size_mins = gap_size_mins
self.ranges = ranges
self.data_gaps = data_gaps
self.time_periods = time_periods
self.timescales = timescales

# @handle_process_returns(results_to_kwargs=True)
def predict(self,
generate_figures=True,
**kwargs):
"""
Compute completeness and save results. Create and save figures if generate_figures is True.
"""
super().predict(
expect_days=False,
expect_wear=False,
**kwargs,
)
data_dic = load_subject_data(self.subject_folder, self.subject, self.device_name, self.measures, self.ranges)

if self.time_periods is None:
[Review — Collaborator] I know this has to be handled here, but we might need to give some thought to how time periods would be specified for multiple different subjects, i.e. can they only be specified via hours from recording start, etc.?

[Reply — Author] The time periods are specified by the user as times in numpy datetime format, or as 'daily', in which case it is recording start plus 24 h, 48 h, etc. Since the pipeline works on each subject independently, any batch processing of multiple subjects would just be handled by the user looping through all subjects. Not sure if this is consistent with the rest of SKDH processing, but I think so? (An illustrative sketch of both forms follows the code below.)

self.time_periods = [(np.min([x.index[0] for x in data_dic['Measurement Streams'].values()]),
np.max([x.index[-1] for x in data_dic['Measurement Streams'].values()]))]
elif self.time_periods == 'daily':
t0 = np.min([x.index[0] for x in data_dic['Measurement Streams'].values()])
t1 = np.max([x.index[-1] for x in data_dic['Measurement Streams'].values()])
no_days = int(np.ceil((t1 - t0) / np.timedelta64(24, 'h')))
self.time_periods = [(t0 + k * np.timedelta64(24, 'h'),
                      min(t0 + (k + 1) * np.timedelta64(24, 'h'), t1))
                     for k in range(no_days)]
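As an illustrative sketch of the two accepted forms (dates invented for this example):

import numpy as np

# Explicit (start, end) windows as numpy datetime64 pairs:
time_periods = [(np.datetime64('2024-05-01T00:00'), np.datetime64('2024-05-02T00:00')),
                (np.datetime64('2024-05-02T00:00'), np.datetime64('2024-05-03T00:00'))]

# Or the shortcut: consecutive 24 h windows from recording start, computed per subject:
time_periods = 'daily'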

# Compute completeness metrics
completeness_master_dic = compute_completeness_master(data_dic, data_gaps=self.data_gaps, time_periods=self.time_periods,
timescales=self.timescales)

# Save raw results and summary metrics
out_dir = os.path.join(self.fpath_output, data_dic['Subject ID'], self.device_name)
os.makedirs(out_dir, exist_ok=True)  # portable replacement for shelling out to mkdir
with open(os.path.join(out_dir, 'raw_completeness'), 'wb') as f:
    pickle.dump(completeness_master_dic, f)
df_summary = compute_summary_metrics(completeness_master_dic, self.time_periods, self.timescales,
self.measures) # Daily wear time, charging, data gaps, native completeness
df_summary.to_csv(os.path.join(out_dir, 'summary_metrics.csv'))

# Create and save visualizations of results
figures = {}
if generate_figures:
fpath_dir = out_dir + '/'
overview_fig = visualize_overview_plot(data_dic=data_dic, fpath=fpath_dir + 'overview.html',
resample_width_mins=self.resample_width_mins, gap_size_mins=self.gap_size_mins,
time_periods=self.time_periods)
reason_color_dic = {'normal' : 'blue', 'unknown' : 'magenta', 'charging' : 'orange', 'no_wear' : 'red'}
figures.update({'overview': overview_fig})
completeness_fig = plot_completeness(completeness_master_dic, data_dic, self.time_periods,
fpath=fpath_dir + 'completeness', reason_color_dic=reason_color_dic)
figures.update({'Completeness': completeness_fig})
if self.data_gaps is not None:
data_gap_fig = plot_data_gaps(completeness_master_dic, data_dic, self.data_gaps, self.time_periods,
fpath=fpath_dir + 'data_gaps', reason_color_dic=reason_color_dic)
figures.update({'data_gaps': data_gap_fig})
if self.timescales is not None:
timescale_compl_fig = plot_timescale_completeness(completeness_master_dic, data_dic, self.time_periods,
self.timescales,
fpath=fpath_dir + 'timescale_completeness',
reason_color_dic=reason_color_dic)
figures.update({'timescale_completeness': timescale_compl_fig})

return {"Completeness": {'data_dic' : data_dic, 'compl_dic' : completeness_master_dic,
'df_summary' : df_summary, 'figures' : figures}}
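
A minimal usage sketch of the pipeline above; the subject ID, paths, and measure name are placeholders, not documented defaults:

import numpy as np
from skdh.completeness import completeness_pipe

pipe = completeness_pipe(
    subject='subj01',                    # placeholder subject ID
    subject_folder='/data/subj01/',      # folder with the device data in the expected input format
    device_name='vivalink',              # placeholder device name
    fpath_output='/results',
    measures=['ecg_raw'],                # placeholder measure name
    time_periods='daily',                # or an explicit list of (np.datetime64, np.datetime64) tuples
    timescales=[np.timedelta64(1, 'h')],
)
results = pipe.predict(generate_figures=True)
df_summary = results['Completeness']['df_summary']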

59 changes: 59 additions & 0 deletions src/skdh/completeness/helpers.py
@@ -0,0 +1,59 @@
import datetime
import numpy as np

def convert_sfreq_to_sampling_interval(x):
"""
Convert sfreq in Hz (samples/second) to sampling interval (timedelta64[ns]).
:param x: np.array | float | int, sampling frequency in samples/second (Hz).
:return: timedelta64[ns], sampling interval as a time delta.
"""
return np.timedelta64(1, 'ns') * (1 / x * 10 ** 9)


def from_unix(ts, time_unit='s', utc_offset=0):
[Review — Collaborator] Can this not be handled by a pandas.to_timestamp call?

[Reply — Author] It could be handled by pd.to_timedelta, but I'm not sure there's any benefit to using that over numpy, since it returns the same data format.

[Review — Collaborator] The benefit would be not requiring a function that has to be maintained by us when one already exists.

[Reply — Author, May 3, 2024] Sorry, I thought you meant the np.timedelta64 function. pd.DataFrame.to_timestamp does not do the same thing as this whole function.

"""
Utility function to convert a unix time to timestamp.

Parameters
----------
ts : unix time in seconds or series of unix times
time_unit : 's' ! 'ms' Str, 's' if ts is ~10 ** 9, 'ms' if ts ~10 ** 12
utc_offset : utc_offset due time zone in hours

Return
------
float

"""
time_start = np.datetime64("1970-01-01T00:00:00")
return ts * np.timedelta64(1, time_unit) + time_start + np.timedelta64(utc_offset * 60 ** 2, "s")


def to_unix(ts, time_unit='s', utc_offset=0):
"""
Utility function to convert a timestamp to unix time.

Parameters
----------
ts : timestamp or series of timestamps
time_unit : 's' ! 'ms' Str, 's' if ts is ~10 ** 9, 'ms' if ts ~10 ** 12

Return
------
float

"""
time_start = np.datetime64("1970-01-01T00:00:00")
return ((ts - time_start) - np.timedelta64(utc_offset, 'h')) / np.timedelta64(1, time_unit)


def find_nan_regions(arr):
    """
    Find the start indices, end indices, and lengths of contiguous NaN regions in arr.
    """
nan_inds = np.where(np.isnan(arr))[0]
if len(nan_inds) > 0:
end_nan_inds = np.append(nan_inds[np.where(~(np.diff(nan_inds) == 1))[0]], nan_inds[-1])
start_nan_inds = np.insert(nan_inds[::-1][np.where(~(np.diff(nan_inds[::-1]) == -1))[0]][::-1], 0, nan_inds[0])
nan_region_lengths = end_nan_inds - start_nan_inds + 1
else:
start_nan_inds, end_nan_inds, nan_region_lengths = np.array([]), np.array([]), np.array([])
return start_nan_inds, end_nan_inds, nan_region_lengths
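
A quick sketch of the helpers above (input values are illustrative):

import numpy as np

convert_sfreq_to_sampling_interval(128.)       # -> np.timedelta64(7812500, 'ns'), i.e. 1/128 s
ts = from_unix(1700000000, time_unit='s')      # -> np.datetime64('2023-11-14T22:13:20')
to_unix(ts, time_unit='s')                     # -> 1700000000.0

arr = np.array([np.nan, np.nan, 1.0, 2.0, np.nan])
starts, ends, lengths = find_nan_regions(arr)  # -> [0, 4], [1, 4], [2, 1]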

12 changes: 12 additions & 0 deletions src/skdh/completeness/meson.build
@@ -0,0 +1,12 @@
py3.install_sources(
[
'__init__.py',
'complete.py',
'helpers.py',
'parse.py',
'utils.py',
'visualizations.py',
],
pure: false,
subdir: 'skdh/completeness',
)
134 changes: 134 additions & 0 deletions src/skdh/completeness/parse.py
@@ -0,0 +1,134 @@
import numpy as np
import pandas as pd
import warnings
import re
import os
from skdh.completeness.helpers import from_unix, convert_sfreq_to_sampling_interval

def vivalink_parse_ecg_data(df, ecg_key, s_freq):
[Review — Collaborator] These device-specific parsers should be wrapped into the CSV reader somehow, or if that is not possible, we can discuss how to include them or rely on the end user to generate the inputs for this.

[Reply — Author] These parsers are more like helper functions for me or other people working with these devices, to help get from the native format to the forced input format; they're not actually part of the pipeline. The native format changes a lot depending on software versions, data capture platform, etc., so they should not be integrated directly into the pipeline or they will fail every time a new data batch comes in. We could possibly move this to a private extension repo if we don't want to have it here.

[Review — Collaborator] I think private would be better. Or we modify existing IO functions if there is a way to add the small functionality bits that we need. MultiReader might already handle some/most of this.

[Reply — Author] Removed the file from the repo; I believe we have an internal private skdh extension repo already that we can add it to?

if isinstance(s_freq, int):
    n_samples = np.repeat(s_freq, repeats=len(df))
    # One sampling-interval entry per sample, so the length matches df_ecg below
    sfreqs = np.repeat(np.timedelta64(int(1 / s_freq * 10 ** 9), 'ns'), repeats=s_freq * len(df))
elif isinstance(s_freq, str):
    n_samples = df[s_freq]
    sfreqs = np.array([], dtype='<m8[ns]')
    for c in range(len(df)):
        sfreqs = np.append(sfreqs, np.repeat(np.timedelta64(int(1 / df[s_freq].iloc[c] * 10 ** 9), 'ns'), repeats=df[s_freq].iloc[c]))
else:
    raise TypeError('s_freq has to be a string naming a column in df, or an int')
ecg_signal = []
device = np.array([])
df_ecg = pd.DataFrame()
times = np.array([])
for k in range(len(df)):
ecg_records = df[ecg_key].iloc[k].replace('[', '').replace(']', '').split(',')
ecg_segment = np.array([np.nan] * n_samples[k])
if not len(ecg_records) == n_samples[k]:
warnings.warn('Not correct number of samples in df entry. Presuming they are in the beginning of the'
' measurement period.')
for c, ele in enumerate(ecg_records):
if 'null' not in ele:
ecg_segment[c] = float(ele)
else:
ecg_segment[c] = np.nan
ecg_signal.append(ecg_segment)
times = np.append(times, np.array(df['Record Time'][k]).repeat(n_samples[k]) + (np.arange(n_samples[k]) * 1 / n_samples[k] * 1000))
device = np.append(device, np.array(df['Device ID'][k]).repeat(n_samples[k]))
ecg_signal = np.concatenate(ecg_signal)  # handles rows with differing sample counts
df_ecg['Time Unix (ms)'] = times
df_ecg['ecg_raw'] = ecg_signal
df_ecg['Sampling_Freq'] = sfreqs
df_ecg['Device ID'] = device

return df_ecg
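
For reference, a toy input for the parser above might look like the following; the required column names ('Record Time', 'Device ID') follow the code, while the values and device ID are fabricated:

import pandas as pd
from skdh.completeness.parse import vivalink_parse_ecg_data

df = pd.DataFrame({
    'Record Time': [1714000000000, 1714000001000],  # unix ms, one row per 1 s record
    'Device ID': ['VV330-001', 'VV330-001'],        # fabricated device ID
    'ecg': ['[1.0,2.0]', '[3.0,null]'],             # stringified per-record sample lists
})
df_ecg = vivalink_parse_ecg_data(df, ecg_key='ecg', s_freq=2)  # 2 samples per record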


def vivalink_parse_acc_data(df, acc_key):
[Review — Collaborator] See the previous comment on device-specific parsers.

acc_samp = []
for k in range(len(df)):
sensor_info = df['Sensor Info'].iloc[k].split(';')
acc_samp.append(int(
re.sub("[^0-9]", "", sensor_info[np.where(['accSamplingFrequency' in x for x in sensor_info])[0][0]])))

acc_signal = np.array([]).reshape((0, 3))
device = np.array([])
times = np.array([])
sfreq = np.array([])
for k in range(len(df)):
acc_records = np.array(df[acc_key].iloc[k].split(','))
x_inds = np.array(['x' in x for x in acc_records])
y_inds = np.array(['y' in x for x in acc_records])
z_inds = np.array(['z' in x for x in acc_records])
assert np.sum(x_inds) == np.sum(y_inds), \
    'Number of x and y acceleration data points not the same!'
assert np.sum(y_inds) == np.sum(z_inds), \
    'Number of y and z acceleration data points not the same!'
acc_data = np.array([re.sub("[^0-9]", "", x) for x in acc_records], dtype=float)
x_acc = acc_data[x_inds]
y_acc = acc_data[y_inds]
z_acc = acc_data[z_inds]
acc_mat = np.array([x_acc, y_acc, z_acc]).T
if not acc_mat.shape[0] == acc_samp[k]:
warnings.warn('Not correct number of samples in df entry. Presuming they are in the beginning of the'
' measurement period.')
acc_segment = np.array([[[np.nan] * 3] * acc_samp[k]]).squeeze()
acc_segment[:len(acc_mat), :] = acc_mat
acc_signal = np.vstack((acc_signal, acc_segment))
device = np.append(device, np.array(df['Device ID'][k]).repeat(acc_samp[k]))
times = np.append(times, np.array(df['Record Time'][k]).repeat(acc_samp[k]) + (np.arange(acc_samp[k]) * 1/acc_samp[k] * 1000))
sfreq = np.append(sfreq, np.ones(acc_samp[k]) * acc_samp[k])
triaxial_acc = []
for c, axis in enumerate(['x', 'y', 'z']):
df_acc = pd.DataFrame()
df_acc['Time Unix (ms)'] = times
df_acc['acc_raw_' + axis] = acc_signal[:, c]
df_acc['Sampling Frequency (Hz)'] = sfreq
df_acc['Device ID'] = device
triaxial_acc.append(df_acc)
return triaxial_acc


def empatica_parse_acc_data(fdir_raw):
from avro.datafile import DataFileReader
from avro.io import DatumReader

fnames = os.listdir(fdir_raw)
times_all = np.array([])
acc_raw_all = np.array([]).reshape((0, 3))
devices_all = []
sfreq_all = []
for fname in fnames:
if fname[-5:] == '.avro':
reader = DataFileReader(open(fdir_raw + fname, "rb"), DatumReader())
data_list = []
for data_frac in reader:
data_list.append(data_frac)
reader.close()
if not data_list[0]['rawData']['accelerometer']['timestampStart'] == 0: # Avoid empty files
times = data_list[0]['rawData']['accelerometer']['timestampStart'] / 10 ** 6 + data_list[0]['rawData'][
'accelerometer']['samplingFrequency'] ** -1 * np.arange(len(data_list[0]['rawData']['accelerometer']['x']))
acc_raw = np.array([data_list[0]['rawData']['accelerometer']['x'],
data_list[0]['rawData']['accelerometer']['y'],
data_list[0]['rawData']['accelerometer']['z']]).T
acc_raw = acc_raw * data_list[0]['rawData']['accelerometer']['imuParams']['physicalMax'] / \
data_list[0]['rawData']['accelerometer']['imuParams']['digitalMax']
devices_all = devices_all + [data_list[0]['deviceSn']] * len(times)
sfreq_all = sfreq_all + [data_list[0]['rawData']['accelerometer']['samplingFrequency']] * len(times)
times_all = np.concatenate((times_all, times))
acc_raw_all = np.concatenate((acc_raw_all, acc_raw), axis=0)
sort_inds = np.argsort(times_all)
times_all = times_all[sort_inds]
acc_raw_all = acc_raw_all[sort_inds]
devices_all = np.array(devices_all)[sort_inds]
sfreq_all = np.array(sfreq_all)[sort_inds]

dfs = []
for c, axis in enumerate(['x', 'y', 'z']):
df_acc_raw = pd.DataFrame(data={'acc_raw_' + axis : acc_raw_all[:, c]}, index=times_all)
df_acc_raw['Device ID'] = devices_all
df_acc_raw['Sampling_Freq'] = convert_sfreq_to_sampling_interval(sfreq_all)
df_acc_raw.index = from_unix(df_acc_raw.index, time_unit='s')
dfs.append(df_acc_raw)

return dfs