Skip to content

Commit

Permalink
Refactoring label inference pipeline for scalability issues
Browse files Browse the repository at this point in the history
Changes after refactoring label inference pipeline to load model only once. Will be merging with previous PR branch.
  • Loading branch information
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Nov 17, 2023
1 parent 81c4314 commit f924776
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 43 deletions.
26 changes: 14 additions & 12 deletions emission/analysis/classification/inference/labels/inferrers.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,20 @@ def n_to_confidence_coeff(n, max_confidence=None, first_confidence=None, confide
return max_confidence-(max_confidence-first_confidence)*(1-confidence_multiplier)**(n-1) # This is the u = ... formula in the issue

# predict_two_stage_bin_cluster but with the above reduction in confidence
def predict_cluster_confidence_discounting(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
def predict_cluster_confidence_discounting(user_id, trip_list, max_confidence=None, first_confidence=None, confidence_multiplier=None):
# load application config
model_type = eamtc.get_model_type()
model_storage = eamtc.get_model_storage()
labels, n = eamur.predict_labels_with_n(trip, model_type, model_storage)
if n <= 0: # No model data or trip didn't match a cluster
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
return labels

confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")

labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
return labels
labels_n_list = eamur.predict_labels_with_n(user_id, trip_list, model_type, model_storage)
labels_list = []
for labels, n in labels_n_list:
if n <= 0: # No model data or trip didn't match a cluster
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
labels_list.append(labels)
continue
confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
labels_list.append(labels)
return labels_list
61 changes: 37 additions & 24 deletions emission/analysis/classification/inference/labels/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,24 @@ def run_prediction_pipeline(self, user_id, time_range):
self.ts = esta.TimeSeries.get_time_series(user_id)
self.toPredictTrips = esda.get_entries(
esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
for cleaned_trip in self.toPredictTrips:
# Create an inferred trip

cleaned_trip_list = self.toPredictTrips
inferred_trip_list = []
results_list = []
ensemble_list = []

# Create list of inferred trips
for cleaned_trip in cleaned_trip_list:
cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)

# Run the algorithms and the ensemble, store results
results = self.compute_and_save_algorithms(inferred_trip)
ensemble = self.compute_and_save_ensemble(inferred_trip, results)
inferred_trip_list.append(inferred_trip)

# Computing outside loop by passing trip_list to ensure model loads once
# Run the algorithms and the ensemble, store results
results_list = self.compute_and_save_algorithms(user_id, inferred_trip_list)
ensemble_list = self.compute_and_save_ensemble(inferred_trip_list, results_list)

for cleaned_trip, inferred_trip, ensemble in zip(cleaned_trip_list, inferred_trip_list, ensemble_list):
# Put final results into the inferred trip and store it
inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
Expand All @@ -75,28 +84,32 @@ def run_prediction_pipeline(self, user_id, time_range):
# This is where the labels for the given trips are actually predicted.
# The user_id and the list of trips are passed in explicitly; each trip object can still
# provide other potentially useful information.
def compute_and_save_algorithms(self, trip):
def compute_and_save_algorithms(self, user_id, trip_list):
predictions = []
for algorithm_id, algorithm_fn in primary_algorithms.items():
prediction = algorithm_fn(trip)
lp = ecwl.Labelprediction()
lp.trip_id = trip.get_id()
lp.algorithm_id = algorithm_id
lp.prediction = prediction
lp.start_ts = trip["data"]["start_ts"]
lp.end_ts = trip["data"]["end_ts"]
self.ts.insert_data(self.user_id, "inference/labels", lp)
predictions.append(lp)
prediction_list = algorithm_fn(user_id, trip_list)
for trip, prediction in zip(trip_list, prediction_list):
lp = ecwl.Labelprediction()
lp.algorithm_id = algorithm_id
lp.trip_id = trip.get_id()
lp.prediction = prediction
lp.start_ts = trip["data"]["start_ts"]
lp.end_ts = trip["data"]["end_ts"]
self.ts.insert_data(self.user_id, "inference/labels", lp)
predictions.append(lp)
return predictions

# Combine all our predictions into a single ensemble prediction.
# As a placeholder, we just take the first prediction.
# TODO: implement a real combination algorithm.
def compute_and_save_ensemble(self, trip, predictions):
il = ecwl.Labelprediction()
il.trip_id = trip.get_id()
il.start_ts = trip["data"]["start_ts"]
il.end_ts = trip["data"]["end_ts"]
(il.algorithm_id, il.prediction) = ensemble(trip, predictions)
self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
return il
def compute_and_save_ensemble(self, trip_list, predictions_list):
    """Build, store and return one ensemble label prediction per trip.

    ``trip_list`` and ``predictions_list`` are walked in lockstep; for each
    pair a ``Labelprediction`` is filled in from the trip's id and start/end
    timestamps plus the ``(algorithm_id, prediction)`` returned by
    ``ensemble(trip, predictions)``, then inserted into ``self.ts`` under
    the ``"analysis/inferred_labels"`` key.

    :param trip_list: trips to compute an ensemble prediction for
    :param predictions_list: per-trip inputs forwarded to ``ensemble``,
        in the same order as ``trip_list``
    :return: the stored ``Labelprediction`` objects, in input order
    """
    # NOTE(review): each element of predictions_list is handed to ensemble()
    # unchanged; confirm the caller supplies, per trip, whatever ensemble()
    # expects (presumably the list of primary-algorithm predictions for
    # that trip) -- TODO confirm.
    il_list = []
    # Fixed: the loop previously zipped over the undefined name
    # `prediction_list`, which raised NameError on the first call.
    # zip() stops at the shorter input, so any surplus entries are ignored.
    for trip, predictions in zip(trip_list, predictions_list):
        il = ecwl.Labelprediction()
        il.trip_id = trip.get_id()
        il.start_ts = trip["data"]["start_ts"]
        il.end_ts = trip["data"]["end_ts"]
        (il.algorithm_id, il.prediction) = ensemble(trip, predictions)
        self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
        il_list.append(il)
    return il_list
23 changes: 16 additions & 7 deletions emission/analysis/modelling/trip_model/run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def update_trip_model(


def predict_labels_with_n(
trip: ecwc.Confirmedtrip,
user_id: UUID,
trip_list: List[ecwc.Confirmedtrip],
model_type = eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
model_storage = eamums.ModelStorage.DOCUMENT_DATABASE,
model_config = None):
Expand All @@ -110,13 +111,21 @@ def predict_labels_with_n(
:param model_config: optional configuration for model, for debugging purposes
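:return: a list of (predictions, n) tuples, one per trip in trip_list; every entry is ([], -1) when no stored model could be loaded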
"""
user_id = trip['user_id']
# user_id = trip['user_id']
# Start timer
start = time.process_time()
model = _load_stored_trip_model(user_id, model_type, model_storage, model_config)
if model is None:
return [], -1
else:
predictions, n = model.predict(trip)
return predictions, n
print(f"Inside predict_labels_n: model load time = {time.process_time() - start}")
# End timer
predictions_list = []
for trip in trip_list:
if model is None:
predictions_list.append(([], -1))
continue
else:
predictions, n = model.predict(trip)
predictions_list.append((predictions, n))
return predictions_list


def _get_training_data(user_id: UUID, time_query: Optional[estt.TimeQuery]):
Expand Down

0 comments on commit f924776

Please sign in to comment.