diff --git a/emission/analysis/classification/inference/labels/inferrers.py b/emission/analysis/classification/inference/labels/inferrers.py
index c6b939671..38fdd31f3 100644
--- a/emission/analysis/classification/inference/labels/inferrers.py
+++ b/emission/analysis/classification/inference/labels/inferrers.py
@@ -141,18 +141,20 @@ def n_to_confidence_coeff(n, max_confidence=None, first_confidence=None, confide
     return max_confidence-(max_confidence-first_confidence)*(1-confidence_multiplier)**(n-1) # This is the u = ... formula in the issue
 
 # predict_two_stage_bin_cluster but with the above reduction in confidence
-def predict_cluster_confidence_discounting(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
+def predict_cluster_confidence_discounting(user_id, trip_list, max_confidence=None, first_confidence=None, confidence_multiplier=None):
     # load application config
     model_type = eamtc.get_model_type()
     model_storage = eamtc.get_model_storage()
-    labels, n = eamur.predict_labels_with_n(trip, model_type, model_storage)
-    if n <= 0: # No model data or trip didn't match a cluster
-        logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
-        return labels
-
-    confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
-    logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
-
-    labels = copy.deepcopy(labels)
-    for l in labels: l["p"] *= confidence_coeff
-    return labels
+    labels_n_list = eamur.predict_labels_with_n(user_id, trip_list, model_type, model_storage)
+    labels_list = []
+    for labels, n in labels_n_list:
+        if n <= 0: # No model data or trip didn't match a cluster
+            logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
+            labels_list.append(labels)
+            continue
+        confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
+        logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
+        labels = copy.deepcopy(labels)
+        for l in labels: l["p"] *= confidence_coeff
+        labels_list.append(labels)
+    return labels_list
diff --git a/emission/analysis/classification/inference/labels/pipeline.py b/emission/analysis/classification/inference/labels/pipeline.py
index e7bda7b2c..e6520afba 100644
--- a/emission/analysis/classification/inference/labels/pipeline.py
+++ b/emission/analysis/classification/inference/labels/pipeline.py
@@ -55,15 +55,24 @@ def run_prediction_pipeline(self, user_id, time_range):
         self.ts = esta.TimeSeries.get_time_series(user_id)
         self.toPredictTrips = esda.get_entries(
             esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
-        for cleaned_trip in self.toPredictTrips:
-            # Create an inferred trip
+
+        cleaned_trip_list = self.toPredictTrips
+        inferred_trip_list = []
+        results_list = []
+        ensemble_list = []
+
+        # Create list of inferred trips
+        for cleaned_trip in cleaned_trip_list:
             cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
             inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)
-
-            # Run the algorithms and the ensemble, store results
-            results = self.compute_and_save_algorithms(inferred_trip)
-            ensemble = self.compute_and_save_ensemble(inferred_trip, results)
+            inferred_trip_list.append(inferred_trip)
+
+        # Computing outside loop by passing trip_list to ensure model loads once
+        # Run the algorithms and the ensemble, store results
+        results_list = self.compute_and_save_algorithms(user_id, inferred_trip_list)
+        ensemble_list = self.compute_and_save_ensemble(inferred_trip_list, results_list)
 
+        for cleaned_trip, inferred_trip, ensemble in zip(cleaned_trip_list, inferred_trip_list, ensemble_list):
             # Put final results into the inferred trip and store it
             inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
             inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
@@ -75,28 +84,36 @@ def run_prediction_pipeline(self, user_id, time_range):
     # This is where the labels for a given trip are actually predicted.
     # Though the only information passed in is the trip object, the trip object can provide the
     # user_id and other potentially useful information.
-    def compute_and_save_algorithms(self, trip):
-        predictions = []
+    # Returns one list per trip (in trip_list order) holding that trip's Labelprediction
+    # from every primary algorithm, so it lines up with trip_list in compute_and_save_ensemble.
+    def compute_and_save_algorithms(self, user_id, trip_list):
+        predictions_list = [[] for _ in trip_list]
         for algorithm_id, algorithm_fn in primary_algorithms.items():
-            prediction = algorithm_fn(trip)
-            lp = ecwl.Labelprediction()
-            lp.trip_id = trip.get_id()
-            lp.algorithm_id = algorithm_id
-            lp.prediction = prediction
-            lp.start_ts = trip["data"]["start_ts"]
-            lp.end_ts = trip["data"]["end_ts"]
-            self.ts.insert_data(self.user_id, "inference/labels", lp)
-            predictions.append(lp)
-        return predictions
+            # Each algorithm is called once with the whole batch of trips
+            prediction_list = algorithm_fn(user_id, trip_list)
+            for trip, prediction, trip_predictions in zip(trip_list, prediction_list, predictions_list):
+                lp = ecwl.Labelprediction()
+                lp.algorithm_id = algorithm_id
+                lp.trip_id = trip.get_id()
+                lp.prediction = prediction
+                lp.start_ts = trip["data"]["start_ts"]
+                lp.end_ts = trip["data"]["end_ts"]
+                self.ts.insert_data(self.user_id, "inference/labels", lp)
+                trip_predictions.append(lp)
+        return predictions_list
 
     # Combine all our predictions into a single ensemble prediction.
     # As a placeholder, we just take the first prediction.
     # TODO: implement a real combination algorithm.
-    def compute_and_save_ensemble(self, trip, predictions):
-        il = ecwl.Labelprediction()
-        il.trip_id = trip.get_id()
-        il.start_ts = trip["data"]["start_ts"]
-        il.end_ts = trip["data"]["end_ts"]
-        (il.algorithm_id, il.prediction) = ensemble(trip, predictions)
-        self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
-        return il
+    def compute_and_save_ensemble(self, trip_list, predictions_list):
+        il_list = []
+        # predictions_list[i] is the list of per-algorithm predictions for trip_list[i]
+        for trip, predictions in zip(trip_list, predictions_list):
+            il = ecwl.Labelprediction()
+            il.trip_id = trip.get_id()
+            il.start_ts = trip["data"]["start_ts"]
+            il.end_ts = trip["data"]["end_ts"]
+            (il.algorithm_id, il.prediction) = ensemble(trip, predictions)
+            self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
+            il_list.append(il)
+        return il_list
diff --git a/emission/analysis/modelling/trip_model/run_model.py b/emission/analysis/modelling/trip_model/run_model.py
index e3e2b1c4e..37605a158 100644
--- a/emission/analysis/modelling/trip_model/run_model.py
+++ b/emission/analysis/modelling/trip_model/run_model.py
@@ -97,7 +97,8 @@ def update_trip_model(
 
 
 def predict_labels_with_n(
-    trip: ecwc.Confirmedtrip,
+    user_id: UUID,
+    trip_list: List[ecwc.Confirmedtrip],
     model_type = eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
     model_storage = eamums.ModelStorage.DOCUMENT_DATABASE,
     model_config = None):
@@ -110,13 +111,21 @@ def predict_labels_with_n(
     :param model_config: optional configuration for model, for debugging purposes
     :return: a list of predictions
     """
-    user_id = trip['user_id']
+    # user_id = trip['user_id']
+    # Start timer
+    start = time.process_time()
     model = _load_stored_trip_model(user_id, model_type, model_storage, model_config)
-    if model is None:
-        return [], -1
-    else:
-        predictions, n = model.predict(trip)
-        return predictions, n
+    print(f"Inside predict_labels_n: model load time = {time.process_time() - start}")
+    # End timer
+    predictions_list = []
+    for trip in trip_list:
+        if model is None:
+            predictions_list.append(([], -1))
+            continue
+        else:
+            predictions, n = model.predict(trip)
+            predictions_list.append((predictions, n))
+    return predictions_list
 
 
 def _get_training_data(user_id: UUID, time_query: Optional[estt.TimeQuery]):