Skip to content

Commit

Permalink
Merge branch 'refactor-label-pipeline' into label-scalability
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Nov 17, 2023
2 parents d973da3 + f924776 commit d042e1b
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 43 deletions.
26 changes: 14 additions & 12 deletions emission/analysis/classification/inference/labels/inferrers.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,20 @@ def n_to_confidence_coeff(n, max_confidence=None, first_confidence=None, confide
return max_confidence-(max_confidence-first_confidence)*(1-confidence_multiplier)**(n-1) # This is the u = ... formula in the issue

# predict_two_stage_bin_cluster but with the above reduction in confidence
def predict_cluster_confidence_discounting(user_id, trip_list, max_confidence=None, first_confidence=None, confidence_multiplier=None):
    """
    Predict labels for a batch of trips, discounting each probability by a
    coefficient derived from n.

    All trips are handed to eamur.predict_labels_with_n in a single call, which
    yields one (labels, n) pair per trip.

    :param user_id: the user whose trips are being labeled; forwarded to the predictor
    :param trip_list: the trips to predict labels for
    :param max_confidence: forwarded unchanged to n_to_confidence_coeff
    :param first_confidence: forwarded unchanged to n_to_confidence_coeff
    :param confidence_multiplier: forwarded unchanged to n_to_confidence_coeff
    :return: a list with one labels entry per (labels, n) pair returned by the
        predictor, in the same order
    """
    # load application config
    model_type = eamtc.get_model_type()
    model_storage = eamtc.get_model_storage()
    labels_n_list = eamur.predict_labels_with_n(user_id, trip_list, model_type, model_storage)
    labels_list = []
    for labels, n in labels_n_list:
        if n <= 0:  # No model data or trip didn't match a cluster
            logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
            labels_list.append(labels)
            continue
        confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
        logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
        # Scale a deep copy so the entries returned by the predictor are left untouched
        discounted_labels = copy.deepcopy(labels)
        for label in discounted_labels:
            label["p"] *= confidence_coeff
        labels_list.append(discounted_labels)
    return labels_list
61 changes: 37 additions & 24 deletions emission/analysis/classification/inference/labels/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,24 @@ def run_prediction_pipeline(self, user_id, time_range):
self.ts = esta.TimeSeries.get_time_series(user_id)
self.toPredictTrips = esda.get_entries(
esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
for cleaned_trip in self.toPredictTrips:
# Create an inferred trip

cleaned_trip_list = self.toPredictTrips
inferred_trip_list = []
results_list = []
ensemble_list = []

# Create list of inferred trips
for cleaned_trip in cleaned_trip_list:
cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)

# Run the algorithms and the ensemble, store results
results = self.compute_and_save_algorithms(inferred_trip)
ensemble = self.compute_and_save_ensemble(inferred_trip, results)
inferred_trip_list.append(inferred_trip)

# Computing outside loop by passing trip_list to ensure model loads once
# Run the algorithms and the ensemble, store results
results_list = self.compute_and_save_algorithms(user_id, inferred_trip_list)
ensemble_list = self.compute_and_save_ensemble(inferred_trip_list, results_list)

for cleaned_trip, inferred_trip, ensemble in zip(cleaned_trip_list, inferred_trip_list, ensemble_list):
# Put final results into the inferred trip and store it
inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
Expand All @@ -75,28 +84,32 @@ def run_prediction_pipeline(self, user_id, time_range):
# This is where the labels for a given trip are actually predicted.
# The trips are passed in as a batch, together with the user_id, so that each algorithm
# processes the whole list in a single call; each trip object can provide other potentially useful information.
def compute_and_save_algorithms(self, user_id, trip_list):
    """
    Run every primary algorithm over the whole batch of trips and store each
    resulting prediction under "inference/labels".

    :param user_id: passed to each algorithm function along with the trip list
    :param trip_list: the trips to predict labels for
    :return: a list aligned with trip_list; element i is the list of
        Labelprediction objects (one per primary algorithm) for trip_list[i]
    """
    # One bucket per trip. The ensemble step pairs this result with trip_list
    # and expects, for each trip, the list of predictions made for that trip;
    # a single flat list would hand it one bare prediction per trip and would
    # fall out of step with trip_list once there is more than one algorithm.
    predictions_by_trip = [[] for _ in trip_list]
    for algorithm_id, algorithm_fn in primary_algorithms.items():
        # Each algorithm is called once with the full batch
        prediction_list = algorithm_fn(user_id, trip_list)
        for trip, prediction, trip_predictions in zip(trip_list, prediction_list, predictions_by_trip):
            lp = ecwl.Labelprediction()
            lp.algorithm_id = algorithm_id
            lp.trip_id = trip.get_id()
            lp.prediction = prediction
            lp.start_ts = trip["data"]["start_ts"]
            lp.end_ts = trip["data"]["end_ts"]
            self.ts.insert_data(self.user_id, "inference/labels", lp)
            trip_predictions.append(lp)
    return predictions_by_trip

# Combine all our predictions into a single ensemble prediction.
# As a placeholder, we just take the first prediction.
# TODO: implement a real combination algorithm.
def compute_and_save_ensemble(self, trip_list, predictions_list):
    """
    Combine the predictions for each trip into one ensemble prediction and
    store it under "analysis/inferred_labels".

    :param trip_list: the trips that were predicted
    :param predictions_list: the predictions to combine, paired with trip_list
        by position
    :return: a list with one ensemble Labelprediction per (trip, predictions) pair
    """
    il_list = []
    # Iterate over the `predictions_list` parameter; the previous code named an
    # undefined `prediction_list` here and raised NameError on every call.
    for trip, predictions in zip(trip_list, predictions_list):
        il = ecwl.Labelprediction()
        il.trip_id = trip.get_id()
        il.start_ts = trip["data"]["start_ts"]
        il.end_ts = trip["data"]["end_ts"]
        (il.algorithm_id, il.prediction) = ensemble(trip, predictions)
        self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
        il_list.append(il)
    return il_list
23 changes: 16 additions & 7 deletions emission/analysis/modelling/trip_model/run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def update_trip_model(


def predict_labels_with_n(
    user_id: UUID,
    trip_list: List[ecwc.Confirmedtrip],
    model_type = eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
    model_storage = eamums.ModelStorage.DOCUMENT_DATABASE,
    model_config = None):
    """
    Load the stored trip model for a user once and run it over a batch of trips.

    :param user_id: the user whose stored model should be loaded
    :param trip_list: the trips to predict labels for
    :param model_type: type of model to load
    :param model_storage: location the model is loaded from
    :param model_config: optional configuration for model, for debugging purposes
    :return: a list with one (predictions, n) tuple per trip, in the order of
        trip_list; every entry is ([], -1) when no stored model was found
    """
    # Function-scope import keeps this block self-contained
    import logging

    # Time only the model load; the model is loaded once for the whole batch
    start = time.process_time()
    model = _load_stored_trip_model(user_id, model_type, model_storage, model_config)
    logging.debug(f"Inside predict_labels_n: model load time = {time.process_time() - start}")

    if model is None:
        # No model: give every trip its own fresh empty prediction list
        return [([], -1) for _ in trip_list]

    predictions_list = []
    for trip in trip_list:
        predictions, n = model.predict(trip)
        predictions_list.append((predictions, n))
    return predictions_list


def _get_training_data(user_id: UUID, time_query: Optional[estt.TimeQuery]):
Expand Down

0 comments on commit d042e1b

Please sign in to comment.