diff --git a/ActivityAnalyses/gait_analysis.py b/ActivityAnalyses/gait_analysis.py index f4e4949d..c1e2c9de 100644 --- a/ActivityAnalyses/gait_analysis.py +++ b/ActivityAnalyses/gait_analysis.py @@ -1,6 +1,6 @@ """ --------------------------------------------------------------------------- - OpenCap processing: gaitAnalysis.py + OpenCap processing: gait_analysis.py --------------------------------------------------------------------------- Copyright 2023 Stanford University and the Authors diff --git a/ActivityAnalyses/squat_analysis.py b/ActivityAnalyses/squat_analysis.py new file mode 100644 index 00000000..eeb46089 --- /dev/null +++ b/ActivityAnalyses/squat_analysis.py @@ -0,0 +1,502 @@ +""" + --------------------------------------------------------------------------- + OpenCap processing: squat_analysis.py + --------------------------------------------------------------------------- + + Copyright 2024 Stanford University and the Authors + + Author(s): Antoine Falisse, Carmichael Ong + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +import sys +sys.path.append('../') + +import numpy as np +import pandas as pd +from scipy.signal import find_peaks, peak_widths +from matplotlib import pyplot as plt + +from utilsKinematics import kinematics +import opensim + +class squat_analysis(kinematics): + + def __init__(self, session_dir, trial_name, n_repetitions=-1, + lowpass_cutoff_frequency_for_coordinate_values=-1, + trimming_start=0, trimming_end=0): + + # Inherit init from kinematics class. + super().__init__( + session_dir, + trial_name, + lowpass_cutoff_frequency_for_coordinate_values=lowpass_cutoff_frequency_for_coordinate_values) + + # We might want to trim the start/end of the trial to remove bad data. + self.trimming_start = trimming_start + self.trimming_end = trimming_end + + # Marker data load and filter. + self.markerDict = self.get_marker_dict( + session_dir, + trial_name, + lowpass_cutoff_frequency = lowpass_cutoff_frequency_for_coordinate_values) + + # Coordinate values. + self.coordinateValues = self.get_coordinate_values() + + # Body transforms + self.bodyTransformDict = self.get_body_transform_dict() + + # Making sure time vectors of marker and coordinate data are the same. + if not np.allclose(self.markerDict['time'], self.coordinateValues['time'], atol=.001, rtol=0): + raise Exception('Time vectors of marker and coordinate data are not the same.') + + if not np.allclose(self.bodyTransformDict['time'], self.coordinateValues['time'], atol=.001, rtol=0): + raise Exception('Time vectors of body transofrms and coordinate data are not the same.') + + # Trim marker, body transforms, and coordinate data. + if self.trimming_start > 0: + self.idx_trim_start = np.where(np.round(self.markerDict['time'] - self.trimming_start,6) <= 0)[0][-1] + self.markerDict['time'] = self.markerDict['time'][self.idx_trim_start:] + for marker in self.markerDict['markers']: + self.markerDict['markers'][marker] = self.markerDict['markers'][marker][self.idx_trim_start:,:] + self.bodyTransformDict['time'] = self.bodyTransformDict['time'][self.idx_trim_start:] + for body in self.bodyTransformDict['body_transforms']: + self.bodyTransformDict['body_transforms'][body] = \ + self.bodyTransformDict['body_transforms'][body][self.idx_trim_start:] + self.coordinateValues = self.coordinateValues.iloc[self.idx_trim_start:] + + if self.trimming_end > 0: + self.idx_trim_end = np.where(np.round(self.markerDict['time'],6) <= np.round(self.markerDict['time'][-1] - self.trimming_end,6))[0][-1] + 1 + self.markerDict['time'] = self.markerDict['time'][:self.idx_trim_end] + for marker in self.markerDict['markers']: + self.markerDict['markers'][marker] = self.markerDict['markers'][marker][:self.idx_trim_end,:] + self.bodyTransformDict['time'] = self.bodyTransformDict['time'][self.idx_trim_end:] + for body in self.bodyTransformDict['body_transforms']: + self.bodyTransformDict['body_transforms'][body] = \ + self.bodyTransformDict['body_transforms'][body][self.idx_trim_end:] + self.coordinateValues = self.coordinateValues.iloc[:self.idx_trim_end] + + # Segment squat repetitions. + self.squatEvents = self.segment_squat(n_repetitions=n_repetitions, visualizeSegmentation=True) + self.nRepetitions = np.shape(self.squatEvents['eventIdxs'])[0] + + # Initialize variables to be lazy loaded. + self._comValues = None + + # Time + self.time = self.coordinateValues['time'].to_numpy() + + # Compute COM trajectory. + def comValues(self): + if self._comValues is None: + self._comValues = self.get_center_of_mass_values() + if self.trimming_start > 0: + self._comValues = self._comValues.iloc[self.idx_trim_start:] + if self.trimming_end > 0: + self._comValues = self._comValues.iloc[:self.idx_trim_end] + return self._comValues + + def get_squat_events(self): + + return self.squatEvents + + def compute_scalars(self, scalarNames, return_all=False): + + # Verify that scalarNames are methods in squat_analysis. + method_names = [func for func in dir(self) if callable(getattr(self, func))] + possibleMethods = [entry for entry in method_names if 'compute_' in entry] + + if scalarNames is None: + print('No scalars defined, these methods are available:') + print(*possibleMethods) + return + + nonexistant_methods = [entry for entry in scalarNames if 'compute_' + entry not in method_names] + + if len(nonexistant_methods) > 0: + raise Exception(str(['compute_' + a for a in nonexistant_methods]) + ' does not exist in squat_analysis class.') + + scalarDict = {} + for scalarName in scalarNames: + thisFunction = getattr(self, 'compute_' + scalarName) + scalarDict[scalarName] = {} + (scalarDict[scalarName]['value'], + scalarDict[scalarName]['units']) = thisFunction(return_all=return_all) + + return scalarDict + + + def segment_squat(self, n_repetitions=-1, peak_proportion_threshold=0.7, + peak_width_rel_height=0.95, peak_distance_sec=0.5, + toe_height_threshold=0.05, + visualizeSegmentation=False): + + pelvis_ty = self.coordinateValues['pelvis_ty'].to_numpy() + dt = np.mean(np.diff(self.time)) + + # Identify minimums. + pelvSignal = np.array(-pelvis_ty - np.min(-pelvis_ty)) + pelvRange = np.abs(np.max(pelvis_ty) - np.min(pelvis_ty)) + peakThreshold = peak_proportion_threshold * pelvRange + idxMinPelvTy,_ = find_peaks(pelvSignal, prominence=peakThreshold, + distance=peak_distance_sec/dt) + peakWidths = peak_widths(pelvSignal, idxMinPelvTy, + rel_height=peak_width_rel_height) + + # Store start and end indices. + startEndIdxs = [] + for start_ips, end_ips in zip(peakWidths[2], peakWidths[3]): + startEndIdxs.append([start_ips, end_ips]) + startEndIdxs = np.rint(startEndIdxs).astype(int) + + # Limit the number of repetitions. + if n_repetitions != -1: + startEndIdxs = startEndIdxs[-n_repetitions:] + + # Store start and end event times + eventTimes = self.time[startEndIdxs] + + if visualizeSegmentation: + plt.figure() + plt.plot(-pelvSignal) + for c_v, val in enumerate(startEndIdxs): + plt.plot(val, -pelvSignal[val], marker='o', markerfacecolor='k', + markeredgecolor='none', linestyle='none', + label='Start/End rep') + if c_v == 0: + plt.legend() + plt.hlines(-peakWidths[1], peakWidths[2], peakWidths[3], color='k', + label='peak start/end') + plt.xlabel('Frames') + plt.ylabel('Position [m]') + plt.title('Vertical pelvis position') + plt.draw() + + # Detect squat type (double leg, single leg right, single leg left) + # Use toe markers + eventTypes = [] + markersDict = self.markerDict['markers'] + + for eventIdx in startEndIdxs: + lToeYMean = np.mean(markersDict['L_toe_study'][eventIdx[0]:eventIdx[1]+1, 1]) + rToeYMean = np.mean(markersDict['r_toe_study'][eventIdx[0]:eventIdx[1]+1, 1]) + + if lToeYMean - rToeYMean > toe_height_threshold: + eventTypes.append('single_leg_r') + elif rToeYMean - lToeYMean > toe_height_threshold: + eventTypes.append('single_leg_l') + else: + eventTypes.append('double_leg') + + # Output. + squatEvents = { + 'eventIdxs': startEndIdxs, + 'eventTimes': eventTimes, + 'eventNames':['repStart','repEnd'], + 'eventTypes': eventTypes} + + return squatEvents + + def compute_peak_angle(self, coordinate, peak_type="maximum", return_all=False): + + # Verify that the coordinate exists. + if coordinate not in self.coordinateValues.columns: + raise Exception(coordinate + ' does not exist in coordinate values. Verify the name of the coordinate.') + + # Compute max angle for each repetition. + peak_angles = np.zeros((self.nRepetitions)) + for i in range(self.nRepetitions): + rep_range = self.squatEvents['eventIdxs'][i] + if peak_type == "maximum": + peak_angles[i] = np.max(self.coordinateValues[coordinate].to_numpy()[rep_range[0]:rep_range[1]+1]) + elif peak_type == "minimum": + peak_angles[i] = np.min(self.coordinateValues[coordinate].to_numpy()[rep_range[0]:rep_range[1]+1]) + else: + raise Exception('peak_type must be "maximum" or "minimum".') + + # Average across all strides. + peak_angle_mean = np.mean(peak_angles) + peak_angle_std = np.std(peak_angles) + + # Define units. + units = 'deg' + + if return_all: + return peak_angles, units + else: + return peak_angle_mean, peak_angle_std, units + + def compute_ratio_peak_angle(self, coordinate_a, coordinate_b, peak_type="maximum", return_all=False): + + peak_angles_a, units_a = self.compute_peak_angle(coordinate_a, peak_type=peak_type, return_all=True) + peak_angles_b, units_b = self.compute_peak_angle(coordinate_b, peak_type=peak_type, return_all=True) + + # Verify that units are the same. + if units_a != units_b: + raise Exception('Units of the two coordinates are not the same.') + + ratio_angles = np.zeros((self.nRepetitions)) + for i in range(self.nRepetitions): + ratio_angles[i] = peak_angles_a[i] / peak_angles_b[i] * 100 + + # Average across all strides. + ratio_angle_mean = np.mean(ratio_angles) + ratio_angle_std = np.std(ratio_angles) + + # Define units + units = '%' + + if return_all: + return ratio_angles, units + else: + return ratio_angle_mean, ratio_angle_std, units + + def compute_range_of_motion(self, coordinate, return_all=False): + + # Verify that the coordinate exists. + if coordinate not in self.coordinateValues.columns: + raise Exception(coordinate + ' does not exist in coordinate values. Verify the name of the coordinate.') + + # Compute max angle for each repetition. + ranges_of_motion = np.zeros((self.nRepetitions)) + for i in range(self.nRepetitions): + rep_range = self.squatEvents['eventIdxs'][i] + ranges_of_motion[i] = (np.max(self.coordinateValues[coordinate].to_numpy()[rep_range[0]:rep_range[1]+1]) - + np.min(self.coordinateValues[coordinate].to_numpy()[rep_range[0]:rep_range[1]+1])) + + # Average across all strides. + range_of_motion_mean = np.mean(ranges_of_motion) + range_of_motion_std = np.std(ranges_of_motion) + + # Define units. + units = 'deg' + + if return_all: + return ranges_of_motion, units + else: + return range_of_motion_mean, range_of_motion_std, units + + def compute_squat_depth(self, return_all=False): + pelvis_ty = self.coordinateValues['pelvis_ty'].to_numpy() + + squat_depths = np.zeros((self.nRepetitions)) + for i in range(self.nRepetitions): + rep_range = self.squatEvents['eventIdxs'][i] + + pelvis_ty_range = pelvis_ty[rep_range[0]:rep_range[1]+1] + squat_depths[i] = np.max(pelvis_ty_range) - np.min(pelvis_ty_range) + + # Average across all squats. + squat_depths_mean = np.mean(squat_depths) + squat_depths_std = np.std(squat_depths) + + # Define units. + units = 'm' + + if return_all: + return squat_depths, units + else: + return squat_depths_mean, squat_depths_std, units + + def compute_trunk_lean_relative_to_pelvis(self, return_all=False): + torso_transforms = self.bodyTransformDict['body_transforms']['torso'] + pelvis_transforms = self.bodyTransformDict['body_transforms']['pelvis'] + + max_trunk_leans = np.zeros((self.nRepetitions)) + for i in range(self.nRepetitions): + rep_range = self.squatEvents['eventIdxs'][i] + + torso_transforms_range = torso_transforms[rep_range[0]:rep_range[1]+1] + pelvis_transforms_range = pelvis_transforms[rep_range[0]:rep_range[1]+1] + + trunk_leans_range = np.zeros(len(torso_transforms_range)) + for j in range(len(torso_transforms_range)): + y_axis = opensim.Vec3(0, 1, 0) + torso_y_in_ground = torso_transforms_range[j].xformFrameVecToBase(y_axis).to_numpy() + + z_axis = opensim.Vec3(0, 0, 1) + pelvis_z_in_ground = pelvis_transforms_range[j].xformFrameVecToBase(z_axis).to_numpy() + + acos_deg = np.rad2deg(np.arccos(np.dot(torso_y_in_ground, pelvis_z_in_ground))) + trunk_leans_range[j] = 90 - acos_deg + + # using absolute value for now. perhaps keep track of both positive + # and negative trunk lean angles (to try to detect a bad side?) + max_trunk_leans[i] = np.max(np.abs(trunk_leans_range)) + units = 'deg' + + if return_all: + return max_trunk_leans, units + + else: + trunk_lean_mean = np.mean(max_trunk_leans) + trunk_lean_std = np.std(max_trunk_leans) + return trunk_lean_mean, trunk_lean_std, units + + def compute_trunk_lean_relative_to_ground(self, return_all=False): + torso_transforms = self.bodyTransformDict['body_transforms']['torso'] + pelvis_transforms = self.bodyTransformDict['body_transforms']['pelvis'] + + max_trunk_leans = np.zeros((self.nRepetitions)) + for i in range(self.nRepetitions): + rep_range = self.squatEvents['eventIdxs'][i] + + torso_transforms_range = torso_transforms[rep_range[0]:rep_range[1]+1] + pelvis_transforms_range = pelvis_transforms[rep_range[0]:rep_range[1]+1] + + trunk_leans_range = np.zeros(len(torso_transforms_range)) + for j in range(len(torso_transforms_range)): + y_axis = opensim.Vec3(0, 1, 0) + torso_y_in_ground = torso_transforms_range[j].xformFrameVecToBase(y_axis).to_numpy() + + # find the heading of the pelvis (in the ground x-z plane) + x_axis = opensim.Vec3(1, 0, 0) + pelvis_x_in_ground = pelvis_transforms_range[j].xformFrameVecToBase(x_axis).to_numpy() + pelvis_x_in_ground_xz_plane = pelvis_x_in_ground + pelvis_x_in_ground_xz_plane[1] = 0 + pelvis_x_in_ground_xz_plane /= np.linalg.norm(pelvis_x_in_ground_xz_plane) + + # find the z-axis for comparison to the torso (z-axis in ground + # that is rotated to the heading of the pelvis) + rotated_z_axis = np.cross(pelvis_x_in_ground_xz_plane, np.array([0, 1, 0])) + + acos_deg = np.rad2deg(np.arccos(np.dot(torso_y_in_ground, rotated_z_axis))) + trunk_leans_range[j] = 90 - acos_deg + + # using absolute value for now. perhaps keep track of both positive + # and negative trunk lean angles (to try to detect a bad side?) + max_trunk_leans[i] = np.max(np.abs(trunk_leans_range)) + + units = 'deg' + + if return_all: + return max_trunk_leans, units + + else: + trunk_lean_mean = np.mean(max_trunk_leans) + trunk_lean_std = np.std(max_trunk_leans) + return trunk_lean_mean, trunk_lean_std, units + + def compute_trunk_flexion_relative_to_ground(self, return_all=False): + torso_transforms = self.bodyTransformDict['body_transforms']['torso'] + pelvis_transforms = self.bodyTransformDict['body_transforms']['pelvis'] + + max_trunk_flexions = np.zeros((self.nRepetitions)) + for i in range(self.nRepetitions): + rep_range = self.squatEvents['eventIdxs'][i] + + torso_transforms_range = torso_transforms[rep_range[0]:rep_range[1]+1] + pelvis_transforms_range = pelvis_transforms[rep_range[0]:rep_range[1]+1] + + trunk_flexions_range = np.zeros(len(torso_transforms_range)) + for j in range(len(torso_transforms_range)): + y_axis = opensim.Vec3(0, 1, 0) + torso_y_in_ground = torso_transforms_range[j].xformFrameVecToBase(y_axis).to_numpy() + + # find the heading of the pelvis (in the ground x-z plane) + x_axis = opensim.Vec3(1, 0, 0) + pelvis_x_in_ground = pelvis_transforms_range[j].xformFrameVecToBase(x_axis).to_numpy() + pelvis_x_in_ground_xz_plane = pelvis_x_in_ground + pelvis_x_in_ground_xz_plane[1] = 0 + pelvis_x_in_ground_xz_plane /= np.linalg.norm(pelvis_x_in_ground_xz_plane) + + acos_deg = np.rad2deg(np.arccos(np.dot(torso_y_in_ground, pelvis_x_in_ground_xz_plane))) + trunk_flexions_range[j] = 90 - acos_deg + + # using absolute value for now. perhaps keep track of both positive + # and negative trunk lean angles (to try to detect a bad side?) + max_trunk_flexions[i] = np.max(trunk_flexions_range) + + units = 'deg' + + if return_all: + return max_trunk_flexions, units + + else: + max_trunk_flexions_mean = np.mean(max_trunk_flexions) + max_trunk_flexions_std = np.std(max_trunk_flexions) + return max_trunk_flexions_mean, max_trunk_flexions_std, units + + def get_coordinates_segmented(self): + + colNames = self.coordinateValues.columns + data = self.coordinateValues.to_numpy(copy=True) + coordValuesSegmented = [] + for eventIdx in self.squatEvents['eventIdxs']: + coordValuesSegmented.append(pd.DataFrame(data=data[eventIdx[0]:eventIdx[1]], columns=colNames)) + + return coordValuesSegmented + + def get_center_of_mass_values_segmented(self): + + data = np.vstack((self.comValues()['x'],self.comValues()['y'],self.comValues()['z'])).T + colNames = ['com_x', 'com_y', 'com_z'] + comValuesSegmented = [] + for eventIdx in self.squatEvents['eventIdxs']: + comValuesSegmented.append(pd.DataFrame(data=data[eventIdx[0]:eventIdx[1]], columns=colNames)) + + return comValuesSegmented + + def get_coordinates_segmented_normalized_time(self): + + colNames = self.coordinateValues.columns + data = self.coordinateValues.to_numpy(copy=True) + coordValuesSegmentedNorm = [] + for eventIdx in self.squatEvents['eventIdxs']: + coordValues = data[eventIdx[0]:eventIdx[1]] + coordValuesSegmentedNorm.append(np.stack([np.interp(np.linspace(0,100,101), + np.linspace(0,100,len(coordValues)),coordValues[:,i]) \ + for i in range(coordValues.shape[1])],axis=1)) + + coordValuesTimeNormalized = {} + # Average. + coordVals_mean = np.mean(np.array(coordValuesSegmentedNorm),axis=0) + coordValuesTimeNormalized['mean'] = pd.DataFrame(data=coordVals_mean, columns=colNames) + # Standard deviation. + if self.nRepetitions > 2: + coordVals_sd = np.std(np.array(coordValuesSegmentedNorm), axis=0) + coordValuesTimeNormalized['sd'] = pd.DataFrame(data=coordVals_sd, columns=colNames) + else: + coordValuesTimeNormalized['sd'] = None + # Indiv. + coordValuesTimeNormalized['indiv'] = [pd.DataFrame(data=d, columns=colNames) for d in coordValuesSegmentedNorm] + + return coordValuesTimeNormalized + + def get_center_of_mass_segmented_normalized_time(self): + + data = np.vstack((self.comValues()['x'],self.comValues()['y'],self.comValues()['z'])).T + colNames = ['com_x', 'com_y', 'com_z'] + comValuesSegmentedNorm = [] + for eventIdx in self.squatEvents['eventIdxs']: + comValues = data[eventIdx[0]:eventIdx[1]] + comValuesSegmentedNorm.append(np.stack([np.interp(np.linspace(0,100,101), + np.linspace(0,100,len(comValues)),comValues[:,i]) \ + for i in range(comValues.shape[1])],axis=1)) + + comValuesTimeNormalized = {} + # Average. + comValues_mean = np.mean(np.array(comValuesSegmentedNorm),axis=0) + comValuesTimeNormalized['mean'] = pd.DataFrame(data=comValues_mean, columns=colNames) + # Standard deviation. + if self.nRepetitions > 2: + comValues_sd = np.std(np.array(comValuesSegmentedNorm), axis=0) + comValuesTimeNormalized['sd'] = pd.DataFrame(data=comValues_sd, columns=colNames) + else: + comValuesTimeNormalized['sd'] = None + # Indiv. + comValuesTimeNormalized['indiv'] = [pd.DataFrame(data=d, columns=colNames) for d in comValuesSegmentedNorm] + + return comValuesTimeNormalized + \ No newline at end of file diff --git a/Examples/example_squat_analysis.py b/Examples/example_squat_analysis.py new file mode 100644 index 00000000..6f6232a4 --- /dev/null +++ b/Examples/example_squat_analysis.py @@ -0,0 +1,247 @@ +''' + --------------------------------------------------------------------------- + OpenCap processing: example_squat_analysis.py + --------------------------------------------------------------------------- + Copyright 2024 Stanford University and the Authors + + Author(s): Antoine Falisse, Carmichael Ong + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Please contact us for any questions: https://www.opencap.ai/#contact + + This example shows how to run a kinematic analysis of squat data. + +''' + +import os +import sys +sys.path.append("../") +sys.path.append("../ActivityAnalyses") +import numpy as np + +from squat_analysis import squat_analysis +from utils import get_trial_id, download_trial +from utilsPlotting import plot_dataframe_with_shading + +# %% Paths. +baseDir = os.path.join(os.getcwd(), '..') +dataFolder = os.path.join(baseDir, 'Data') + +# %% User-defined variables. + +# example without KA +session_id = '1b380cb9-3c5c-498c-bc30-a1a379ffa04c' +#trial_name = 'bossu_squats' +trial_name = 'single_leg_squats_l' + +# example with KA +#session_id = 'e742eb1c-efbc-4c17-befc-a772150ca84d' +#trial_name = 'SLS_L1' + +# Select how many repetitions you'd like to analyze. Select -1 for all +# repetitions detected in the trial. +n_repetitions = -1 + +# Select lowpass filter frequency for kinematics data. +filter_frequency = 4 + +# %% Gait analysis. +# Get trial id from name. +trial_id = get_trial_id(session_id,trial_name) + +# Set session path. +sessionDir = os.path.join(dataFolder, session_id) + +# Download data. +trialName = download_trial(trial_id,sessionDir,session_id=session_id) + +# Init gait analysis. +squat = squat_analysis( + sessionDir, trialName, + lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, + n_repetitions=n_repetitions) + +# Detect squat type +squat_events = squat.get_squat_events() +eventTypes = squat.squatEvents['eventTypes'] +print('Detected {} total squats: {} single_leg_l, {} single_leg_r, {} double_leg.'.format( + len(eventTypes), eventTypes.count('single_leg_l'), + eventTypes.count('single_leg_r'), eventTypes.count('double_leg'))) +print('') + +unique_types = set(eventTypes) +if len(unique_types) < 1: + raise Exception('No squats detected.') +if len(unique_types) > 1: + raise Exception('More than one type of squat detected.') + +# Check if we can also analyze adduction +knee_adduction_names = ['knee_adduction_r', 'knee_adduction_l'] +analyze_knee_adduction = False +if all(x in squat.coordinates for x in knee_adduction_names): + analyze_knee_adduction = True + +# Example metrics +max_trunk_lean_ground_mean, max_trunk_lean_ground_std, max_trunk_lean_ground_units = squat.compute_trunk_lean_relative_to_ground() +max_trunk_flexion_mean, max_trunk_flexion_std, max_trunk_flexion_units = squat.compute_trunk_flexion_relative_to_ground() +squat_depth_mean, squat_depth_std, squat_depth_units = squat.compute_squat_depth() + +print('Squat metrics summary:') +print('Max trunk lean relative to ground: {} +/- {} {}'.format(np.round(max_trunk_lean_ground_mean,2), np.round(max_trunk_lean_ground_std,2), max_trunk_lean_ground_units)) +print('Max trunk flexion flexion to ground: {} +/- {} {}'.format(np.round(max_trunk_flexion_mean,2), np.round(max_trunk_flexion_std,2), max_trunk_flexion_units)) +print('Squat depth: {} +/- {} {}'.format(np.round(squat_depth_mean,2), np.round(squat_depth_std,2), squat_depth_units)) +print('') + +if eventTypes[0] == 'double_leg': + ratio_max_knee_flexion_angle_mean, ratio_max_knee_flexion_angle_std, ratio_max_knee_flexion_angle_unit = squat.compute_ratio_peak_angle('knee_angle_r', 'knee_angle_l') + ratio_max_hip_flexion_angle_mean, ratio_max_hip_flexion_angle_std, ratio_max_knee_flexion_angle_unit = squat.compute_ratio_peak_angle('hip_flexion_r', 'hip_flexion_l') + + +if eventTypes[0] == 'double_leg' or eventTypes[0] == 'single_leg_r': + max_hip_flexion_angle_r_mean, max_hip_flexion_angle_r_std, max_hip_flexion_angle_r_units = squat.compute_peak_angle('hip_flexion_r') + max_knee_flexion_angle_r_mean, max_knee_flexion_angle_r_std, max_knee_flexion_angle_r_units = squat.compute_peak_angle('knee_angle_r') + max_ankle_flexion_angle_r_mean, max_ankle_flexion_angle_r_std, max_ankle_flexion_angle_r_units = squat.compute_peak_angle('ankle_angle_r') + max_hip_adduction_angle_r_mean, max_hip_adduction_angle_r_std, max_hip_adduction_angle_r_units = squat.compute_peak_angle('hip_adduction_r') + rom_knee_flexion_angle_r_mean, rom_knee_flexion_angle_r_std, rom_knee_flexion_angle_r_units = squat.compute_range_of_motion('knee_angle_r') + + print('Right side summary:') + print('Peak hip flexion angle: {} +/- {} {}'.format(np.round(max_hip_flexion_angle_r_mean,2), np.round(max_hip_flexion_angle_r_std,2), max_hip_flexion_angle_r_units)) + print('Peak knee flexion angle: {} +/- {} {}'.format(np.round(max_knee_flexion_angle_r_mean,2), np.round(max_knee_flexion_angle_r_std,2), max_knee_flexion_angle_r_units)) + print('Peak ankle dorsiflexion angle: {} +/- {} {}'.format(np.round(max_ankle_flexion_angle_r_mean,2), np.round(max_ankle_flexion_angle_r_std,2), max_ankle_flexion_angle_r_units)) + + if analyze_knee_adduction: + max_knee_adduction_angle_r_mean, max_knee_adduction_angle_r_std, max_knee_adduction_angle_r_units = squat.compute_peak_angle('knee_adduction_r') + print('Peak knee adduction angle: {} +/- {} {}'.format(np.round(max_knee_adduction_angle_r_mean,2), np.round(max_knee_adduction_angle_r_std,2), max_knee_adduction_angle_r_units)) + + print('') + +if eventTypes[0] == 'double_leg' or eventTypes[0] == 'single_leg_l': + max_hip_flexion_angle_l_mean, max_hip_flexion_angle_l_std, max_hip_flexion_angle_l_units = squat.compute_peak_angle('hip_flexion_l') + max_knee_flexion_angle_l_mean, max_knee_flexion_angle_l_std, max_knee_flexion_angle_l_units = squat.compute_peak_angle('knee_angle_l') + max_ankle_flexion_angle_l_mean, max_ankle_flexion_angle_l_std, max_ankle_flexion_angle_l_units = squat.compute_peak_angle('ankle_angle_l') + max_hip_adduction_angle_l_mean, max_hip_adduction_angle_l_std, max_hip_adduction_angle_l_units = squat.compute_peak_angle('hip_adduction_l') + rom_knee_flexion_angle_l_mean, rom_knee_flexion_angle_l_std, rom_knee_flexion_angle_l_units = squat.compute_range_of_motion('knee_angle_l') + + print('Left side summary:') + print('Peak hip flexion angle: {} +/- {} {}'.format(np.round(max_hip_flexion_angle_l_mean,2), np.round(max_hip_flexion_angle_l_std,2), max_hip_flexion_angle_l_units)) + print('Peak knee flexion angle: {} +/- {} {}'.format(np.round(max_knee_flexion_angle_l_mean,2), np.round(max_knee_flexion_angle_l_std,2), max_knee_flexion_angle_l_units)) + print('Peak ankle dorsiflexion angle: {} +/- {} {}'.format(np.round(max_ankle_flexion_angle_l_mean,2), np.round(max_ankle_flexion_angle_l_std,2), max_ankle_flexion_angle_l_units)) + + if analyze_knee_adduction: + max_knee_adduction_angle_l_mean, max_knee_adduction_angle_l_std, max_knee_adduction_angle_l_units = squat.compute_peak_angle('knee_adduction_l') + print('Peak knee adduction angle: {} +/- {} {}'.format(np.round(max_knee_adduction_angle_l_mean,2), np.round(max_knee_adduction_angle_l_std,2), max_knee_adduction_angle_l_units)) + + print('') + +# Example metrics to json: aggregated over both legs +max_knee_flexion_angle_r_mean, max_knee_flexion_angle_r_std, _ = squat.compute_peak_angle('knee_angle_r') +max_knee_flexion_angle_l_mean, max_knee_flexion_angle_l_std, _ = squat.compute_peak_angle('knee_angle_l') +max_knee_flexion_angle_mean_mean = np.mean(np.array([max_knee_flexion_angle_r_mean, max_knee_flexion_angle_l_mean])) +max_knee_flexion_angle_mean_std = np.mean(np.array([max_knee_flexion_angle_r_std, max_knee_flexion_angle_l_std])) + +max_hip_flexion_angle_r_mean, max_hip_flexion_angle_r_std, _ = squat.compute_peak_angle('hip_flexion_r') +max_hip_flexion_angle_l_mean, max_hip_flexion_angle_l_std, _ = squat.compute_peak_angle('hip_flexion_l') +max_hip_flexion_angle_mean_mean = np.mean(np.array([max_hip_flexion_angle_r_mean, max_hip_flexion_angle_l_mean])) +max_hip_flexion_angle_mean_std = np.mean(np.array([max_hip_flexion_angle_r_std, max_hip_flexion_angle_l_std])) + +max_hip_adduction_angle_r_mean, max_hip_adduction_angle_r_std, _ = squat.compute_peak_angle('hip_adduction_r') +max_hip_adduction_angle_l_mean, max_hip_adduction_angle_l_std, _ = squat.compute_peak_angle('hip_adduction_l') +max_hip_adduction_angle_mean_mean = np.mean(np.array([max_hip_adduction_angle_r_mean, max_hip_adduction_angle_l_mean])) +max_hip_adduction_angle_mean_std = np.mean(np.array([max_hip_adduction_angle_r_std, max_hip_adduction_angle_l_std])) + +rom_knee_flexion_angle_r_mean, rom_knee_flexion_angle_r_std, _ = squat.compute_range_of_motion('knee_angle_r') +rom_knee_flexion_angle_l_mean, rom_knee_flexion_angle_l_std, _ = squat.compute_range_of_motion('knee_angle_l') +rom_knee_flexion_angle_mean_mean = np.mean(np.array([rom_knee_flexion_angle_r_mean, rom_knee_flexion_angle_l_mean])) +rom_knee_flexion_angle_mean_std = np.mean(np.array([rom_knee_flexion_angle_r_std, rom_knee_flexion_angle_l_std])) + +print('Aggregating over both legs:') +print('Peak knee flexion angle: {} +/- {} deg'.format(np.round(max_knee_flexion_angle_mean_mean,2), np.round(max_knee_flexion_angle_mean_std,2))) +print('Peak hip flexion angle: {} +/- {} deg'.format(np.round(max_hip_flexion_angle_mean_mean,2), np.round(max_hip_flexion_angle_mean_std,2))) +print('Peak hip adduction angle: {} +/- {} deg'.format(np.round(max_hip_adduction_angle_mean_mean,2), np.round(max_hip_adduction_angle_mean_std,2))) +print('ROM knee flexion angle: {} +/- {} deg'.format(np.round(rom_knee_flexion_angle_mean_mean,2), np.round(rom_knee_flexion_angle_mean_std,2))) + +squat_scalars = {} +squat_scalars['peak_knee_flexion_angle_mean'] = {'value': max_knee_flexion_angle_mean_mean} +squat_scalars['peak_knee_flexion_angle_mean']['label'] = 'Mean peak knee flexion angle (deg)' +squat_scalars['peak_knee_flexion_angle_mean']['colors'] = ["red", "yellow", "green"] +peak_knee_flexion_angle_threshold = 100 +squat_scalars['peak_knee_flexion_angle_mean']['min_limit'] = float(np.round(0.90*peak_knee_flexion_angle_threshold)) +squat_scalars['peak_knee_flexion_angle_mean']['max_limit'] = float(peak_knee_flexion_angle_threshold) + +squat_scalars['peak_knee_flexion_angle_std'] = {'value': max_knee_flexion_angle_mean_std} +squat_scalars['peak_knee_flexion_angle_std']['label'] = 'Std peak knee flexion angle (deg)' +squat_scalars['peak_knee_flexion_angle_std']['colors'] = ["green", "yellow", "red"] +std_threshold_min = 2 +std_threshold_max = 4 +squat_scalars['peak_knee_flexion_angle_std']['min_limit'] = float(std_threshold_min) +squat_scalars['peak_knee_flexion_angle_std']['max_limit'] = float(std_threshold_max) + +squat_scalars['peak_hip_flexion_angle_mean'] = {'value': max_hip_flexion_angle_mean_mean} +squat_scalars['peak_hip_flexion_angle_mean']['label'] = 'Mean peak hip flexion angle (deg)' +squat_scalars['peak_hip_flexion_angle_mean']['colors'] = ["red", "yellow", "green"] +peak_hip_flexion_angle_threshold = 100 +squat_scalars['peak_hip_flexion_angle_mean']['min_limit'] = float(np.round(0.90*peak_hip_flexion_angle_threshold)) +squat_scalars['peak_hip_flexion_angle_mean']['max_limit'] = float(peak_hip_flexion_angle_threshold) + +squat_scalars['peak_hip_flexion_angle_std'] = {'value': max_hip_flexion_angle_mean_std} +squat_scalars['peak_hip_flexion_angle_std']['label'] = 'Std peak hip flexion angle (deg)' +squat_scalars['peak_hip_flexion_angle_std']['colors'] = ["green", "yellow", "red"] +squat_scalars['peak_hip_flexion_angle_std']['min_limit'] = float(std_threshold_min) +squat_scalars['peak_hip_flexion_angle_std']['max_limit'] = float(std_threshold_max) + +squat_scalars['peak_knee_adduction_angle_mean'] = {'value': max_hip_adduction_angle_mean_mean} +squat_scalars['peak_knee_adduction_angle_mean']['label'] = 'Mean peak knee adduction angle (deg)' +squat_scalars['peak_knee_adduction_angle_mean']['colors'] = ["red", "green", "red"] +knee_adduction_angle_threshold = 5 +squat_scalars['peak_knee_adduction_angle_mean']['min_limit'] = float(-knee_adduction_angle_threshold) +squat_scalars['peak_knee_adduction_angle_mean']['max_limit'] = float(knee_adduction_angle_threshold) + +squat_scalars['peak_knee_adduction_angle_std'] = {'value': max_hip_adduction_angle_mean_std} +squat_scalars['peak_knee_adduction_angle_std']['label'] = 'Std peak knee adduction angle (deg)' +squat_scalars['peak_knee_adduction_angle_std']['colors'] = ["green", "yellow", "red"] +squat_scalars['peak_knee_adduction_angle_std']['min_limit'] = float(std_threshold_min) +squat_scalars['peak_knee_adduction_angle_std']['max_limit'] = float(std_threshold_max) + +squat_scalars['rom_knee_flexion_angle_mean'] = {'value': rom_knee_flexion_angle_mean_mean} +squat_scalars['rom_knee_flexion_angle_mean']['label'] = 'Mean range of motion knee flexion angle (deg)' +squat_scalars['rom_knee_flexion_angle_mean']['colors'] = ["red", "yellow", "green"] +rom_knee_flexion_angle_threshold_min = 85 +rom_knee_flexion_angle_threshold_max = 115 +squat_scalars['rom_knee_flexion_angle_mean']['min_limit'] = float(rom_knee_flexion_angle_threshold_min) +squat_scalars['rom_knee_flexion_angle_mean']['max_limit'] = float(rom_knee_flexion_angle_threshold_max) + +squat_scalars['rom_knee_flexion_angle_std'] = {'value': rom_knee_flexion_angle_mean_std} +squat_scalars['rom_knee_flexion_angle_std']['label'] = 'Std range of motion knee flexion angle (deg)' +squat_scalars['rom_knee_flexion_angle_std']['colors'] = ["green", "yellow", "red"] +squat_scalars['rom_knee_flexion_angle_std']['min_limit'] = float(std_threshold_min) +squat_scalars['rom_knee_flexion_angle_std']['max_limit'] = float(std_threshold_max) + +# dump to json +import json +with open('squat_scalars.json', 'w') as fp: + json.dump(squat_scalars, fp) + + +# %% Example plots. +squat_joint_kinematics = squat.get_coordinates_segmented_normalized_time() +squat_com_kinematics = squat.get_center_of_mass_segmented_normalized_time() + +plot_dataframe_with_shading( + [squat_joint_kinematics['mean']], + [squat_joint_kinematics['sd']], + xlabel = '% repetition', + title = 'Joint kinematics (m or deg)') + +plot_dataframe_with_shading( + [squat_com_kinematics['mean']], + [squat_com_kinematics['sd']], + xlabel = '% repetition', + title = 'center of mass kinematics (m)') \ No newline at end of file diff --git a/utilsKinematics.py b/utilsKinematics.py index f2629a67..d75b0c39 100644 --- a/utilsKinematics.py +++ b/utilsKinematics.py @@ -65,7 +65,7 @@ def __init__(self, sessionDir, trialName, motionPath = os.path.join(sessionDir, 'OpenSimData', 'Kinematics', '{}.mot'.format(trialName)) - # Create time-series table with coordinate values. + # Create time-series table with coordinate values. self.table = opensim.TimeSeriesTable(motionPath) tableProcessor = opensim.TableProcessor(self.table) self.columnLabels = list(self.table.getColumnLabels()) @@ -105,7 +105,7 @@ def __init__(self, sessionDir, trialName, self.Qds[:,i] = splineD1(self.time) # Coordinate accelerations. splineD2 = spline.derivative(n=2) - self.Qdds[:,i] = splineD2(self.time) + self.Qdds[:,i] = splineD2(self.time) # Add coordinate speeds to table. columnLabel_speed = columnLabel[:-5] + 'speed' self.table.appendColumn( @@ -121,14 +121,14 @@ def __init__(self, sessionDir, trialName, existingLabels = self.table.getColumnLabels() for stateVariableNameStr in stateVariableNamesStr: if not stateVariableNameStr in existingLabels: - vec_0 = opensim.Vector([0] * self.table.getNumRows()) + vec_0 = opensim.Vector([0] * self.table.getNumRows()) self.table.appendColumn(stateVariableNameStr, vec_0) # Number of muscles. self.nMuscles = 0 self.forceSet = self.model.getForceSet() for i in range(self.forceSet.getSize()): - c_force_elt = self.forceSet.get(i) + c_force_elt = self.forceSet.get(i) if 'Muscle' in c_force_elt.getConcreteClassName(): self.nMuscles += 1 @@ -182,6 +182,33 @@ def get_marker_dict(self, session_dir, trial_name, return markerDict + def get_body_transform_dict(self): + + states_traj = self.stateTrajectory() + states_table = states_traj.exportToTable(self.model) + + body_dict = {} + body_dict['time'] = np.array(states_table.getIndependentColumn()) + + body_list = [] + body_transforms_dict = {} + for body in self.model.getBodySet(): + body_list.append(body.getName()) + body_transforms_dict[body.getName()] = [] + body_dict['body_names'] = body_list + + for i in range(self.table.getNumRows()): + this_state = states_traj[i] + self.model.realizePosition(this_state) + + for body in self.model.getBodySet(): + this_body_transform = body.getTransformInGround(this_state) + body_transforms_dict[body.getName()].append(this_body_transform) + + body_dict['body_transforms'] = body_transforms_dict + + return body_dict + def rotate_marker_dict(self, markerDict, euler_angles): # euler_angles is a dictionary with keys being the axes of rotation # (x, y, z) and values being the angles in degrees. e.g. {'x': 90, 'y': 180}