diff --git a/ecg_data_manager/modules/utils.py b/ecg_data_manager/modules/utils.py index f4e6c71..df599c9 100644 --- a/ecg_data_manager/modules/utils.py +++ b/ecg_data_manager/modules/utils.py @@ -83,6 +83,63 @@ def process_ecg_data(db: Client, data: pd.DataFrame) -> pd.DataFrame: return processed_data +def fetch_symptoms_single(observation_data: dict) -> dict: + """ + Extracts symptoms information from the components array of a single observation data + dictionary where HKElectrocardiogram.SymptomsStatus is 'present'. Returns 'UserId', + 'ResourceId', and 'Symptoms'. This data is suitable for merging with a main DataFrame. + + Args: + observation_data: A dictionary containing observation data. + + Returns: + dict: A dictionary with 'UserId', 'ResourceId', and 'Symptoms' if symptoms are present. + Returns an empty dictionary if no symptoms are present or if SymptomsStatus is + not 'present'. + """ + components = observation_data.get("component", []) + user_id = observation_data.get(ColumnNames.USER_ID.value) + resource_id = observation_data.get(ColumnNames.RESOURCE_ID.value) + + # Check for SymptomsStatus + symptoms_status = next( + ( + comp.get("valueString") + for comp in components + if comp.get("code", {}).get("coding", [{}])[0].get("code") + == "HKElectrocardiogram.SymptomsStatus" + ), + None, + ) + + # If SymptomsStatus is "present", extract symptoms + if symptoms_status == "present": + symptoms = [ + f"{comp.get('code', {}).get('coding', [{}])[0].get('display')}:" + f"{comp.get('valueString')}" + for comp in components + if "HKCategoryTypeIdentifier" + in comp.get("code", {}).get("coding", [{}])[0].get("code", "") + ] + if symptoms: # Check if symptoms list is not empty + return { + ColumnNames.USER_ID.value: user_id, + ColumnNames.RESOURCE_ID.value: resource_id, + "Symptoms": ", ".join(symptoms), + } + return { + ColumnNames.USER_ID.value: user_id, + ColumnNames.RESOURCE_ID.value: resource_id, + "Symptoms": "No symptoms.", + } + + return { + ColumnNames.USER_ID.value: user_id, + ColumnNames.RESOURCE_ID.value: resource_id, + "Symptoms": "No symptoms.", + } + + def fetch_diagnosis_data( # pylint: disable=too-many-locals, too-many-branches db: Client, input_df: pd.DataFrame, @@ -91,7 +148,7 @@ def fetch_diagnosis_data( # pylint: disable=too-many-locals, too-many-branches ) -> pd.DataFrame: """ Fetch diagnosis data from the Firestore database and extend the input DataFrame with new - columns. + columns, including a 'Symptoms' column. Args: db (Client): Firestore database client. @@ -101,7 +158,7 @@ def fetch_diagnosis_data( # pylint: disable=too-many-locals, too-many-branches ECG_DATA_SUBCOLLECTION. Returns: - pd.DataFrame: Extended DataFrame containing the fetched diagnosis data. + pd.DataFrame: Extended DataFrame containing the fetched diagnosis data and symptoms. """ collection_ref = db.collection(collection_name) resources = [] @@ -126,89 +183,81 @@ def fetch_diagnosis_data( # pylint: disable=too-many-locals, too-many-branches ) ).stream() + # Process the FHIR documents and store observation data for doc in fhir_docs: observation_data = doc.to_dict() - observation_data["user_id"] = user_id - observation_data["ResourceId"] = doc.id + observation_data[ColumnNames.USER_ID.value] = user_id + observation_data[ColumnNames.RESOURCE_ID.value] = doc.id + + # Extract effective period start time + effective_start = observation_data.get("effectivePeriod", {}).get( + "start", "" + ) + if effective_start: + observation_data["EffectiveDateTimeHHMM"] = effective_start + + # Extract symptoms information HERE + symptoms_info = fetch_symptoms_single(observation_data) + if symptoms_info: + observation_data.update(symptoms_info) + + # Extract diagnosis information from diagnosis subcollection diagnosis_docs = list( doc.reference.collection(DIAGNOSIS_DATA_SUBCOLLECTION).stream() ) - if diagnosis_docs: - physician_initials_list = [ - diagnosis_doc.to_dict().get("physicianInitials") - for diagnosis_doc in diagnosis_docs - if diagnosis_doc.to_dict().get("physicianInitials") - ] - observation_data["NumberOfReviewers"] = len(physician_initials_list) - observation_data["Reviewers"] = physician_initials_list - else: - observation_data["NumberOfReviewers"] = 0 - observation_data["Reviewers"] = [] - + physician_initials_list = [ + diagnosis_doc.to_dict().get("physicianInitials", "") + for diagnosis_doc in diagnosis_docs + ] + observation_data["NumberOfReviewers"] = len(physician_initials_list) + observation_data["Reviewers"] = physician_initials_list observation_data["ReviewStatus"] = ( "Incomplete review" if observation_data["NumberOfReviewers"] < 3 else "Complete review" ) - resources.append(observation_data) + # Add new columns from diagnosis documents for i, diagnosis_doc in enumerate(diagnosis_docs): - if diagnosis_doc: - doc_data = diagnosis_doc.to_dict() - for key, value in doc_data.items(): - col_name = f"Diagnosis{i+1}_{key}" - new_columns.add(col_name) - observation_data[col_name] = value + doc_data = diagnosis_doc.to_dict() + for key, value in doc_data.items(): + col_name = f"Diagnosis{i+1}_{key}" + new_columns.add(col_name) + observation_data[col_name] = value + + resources.append(observation_data) except Exception as e: # pylint: disable=broad-exception-caught print(f"An error occurred while processing user {user_id}: {str(e)}") + fetched_df = pd.DataFrame(resources) + + # Define columns for the final DataFrame columns = [ ColumnNames.USER_ID.value, - "ResourceId", + ColumnNames.RESOURCE_ID.value, "EffectiveDateTimeHHMM", ColumnNames.APPLE_ELECTROCARDIOGRAM_CLASSIFICATION.value, "NumberOfReviewers", "Reviewers", "ReviewStatus", + "Symptoms", ] + list(new_columns) - data = [] - - for resource in resources: - row_data = [ - resource.get(ColumnNames.USER_ID.value, None), - resource.get("id", None), - ( - resource.get("effectivePeriod", {}).get("start", None) - if resource.get("effectivePeriod") - else None - ), - ( - resource.get("component", [{}])[2].get("valueString", None) - if len(resource.get("component", [])) > 2 - else None - ), - resource.get("NumberOfReviewers", None), - resource.get("Reviewers", None), - resource.get("ReviewStatus", None), - ] - for col in new_columns: - row_data.append(resource.get(col, None)) - - data.append(row_data) - - fetched_df = pd.DataFrame(data, columns=columns) + fetched_df = fetched_df.reindex( + columns=columns, fill_value=None + ) # Ensure columns are in order and filled - # Extend the input_df with new columns based on ResourceId + # Extend the input DataFrame with new columns extended_df = input_df.copy() additional_columns = [ - "ResourceId", + ColumnNames.RESOURCE_ID.value, "NumberOfReviewers", "Reviewers", "ReviewStatus", "EffectiveDateTimeHHMM", + "Symptoms", ] + list(new_columns) for col in additional_columns: @@ -216,8 +265,10 @@ def fetch_diagnosis_data( # pylint: disable=too-many-locals, too-many-branches extended_df[col] = None for index, row in extended_df.iterrows(): - resource_id = row["ResourceId"] - fetched_row = fetched_df[fetched_df["ResourceId"] == resource_id] + resource_id = row[ColumnNames.RESOURCE_ID.value] + fetched_row = fetched_df[ + fetched_df[ColumnNames.RESOURCE_ID.value] == resource_id + ] if not fetched_row.empty: for col in additional_columns: if col in fetched_row.columns: diff --git a/ecg_data_manager/modules/visualization.py b/ecg_data_manager/modules/visualization.py index c974a65..94b9f1a 100644 --- a/ecg_data_manager/modules/visualization.py +++ b/ecg_data_manager/modules/visualization.py @@ -7,11 +7,9 @@ # """ -This module provides classes and associated functions for viewing, filtering, and -analyzing ECG data. The primary class, ECGDataViewer, allows users to interact with -ECG data through a graphical interface, enabling the review, diagnosis, and visualization -of ECG recordings. The module also includes functions for plotting single lead ECGs and -configuring the appearance of the plots. +This module provides classes and functions for viewing, filtering, and analyzing ECG data. The +primary class, ECGDataViewer, allows users to interact with ECG data through a graphical interface, +enabling the review, diagnosis, and visualization of ECG recordings. """ # Standard library imports @@ -351,6 +349,8 @@ def plot_single_ecg(self, row): # pylint: disable=too-many-locals else "Unknown" ) + symptoms = row.get("Symptoms", "No symptoms reported.") + group_class = row[AGE_GROUP_STRING] user_id_html = widgets.HTML( value=f"{group_class} " @@ -360,11 +360,15 @@ def plot_single_ecg(self, row): # pylint: disable=too-many-locals heart_rate_html = widgets.HTML( value=f"Average HR: {heart_rate} bpm" ) + + symptoms_html = widgets.HTML( + value=f"Symptoms: {symptoms}" + ) + interpretation_html = widgets.HTML( value="Classification: " ) - # Conditional color for non-sinusRhythm classifications if ecg_interpretation != SINUS_RHYTHM: interpretation_html.value += ( f"{ecg_interpretation}" @@ -374,7 +378,7 @@ def plot_single_ecg(self, row): # pylint: disable=too-many-locals interpretation_html.value += "" - display(user_id_html, heart_rate_html, interpretation_html) + display(user_id_html, heart_rate_html, symptoms_html, interpretation_html) # Add review status diagnosis_collection_ref = ( @@ -474,7 +478,6 @@ def hide_widgets(b): # pylint: disable=unused-argument ) ) - # Hide the widgets if not all selections have been made initials = ( self.initials_dropdown.value if self.initials_dropdown.value != WidgetStrings.OTHER.value @@ -489,10 +492,8 @@ def hide_widgets(b): # pylint: disable=unused-argument tracing_quality_dropdown.layout.visibility = "hidden" notes_textarea.layout.visibility = "hidden" - # Attach the hide_widgets function to the button's on_click event save_button.on_click(hide_widgets) - # Display the widgets widgets_box = widgets.VBox( [ diagnosis_dropdown,