diff --git a/conf/analysis/trip_model.conf.json.sample b/conf/analysis/trip_model.conf.json.sample index 845e67a6a..4851be5d6 100644 --- a/conf/analysis/trip_model.conf.json.sample +++ b/conf/analysis/trip_model.conf.json.sample @@ -1,5 +1,5 @@ { - "model_type": "greedy", + "model_type": "forest", "model_storage": "document_database", "minimum_trips": 14, "model_parameters": { @@ -8,6 +8,24 @@ "similarity_threshold_meters": 500, "apply_cutoff": false, "incremental_evaluation": false + }, + "forest": { + "loc_feature" : "coordinates", + "radius": 100, + "size_thresh":1, + "purity_thresh":1.0, + "gamma":0.05, + "C":1, + "n_estimators":100, + "criterion":"gini", + "max_depth":null, + "min_samples_split":2, + "min_samples_leaf":1, + "max_features":"sqrt", + "bootstrap":true, + "random_state":42, + "use_start_clusters":false, + "use_trip_clusters":true } } } \ No newline at end of file diff --git a/emission/analysis/modelling/trip_model/clustering.py b/emission/analysis/modelling/trip_model/clustering.py index d3924f32a..33f770e4e 100644 --- a/emission/analysis/modelling/trip_model/clustering.py +++ b/emission/analysis/modelling/trip_model/clustering.py @@ -6,12 +6,12 @@ import logging # import clustering algorithms -import sklearn.metrics.pairwise as smp -import sklearn.cluster as sc -from sklearn import metrics -from sklearn import svm -from sklearn.pipeline import make_pipeline -from sklearn.preprocessing import StandardScaler +import sklearn.metrics.pairwise as sklmp +import sklearn.cluster as sklc +import sklearn.metrics.cluster as sklmc +import sklearn.svm as skls +import sklearn.pipeline as sklpl +import sklearn.preprocessing as sklpp # our imports # NOTE: this requires changing the branch of e-mission-server to @@ -102,7 +102,7 @@ def add_loc_clusters( dist_matrix_meters = get_distance_matrix(loc_df, loc_type) for r in radii: - model = sc.DBSCAN(r, metric="precomputed", + model = sklc.DBSCAN(r, metric="precomputed", 
min_samples=min_samples).fit(dist_matrix_meters) labels = model.labels_ # print(model.n_features_in_) @@ -150,7 +150,7 @@ def add_loc_clusters( dist_matrix_meters = get_distance_matrix(loc_df, loc_type) for r in radii: - labels = sc.OPTICS( + labels = sklc.OPTICS( min_samples=optics_min_samples, max_eps=r, xi=optics_xi, @@ -178,7 +178,7 @@ def add_loc_clusters( dist_matrix_meters = get_distance_matrix(p_loc_df, loc_type) for r in radii: - labels = sc.DBSCAN( + labels = sklc.DBSCAN( r, metric="precomputed", min_samples=min_samples).fit(dist_matrix_meters).labels_ @@ -231,7 +231,7 @@ def add_loc_clusters( # what the bandwidth roughly corresponds to in the real world/make # the value a little more interpretable. LATLON_TO_M = 1 / 111139 - labels = sc.MeanShift( + labels = sklc.MeanShift( bandwidth=LATLON_TO_M * r, min_bin_freq=min_samples, cluster_all=False, @@ -325,9 +325,9 @@ def add_loc_SVM(loc_df, ]] y_train = labeled_points_in_cluster.purpose_confirm.to_list() - labels = make_pipeline( - StandardScaler(), - svm.SVC( + labels = sklpl.make_pipeline( + sklpp.StandardScaler(), + skls.SVC( kernel='rbf', gamma=svm_gamma, C=svm_C, @@ -381,7 +381,7 @@ def get_distance_matrix(loc_df, loc_type): radians_lat_lon = np.radians(loc_df[[loc_type + "_lat", loc_type + "_lon"]]) dist_matrix_meters = pd.DataFrame( - smp.haversine_distances(radians_lat_lon, radians_lat_lon) * + sklmp.haversine_distances(radians_lat_lon, radians_lat_lon) * EARTH_RADIUS) return dist_matrix_meters @@ -404,7 +404,7 @@ def single_cluster_purity(points_in_cluster, label_col='purpose_confirm'): def purity_score(y_true, y_pred): - contingency_matrix = metrics.cluster.contingency_matrix(y_true, y_pred) + contingency_matrix = sklmc.contingency_matrix(y_true, y_pred) purity = np.sum(np.amax(contingency_matrix, axis=0)) / np.sum(contingency_matrix) return purity diff --git a/emission/analysis/modelling/trip_model/forest_classifier.py b/emission/analysis/modelling/trip_model/forest_classifier.py new file mode 
100644 index 000000000..16eee014f --- /dev/null +++ b/emission/analysis/modelling/trip_model/forest_classifier.py @@ -0,0 +1,197 @@ +import joblib +from typing import Dict, List, Optional, Tuple +import sklearn.metrics.pairwise as smp +import emission.core.wrapper.confirmedtrip as ecwc +import logging +from io import BytesIO + +import json +import emission.analysis.modelling.trip_model.trip_model as eamuu +import emission.analysis.modelling.trip_model.config as eamtc +import emission.storage.timeseries.builtin_timeseries as estb +import emission.storage.decorations.trip_queries as esdtq +import emission.analysis.modelling.trip_model.models as eamtm + +EARTH_RADIUS = 6371000 + +class ForestClassifierModel(eamuu.TripModel): + + def __init__(self,config=None): + + if config is None: + config = eamtc.get_config_value_or_raise('model_parameters.forest') + logging.debug(f'ForestClassifier loaded model config from file') + else: + logging.debug(f'ForestClassifier using model config argument') + + random_forest_expected_keys = [ + 'loc_feature', + 'n_estimators', + 'criterion', + 'min_samples_split', + 'min_samples_leaf', + 'max_features', + 'bootstrap', + ] + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not used. Not tested either. 
+ ############################### + + # cluster_expected_keys= [ + # 'radius', + # 'size_thresh', + # 'purity_thresh', + # 'gamma', + # 'C', + # 'use_start_clusters', + # 'use_trip_clusters', + # ] + # + # if config['loc_feature'] == 'cluster': + # for k in cluster_expected_keys: + # if config.get(k) is None: + # msg = f"cluster trip model config missing expected key {k}" + # raise KeyError(msg) + ####################################### + for k in random_forest_expected_keys: + if config.get(k) is None: + msg = f"forest trip model config missing expected key {k}" + raise KeyError(msg) + self.model=eamtm.ForestClassifier(**config) + + + def fit(self,trips: List[ecwc.Confirmedtrip]): + ''' + trips : List of Entry type data + ''' + # check and raise exception if no data to fit + logging.debug(f'fit called with {len(trips)} trips') + + unlabeled = list(filter(lambda t: len(t['data']['user_input']) == 0, trips)) + if len(unlabeled) > 0: + msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}' + raise Exception(msg) + + #Convert List of Entry to dataframe + data_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",trips) + labeled_trip_df = esdtq.filter_labeled_trips(data_df) + expanded_labeled_trip_df= esdtq.expand_userinputs(labeled_trip_df) + #fit models on dataframe + self.model.fit(expanded_labeled_trip_df) + + + def predict(self, trip: List[float]) -> Tuple[List[Dict], int]: + ''' + trip : A single trip whose mode, purpose and replaced mode are required + returns. 
+ ''' + + #check if there is no trip to predict + logging.debug(f"forest classifier predict called with {len(trip)} trips") + if len(trip) == 0: + msg = f'model.predict cannot be called with an empty trip' + raise Exception(msg) + # CONVERT TRIP TO dataFrame + test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",[trip]) + predcitions_df= self.model.predict(test_df) + + # the predictions_df currently holds the highest probable options + # individually in all three categories. the predictions_df are in the form + # + # purpose_pred | purpose_proba | mode_pred | mode_proba | replaced_pred | replaced_proba + # dog-park | 1.0 | e-bike | 0.99 | walk | 1.1 + # + # + # However, to keep the trip model general, the forest model is expected to return + # + #PREDICTIONS [ {'labels': {'mode_confirm': 'e-bike', 'replaced_mode': 'walk', 'purpose_confirm': 'dog-park'}, + # 'p': ( Currently average of the 3 probabilities)}] + labels= { + 'mode_confirm': predcitions_df['mode_pred'].iloc[0], + 'replaced_mode' : predcitions_df['replaced_pred'].iloc[0], + 'purpose_confirm' : predcitions_df['purpose_pred'].iloc[0] + } + + avg_proba = predcitions_df[['purpose_proba','mode_proba','replaced_proba']].mean(axis=1).iloc[0] + predictions =[{ + 'labels' : labels, + 'p' : avg_proba + }] + return predictions, len(predictions) + + def to_dict(self): + """ + Convert the model to a dictionary suitable for storage. + """ + data={} + attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df'] + + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not used. Not tested either. 
+ ############################### + # if self.model.loc_feature == 'cluster': + # ## confirm this includes all the extra encoders/models + # attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) + + for attribute_name in attr: + if not hasattr(self.model,attribute_name): + raise ValueError(f"Attribute {attribute_name} not found in the model") + + buffer=BytesIO() + try: + joblib.dump(getattr(self.model,attribute_name),buffer) + except Exception as e: + raise RuntimeError(f"Error serializing { attribute_name}: {str(e)}") + buffer.seek(0) + data[attribute_name]=buffer.getvalue() + + return data + + def from_dict(self,model: Dict): + """ + Load the model from a dictionary. + """ + attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df'] + + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not used. Not tested either. 
+ ############################### + # if self.model.loc_feature == 'cluster': + # ## TODO : confirm this includes all the extra encoders/models + # attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) + for attribute_name in attr: + if attribute_name not in model: + raise ValueError(f"Attribute {attribute_name} missing in the model") + try: + buffer = BytesIO(model[attribute_name]) + setattr(self.model,attribute_name, joblib.load(buffer)) + except Exception as e: + raise RuntimeError(f"Error deserializing { attribute_name}: {str(e)}") + # If we do not wish to raise the exception after logging the error, comment the line above + + def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]: + """ + extract the relevant features for learning from a trip for this model instance + + :param trip: the trip to extract features from + :type trip: Confirmedtrip + :return: a vector containing features to predict from + :rtype: List[float] + """ + # ForestClassifier class in models.py file handles features extraction. + pass + + def is_incremental(self) -> bool: + """ + whether this model requires the complete user history to build (False), + or, if only the incremental data since last execution is required (True). + + :return: if the model is incremental. the current timestamp will be recorded + in the analysis pipeline. the next call to this model will only include + trip data for trips later than the recorded timestamp. 
+ :rtype: bool + """ + pass \ No newline at end of file diff --git a/emission/analysis/modelling/trip_model/model_type.py b/emission/analysis/modelling/trip_model/model_type.py index b5e761fb0..16f27ae78 100644 --- a/emission/analysis/modelling/trip_model/model_type.py +++ b/emission/analysis/modelling/trip_model/model_type.py @@ -3,6 +3,7 @@ import emission.analysis.modelling.trip_model.trip_model as eamuu import emission.analysis.modelling.similarity.od_similarity as eamso import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamug +import emission.analysis.modelling.trip_model.forest_classifier as eamuf SIMILARITY_THRESHOLD_METERS=500 @@ -11,6 +12,7 @@ class ModelType(Enum): # ENUM_NAME_CAPS = 'SHORTHAND_NAME_CAPS' GREEDY_SIMILARITY_BINNING = 'GREEDY' + RANDOM_FOREST_CLASSIFIER = 'FOREST' def build(self, config=None) -> eamuu.TripModel: """ @@ -24,16 +26,16 @@ def build(self, config=None) -> eamuu.TripModel: :raises KeyError: if the requested model name does not exist """ # Dict[ModelType, TripModel] - MODELS = { - ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning(config) - } + MODELS = { + ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning, + ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifierModel + } model = MODELS.get(self) if model is None: - model_names = list(lambda e: e.name, MODELS.keys()) - models = ",".join(model_names) - raise KeyError(f"ModelType {self.name} not found in factory, please add to build method") - - return model + available_models = ', '.join([ e.name for e in ModelType]) + raise KeyError(f"ModelType {self.name} not found in factory, Available models are {available_models}."\ + "Otherwise please add new model to build method") + return model(config) @classmethod def names(cls): diff --git a/emission/analysis/modelling/trip_model/models.py b/emission/analysis/modelling/trip_model/models.py index e5fc08b46..8dc487391 100644 --- a/emission/analysis/modelling/trip_model/models.py 
+++ b/emission/analysis/modelling/trip_model/models.py @@ -5,21 +5,21 @@ import copy # sklearn imports -from sklearn.pipeline import make_pipeline -from sklearn.preprocessing import StandardScaler, OneHotEncoder -from sklearn.impute import SimpleImputer -from sklearn.metrics.pairwise import haversine_distances -from sklearn.cluster import DBSCAN -from sklearn import svm -from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier -from sklearn.tree import DecisionTreeClassifier -from sklearn.exceptions import NotFittedError +import sklearn.pipeline as sklpl +import sklearn.preprocessing as sklpp +import sklearn.impute as skli +import sklearn.metrics.pairwise as sklmp +import sklearn.cluster as sklc +import sklearn.svm as skls +import sklearn.ensemble as sklen +import sklearn.tree as sklt +import sklearn.exceptions as sklex # our imports -from emission.analysis.modelling.trip_model.clustering import get_distance_matrix, single_cluster_purity +import emission.analysis.modelling.trip_model.clustering as eamtcl import emission.analysis.modelling.trip_model.data_wrangling as eamtd import emission.storage.decorations.trip_queries as esdtq -from emission.analysis.classification.inference.labels.inferrers import predict_cluster_confidence_discounting +import emission.analysis.classification.inference.labels.inferrers as eacili import emission.core.wrapper.entry as ecwe import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg import emission.core.common as ecc @@ -28,7 +28,6 @@ import emission.analysis.modelling.trip_model.run_model as eamur -import emission.analysis.modelling.trip_model.clustering as eamtc # NOTE: tour_model_extended.similarity is on the # eval-private-data-compatibility branch in e-mission-server @@ -334,7 +333,7 @@ def fit(self, unused,train_entry_list=None): # fit the bins self.sim_model= eamtg.GreedySimilarityBinning(model_config) - cleaned_trip_entry= eamtc.cleanEntryTypeData(self.train_df,train_entry_list) + 
cleaned_trip_entry= eamtcl.cleanEntryTypeData(self.train_df,train_entry_list) self.sim_model.fit(cleaned_trip_entry) labels = [int(l) for l in self.sim_model.tripLabels] @@ -526,8 +525,8 @@ def fit(self, train_df,unused=None): ######################### ### get base clusters ### ######################### - dist_matrix_meters = get_distance_matrix(self.train_df, self.loc_type) - self.base_model = DBSCAN(self.radius, + dist_matrix_meters = eamtcl.get_distance_matrix(self.train_df, self.loc_type) + self.base_model = sklc.DBSCAN(self.radius, metric="precomputed", min_samples=1).fit(dist_matrix_meters) base_clusters = self.base_model.labels_ @@ -557,7 +556,7 @@ def fit(self, train_df,unused=None): continue # only do SVM if purity is below threshold - purity = single_cluster_purity(points_in_cluster, + purity = eamtcl.single_cluster_purity(points_in_cluster, label_col='purpose_true') if purity < self.purity_thresh: X = points_in_cluster[[ @@ -565,9 +564,9 @@ def fit(self, train_df,unused=None): ]] y = points_in_cluster.purpose_true.to_list() - svm_model = make_pipeline( - StandardScaler(), - svm.SVC( + svm_model = sklpl.make_pipeline( + sklpp.StandardScaler(), + skls.SVC( kernel='rbf', gamma=self.gamma, C=self.C, @@ -655,7 +654,7 @@ def _NN_predict(self, test_df): new_loc_radians = np.radians( row[[self.loc_type + "_lat", self.loc_type + "_lon"]].to_list()) new_loc_radians = np.reshape(new_loc_radians, (1, 2)) - dist_matrix_meters = haversine_distances( + dist_matrix_meters = sklmp.haversine_distances( new_loc_radians, train_radians) * EARTH_RADIUS shortest_dist_idx = np.argmin(dist_matrix_meters) @@ -738,7 +737,7 @@ def predict_proba(self, test_df): replaced_distribs = [] for trip in test_trips: - trip_prediction = predict_cluster_confidence_discounting(trip) + trip_prediction = eacili.predict_cluster_confidence_discounting(trip) if len(trip_prediction) == 0: # model could not find cluster for the trip @@ -1259,7 +1258,7 @@ def predict_proba(self, test_df): mode_proba, 
replaced_proba = self._try_predict_proba_mode_replaced() - except NotFittedError as e: + except sklex.NotFittedError as e: # if we can't predict purpose, we can still try to predict mode and # replaced-mode without one-hot encoding the purpose @@ -1277,7 +1276,7 @@ def predict_proba(self, test_df): and replaced_pred.dtype == np.float64): # this indicates that all the predictions are np.nan so none of the # random forest classifiers were fitted - raise NotFittedError + raise sklex.NotFittedError # TODO: move this to a Mixin for cluster-based predictors and use the # 'cluster' column of the proba_df outputs @@ -1379,7 +1378,7 @@ class probabilities for mode and replaced-mode respectively [self.X_test_for_mode, onehot_mode_df], axis=1) replaced_proba = self._try_predict_proba_replaced() - except NotFittedError as e: + except sklex.NotFittedError as e: mode_proba_raw = np.full((len(self.X_test_for_mode), 1), 0) mode_proba = pd.DataFrame(mode_proba_raw, columns=[np.nan]) @@ -1409,7 +1408,7 @@ def _try_predict_proba_replaced(self): replaced_proba = pd.DataFrame( replaced_proba_raw, columns=self.replaced_predictor.classes_) - except NotFittedError as e: + except sklex.NotFittedError as e: replaced_proba_raw = np.full((len(self.X_test_for_replaced), 1), 0) replaced_proba = pd.DataFrame(replaced_proba_raw, columns=[np.nan]) @@ -1436,7 +1435,7 @@ def _clusterable(self, test_df): # haversine distance, so we have to reimplement it ourselves new_loc_radians = np.radians(row[["end_lat", "end_lon"]].to_list()) new_loc_radians = np.reshape(new_loc_radians, (1, 2)) - dist_matrix_meters = haversine_distances( + dist_matrix_meters = sklmp.haversine_distances( new_loc_radians, train_radians) * EARTH_RADIUS shortest_dist = np.min(dist_matrix_meters) @@ -1514,7 +1513,7 @@ def __init__( self.C = C self.n_estimators = n_estimators self.criterion = criterion - self.max_depth = max_depth + self.max_depth = max_depth if max_depth!= 'null' else None self.min_samples_split = min_samples_split 
self.min_samples_leaf = min_samples_leaf self.max_features = max_features @@ -1524,36 +1523,42 @@ def __init__( self.use_start_clusters = use_start_clusters self.use_trip_clusters = use_trip_clusters - if self.loc_feature == 'cluster': - # clustering algorithm to generate end clusters - self.end_cluster_model = DBSCANSVMCluster( - loc_type='end', - radius=self.radius, - size_thresh=self.size_thresh, - purity_thresh=self.purity_thresh, - gamma=self.gamma, - C=self.C) - - if self.use_start_clusters or self.use_trip_clusters: - # clustering algorithm to generate start clusters - self.start_cluster_model = DBSCANSVMCluster( - loc_type='start', - radius=self.radius, - size_thresh=self.size_thresh, - purity_thresh=self.purity_thresh, - gamma=self.gamma, - C=self.C) - - if self.use_trip_clusters: - # helper class to generate trip-level clusters - self.trip_grouper = TripGrouper( - start_cluster_col='start_cluster_idx', - end_cluster_col='end_cluster_idx') - - # wrapper class to generate one-hot encodings for cluster indices - self.cluster_enc = OneHotWrapper(sparse=False, - handle_unknown='ignore') + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not tested. 
+ ############################### + # if self.loc_feature == 'cluster': + # # clustering algorithm to generate end clusters + # self.end_cluster_model = DBSCANSVMCluster( + # loc_type='end', + # radius=self.radius, + # size_thresh=self.size_thresh, + # purity_thresh=self.purity_thresh, + # gamma=self.gamma, + # C=self.C) + + # if self.use_start_clusters or self.use_trip_clusters: + # # clustering algorithm to generate start clusters + # self.start_cluster_model = DBSCANSVMCluster( + # loc_type='start', + # radius=self.radius, + # size_thresh=self.size_thresh, + # purity_thresh=self.purity_thresh, + # gamma=self.gamma, + # C=self.C) + + # if self.use_trip_clusters: + # # helper class to generate trip-level clusters + # self.trip_grouper = TripGrouper( + # start_cluster_col='start_cluster_idx', + # end_cluster_col='end_cluster_idx') + + # # wrapper class to generate one-hot encodings for cluster indices + # self.cluster_enc = OneHotWrapper(sparse=False, + # handle_unknown='ignore') + ############################################################################# + # wrapper class to generate one-hot encodings for purposes and modes self.purpose_enc = OneHotWrapper(impute_missing=True, sparse=False, @@ -1563,7 +1568,7 @@ def __init__( handle_unknown='error') # ensemble classifiers for each label category - self.purpose_predictor = RandomForestClassifier( + self.purpose_predictor = sklen.RandomForestClassifier( n_estimators=self.n_estimators, criterion=self.criterion, max_depth=self.max_depth, @@ -1572,7 +1577,7 @@ def __init__( max_features=self.max_features, bootstrap=self.bootstrap, random_state=self.random_state) - self.mode_predictor = RandomForestClassifier( + self.mode_predictor = sklen.RandomForestClassifier( n_estimators=self.n_estimators, criterion=self.criterion, max_depth=self.max_depth, @@ -1581,7 +1586,7 @@ def __init__( max_features=self.max_features, bootstrap=self.bootstrap, random_state=self.random_state) - self.replaced_predictor = 
RandomForestClassifier( + self.replaced_predictor = sklen.RandomForestClassifier( n_estimators=self.n_estimators, criterion=self.criterion, max_depth=self.max_depth, @@ -1836,33 +1841,33 @@ def __init__( sparse=False, handle_unknown='error') - self.purpose_predictor = AdaBoostClassifier( + self.purpose_predictor = sklen.AdaBoostClassifier( n_estimators=self.n_estimators, learning_rate=self.learning_rate, random_state=self.random_state, - base_estimator=DecisionTreeClassifier( + base_estimator=sklt.DecisionTreeClassifier( criterion=self.criterion, max_depth=self.max_depth, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf, max_features=self.max_features, random_state=self.random_state)) - self.mode_predictor = AdaBoostClassifier( + self.mode_predictor = sklen.AdaBoostClassifier( n_estimators=self.n_estimators, learning_rate=self.learning_rate, random_state=self.random_state, - base_estimator=DecisionTreeClassifier( + base_estimator=sklt.DecisionTreeClassifier( criterion=self.criterion, max_depth=self.max_depth, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf, max_features=self.max_features, random_state=self.random_state)) - self.replaced_predictor = AdaBoostClassifier( + self.replaced_predictor = sklen.AdaBoostClassifier( n_estimators=self.n_estimators, learning_rate=self.learning_rate, random_state=self.random_state, - base_estimator=DecisionTreeClassifier( + base_estimator=sklt.DecisionTreeClassifier( criterion=self.criterion, max_depth=self.max_depth, min_samples_split=self.min_samples_split, @@ -2018,13 +2023,13 @@ def __init__( ): self.impute_missing = impute_missing if self.impute_missing: - self.encoder = make_pipeline( - SimpleImputer(missing_values=np.nan, + self.encoder = sklpl.make_pipeline( + skli.SimpleImputer(missing_values=np.nan, strategy='constant', fill_value='missing'), - OneHotEncoder(sparse=False, handle_unknown=handle_unknown)) + sklpp.OneHotEncoder(sparse=False, 
handle_unknown=handle_unknown)) else: - self.encoder = OneHotEncoder(sparse=sparse, + self.encoder = sklpp.OneHotEncoder(sparse=sparse, handle_unknown=handle_unknown) def fit_transform(self, train_df, output_col_prefix=None): diff --git a/emission/analysis/modelling/trip_model/run_model.py b/emission/analysis/modelling/trip_model/run_model.py index 7356aa597..a1a48ef12 100644 --- a/emission/analysis/modelling/trip_model/run_model.py +++ b/emission/analysis/modelling/trip_model/run_model.py @@ -74,7 +74,8 @@ def update_trip_model( epq.mark_trip_model_failed(user_id) else: - # train and store the model + # train and store the model. pass only List of event and only convert + # to dataframe type data wherever required. model.fit(trips) model_data_next = model.to_dict() diff --git a/emission/tests/modellingTests/TestForestModelIntegration.py b/emission/tests/modellingTests/TestForestModelIntegration.py new file mode 100644 index 000000000..e08345f5d --- /dev/null +++ b/emission/tests/modellingTests/TestForestModelIntegration.py @@ -0,0 +1,138 @@ +import unittest +import numpy as np +import time +import logging +import bson.objectid as boi +import emission.analysis.classification.inference.labels.pipeline as eacilp +import emission.analysis.classification.inference.labels.inferrers as eacili +import emission.core.wrapper.labelprediction as ecwl +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.storage.decorations.trip_queries as esdt +import emission.storage.timeseries.timequery as estt +import emission.core.get_database as edb +import emission.tests.common as etc +import emission.pipeline.intake_stage as epi +import emission.analysis.modelling.trip_model.config as eamtc +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.tests.modellingTests.modellingTestAssets 
as etmm +import emission.storage.timeseries.abstract_timeseries as esta + +class TestForestModelIntegration(unittest.TestCase): + """ + This tests the label inference pipeline. It uses real data and placeholder inference algorithms. + Test if the forest model for label prediction is smoothly integrated with the inference pipeline. + In the initial setup, build a dummy forest model. Then run the pipeline on real example data. + Finally in the test, assert the type of label predictions expected. + The label_data dict and mock_trip_data are copied over from TestRunGreedyModel.py + """ + def setUp(self): + np.random.seed(91) + self.test_algorithms = eacilp.primary_algorithms + forest_model_config = eamtc.get_config_value_or_raise('model_parameters.forest') + etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22") ##maybe use a different file + ts = esta.TimeSeries.get_time_series(self.testUUID) + + # Generate labels with a known sample weight that we can rely on in the test + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + # Configuration values for randomly-generated test data copied over from TestRunGreedyModel.py + mock_trip_data = etmm.generate_mock_trips( + user_id=self.testUUID, + trips=100, + origin=(-105.1705977, 39.7402654), + destination=(-105.1755606, 39.7673075), + trip_part='od', + label_data=label_data, + within_threshold= 33, + threshold=0.004, # ~400m + has_label_p=0.9 + ) + + # Required for Forest model inference + for result_entry in mock_trip_data: + result_entry['data']['start_local_dt']=result_entry['metadata']['write_local_dt'] + result_entry['data']['end_local_dt']=result_entry['metadata']['write_local_dt'] + result_entry['data']['start_place']=boi.ObjectId() + result_entry['data']['end_place']=boi.ObjectId() + + split = int(len(mock_trip_data)*0.7) + mock_train_data = 
mock_trip_data[:split] + self.mock_test_data = mock_trip_data[split:] + + ts.bulk_insert(mock_train_data) + + # Build and train model + logging.debug(f'(TRAIN) creating a model based on trips in database') + eamur.update_trip_model( + user_id=self.testUUID, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=14, + model_config=forest_model_config + ) + + # Run inference pipeline + self.run_pipeline(self.test_algorithms) + time_range = estt.TimeQuery("metadata.write_ts", None, time.time()) + self.inferred_trips = esda.get_entries(esda.INFERRED_TRIP_KEY, self.testUUID, time_query=time_range) + + def tearDown(self): + etc.dropAllCollections(edb._get_current_db()) + + def run_pipeline(self, algorithms): + default_primary_algorithms = eacilp.primary_algorithms + eacilp.primary_algorithms = algorithms + epi.run_intake_pipeline_for_user(self.testUUID,skip_if_no_new_data = False) + eacilp.primary_algorithms = default_primary_algorithms + + def testForestAlgorithm(self): + ''' + Tests that forest algorithm runs successfully when called from the analysis pipeline + The tests are based on the existing tests in TestLabelInferencePipeline.py + ''' + valid_modes = ['ebike', 'bike'] + valid_purposes = ['happy-hour', 'dog-park'] + + for trip in self.inferred_trips: + entries = esdt.get_sections_for_trip("inference/labels", self.testUUID, trip.get_id()) + self.assertEqual(len(entries), len(self.test_algorithms)) + for entry in entries: + # Test 1: Check that non-empty prediction list is generated + self.assertGreater(len(entry["data"]["prediction"]), 0, "Prediction list should not be empty - model failed to generate any predictions") + + # Test 2: Check for equality of trip inferred labels and prediction value in entry + self.assertEqual(trip["data"]["inferred_labels"], entry["data"]["prediction"]) + + # Test 3: Check that prediction value in entry is equal to the prediction generated by the algorithm + 
this_algorithm = ecwl.AlgorithmTypes(entry["data"]["algorithm_id"]) + self.assertIn(this_algorithm, self.test_algorithms) + self.assertEqual(entry["data"]["prediction"], self.test_algorithms[this_algorithm]([trip])[0]) + + for singleprediction in entry["data"]["prediction"]: + # Test 4: Check that the prediction is a dictionary + self.assertIsInstance(singleprediction, dict, "should be an instance of the dictionary class") + self.assertIsInstance(singleprediction['labels'], dict, "should be an instance of the dictionary class") + + # Test 5: Check that the prediction dictionary contains the required keys + self.assertIn('mode_confirm', singleprediction['labels'].keys()) + self.assertIn('replaced_mode', singleprediction['labels'].keys()) + self.assertIn('purpose_confirm', singleprediction['labels'].keys()) + + # Test 6: Check that the prediction dictionary contains the correct values + self.assertIn(singleprediction['labels']['mode_confirm'], valid_modes) + self.assertIn(singleprediction['labels']['purpose_confirm'], valid_purposes) + +def main(): + etc.configLogging() + unittest.main() + +if __name__ == "__main__": + main() diff --git a/emission/tests/modellingTests/TestForestModelLoadandSave.py b/emission/tests/modellingTests/TestForestModelLoadandSave.py new file mode 100644 index 000000000..e92d4273a --- /dev/null +++ b/emission/tests/modellingTests/TestForestModelLoadandSave.py @@ -0,0 +1,184 @@ +from typing import ByteString +import unittest +import logging +import unittest.mock as um +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.config as eamtc +import uuid +import emission.storage.timeseries.abstract_timeseries as esta +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.storage.decorations.analysis_timeseries_queries as esda 
+import emission.core.get_database as edb +import emission.analysis.modelling.trip_model.run_model as eamtr +import emission.tests.common as etc + +class TestForestModelLoadandSave(unittest.TestCase): + """ + Tests to make sure the forest model loads and saves properly + The label_data dict and mock_trip_data are copied over from TestRunGreedyModel.py + """ + def setUp(self): + self.user_id = 'TestForestModelLoadAndSave-TestData' + self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl' + ts = esta.TimeSeries.get_time_series(self.user_id) + + # Generate labels with a known sample weight that we can rely on in the test + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + # Configuration values for randomly-generated test data copied over from TestRunGreedyModel.py + mock_trip_data = etmm.generate_mock_trips( + user_id=self.user_id, + trips=100, + origin=(-105.1705977, 39.7402654,), + destination=(-105.1755606, 39.7673075), + trip_part='od', + label_data=label_data, + within_threshold=33, + threshold=0.004, # ~400m + has_label_p=0.9 + ) + + # start_local_dt/end_local_dt are required for Forest model inference; reuse the write time + for result_entry in mock_trip_data: + result_entry['data']['start_local_dt']=result_entry['metadata']['write_local_dt'] + result_entry['data']['end_local_dt']=result_entry['metadata']['write_local_dt'] + + split = int(len(mock_trip_data)*0.7) + mock_train_data = mock_trip_data[:split] + self.mock_test_data = mock_trip_data[split:] + + ts.bulk_insert(mock_train_data) + + self.forest_model_config= eamtc.get_config_value_or_raise('model_parameters.forest') + + # Build and train model on the trips inserted above (the first 70% of the mock trips) + logging.debug(f'(TRAIN) creating a model based on trips in database') + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=14, + 
model_config=self.forest_model_config + ) + + self.model = eamur._load_stored_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=self.forest_model_config + ) + + def tearDown(self): + etc.dropAllCollections(edb._get_current_db()) + + def testForestModelPredictionsEquality(self): + """ + EqualityTest : Serialize the model with 'to_dict' and then immediately + deserialize it with 'from_dict'. After deserialization, the model should yield + the same predictions as the original + + TypePreservationTest: Ensure that the serialization and deserialization + process maintains the data types of the listed model attributes. + The types of the deserialized model's attributes and its predictions must match + those of the initial model. + """ + predictions_list = eamur.predict_labels_with_n( + trip_list = self.mock_test_data, + model=self.model + ) + + model_data=self.model.to_dict() + deserialized_model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER + deserialized_model = deserialized_model_type.build(self.forest_model_config) + deserialized_model.from_dict(model_data) + + predictions_deserialized_model_list = eamur.predict_labels_with_n( + trip_list = self.mock_test_data, + model=deserialized_model + ) + + # Test if the types are correct + for attr in ['purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df']: + deserialized_attr_value=getattr(deserialized_model.model,attr) + original_attr_value=getattr(self.model.model,attr) + # Check type preservation + self.assertIsInstance(deserialized_attr_value,type(original_attr_value), f"Type mismatch for {attr} ") + + # Test if the predictions are the same + self.assertEqual(predictions_list, predictions_deserialized_model_list, " should be equal") + + def testForestModelConsistency(self): + """ + ConsistencyTest : Verify that the serialization and deserialization process + is consistent across multiple 
executions + """ + predictions_list_model1 = eamur.predict_labels_with_n( + trip_list = self.mock_test_data, + model=self.model + ) + + model_iter2 = eamur._load_stored_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=self.forest_model_config + ) + + predictions_list_model2 = eamur.predict_labels_with_n( + trip_list = self.mock_test_data, + model=model_iter2 + ) + + self.assertEqual(predictions_list_model1, predictions_list_model2, " should be equal") + + def testSerializationDeserializationErrorHandling(self): + """ + SerializationErrorHandling : Verify that an error raised while + serialising the model with 'to_dict' surfaces as a RuntimeError. + + DeserializationErrorHandling : Verify that an error raised while + deserialising the model with 'from_dict' surfaces as a RuntimeError. + """ + # Test 1: SerializationErrorHandling + # Defining a side effect function to simulate a serialization error + def mock_dump(*args,**kwargs): + raise Exception("Serialization Error") + + # patch is used to temporarily replace joblib.dump with a + # mock function that raises an exception + # + # side_effect, which is set to mock_dump, is called instead of + # the real joblib.dump function when 'to_dict' is invoked + with um.patch('joblib.dump',side_effect=mock_dump): + with self.assertRaises(RuntimeError): + self.model.to_dict() + + # Test 2: DeserializationErrorHandling + # Defining a side effect function to simulate a deserialization error + def mock_load(*args,**kwargs): + raise Exception("Deserialization Error") + + model_data=self.model.to_dict() + deserialized_model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER + deserialized_model = deserialized_model_type.build(self.forest_model_config) + + # patch is used to temporarily replace joblib.load with a + # mock function that raises an exception + # + # side_effect, which is set to mock_load, is called instead of + # real joblib.load function when 
'from_dict' is invoked + with um.patch('joblib.load',side_effect=mock_load): + with self.assertRaises(RuntimeError): + deserialized_model.from_dict(model_data) + +if __name__ == '__main__': + etc.configLogging() + unittest.main() diff --git a/emission/tests/modellingTests/TestRunForestModel.py b/emission/tests/modellingTests/TestRunForestModel.py new file mode 100644 index 000000000..6ecad60a5 --- /dev/null +++ b/emission/tests/modellingTests/TestRunForestModel.py @@ -0,0 +1,214 @@ +import unittest +import logging + +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.models as eamtm +import emission.storage.timeseries.abstract_timeseries as esta +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.core.get_database as edb +import emission.storage.pipeline_queries as epq +import emission.core.wrapper.pipelinestate as ecwp +import emission.analysis.modelling.trip_model.forest_classifier as eamtf +import sklearn.ensemble as sklen + +class TestRunForestModel(unittest.TestCase): + """ + Tests to ensure the forest model pipeline builds and runs with zero + or more trips + """ + + def setUp(self): + """ + sets up the end-to-end run model test with mock confirmed trip data + """ + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + # configuration for randomly-generated test data + self.user_id = user_id = 'TestRunForestModel-TestData' + self.origin = (-105.1705977, 39.7402654,) + self.destination = (-105.1755606, 39.7673075) + self.min_trips = 14 + self.total_trips = 100 + self.clustered_trips = 33 # must have at least self.min_trips similar trips by default + self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant + # 
$clustered_trips * $has_label_percent > self.min_trips + # must be correct or else this test could fail under some random test cases. + + # for a negative test, below + self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl' + + # test data can be saved between test invocations, check if data exists before generating + ts = esta.TimeSeries.get_time_series(user_id) + test_data = list(ts.find_entries(["analysis/confirmed_trip"])) + if len(test_data) == 0: + # generate test data for the database + logging.debug(f"inserting mock Confirmedtrips into database") + + # generate labels with a known sample weight that we can rely on in the test + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + train = etmm.generate_mock_trips( + user_id=user_id, + trips=self.total_trips, + origin=self.origin, + destination=self.destination, + trip_part='od', + label_data=label_data, + within_threshold=self.clustered_trips, + threshold=0.004, # ~400m + has_label_p=self.has_label_percent + ) + # start_local_dt/end_local_dt are required by the forest model; reuse the write time + for entry in train: + entry['data']['start_local_dt']=entry['metadata']['write_local_dt'] + entry['data']['end_local_dt']=entry['metadata']['write_local_dt'] + + ts.bulk_insert(train) + + # confirm data write did not fail: the database must hold exactly total_trips entries + test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None) + if len(test_data) != self.total_trips: + logging.debug(f'test invariant failed after generating test data') + self.fail() + else: + logging.debug(f'found {self.total_trips} trips in database') + + def tearDown(self): + """ + clean up the test user's entries (trips, models, pipeline state) in the database + """ + edb.get_analysis_timeseries_db().delete_many({'user_id': self.user_id}) + edb.get_model_db().delete_many({'user_id': self.user_id}) + edb.get_pipeline_state_db().delete_many({'user_id': self.user_id}) + + def testBuildForestModelFromConfig(self): + 
""" + forest model takes config arguments via the constructor for testing + purposes but will load from a file in /conf/analysis/ which is tested here + """ + + built_model = eamumt.ModelType.RANDOM_FOREST_CLASSIFIER.build() + attributes={'purpose_predictor': sklen.RandomForestClassifier , + 'mode_predictor' :sklen.RandomForestClassifier, + 'replaced_predictor':sklen.RandomForestClassifier, + 'purpose_enc' : eamtm.OneHotWrapper, + 'mode_enc':eamtm.OneHotWrapper + } + self.assertIsInstance(built_model,eamtf.ForestClassifierModel) + for attr in attributes: + #logging.debug(f'{attr,attributes[attr]}') + x=getattr(built_model.model,attr) + self.assertIsInstance(x, attributes[attr]) + # success if it didn't throw + + def testTrainForestModelWithZeroTrips(self): + """ + training for a user with no trips should do nothing; in particular, + the TRIP_MODEL pipeline state must not get a current run timestamp + """ + + # pass along debug model configuration + forest_model_config= { + "loc_feature" : "coordinates", + "radius": 500, + "size_thresh":1, + "purity_thresh":1.0, + "gamma":0.05, + "C":1, + "n_estimators":100, + "criterion":"gini", + "max_depth":'null', # NOTE(review): string 'null', not None (sample config uses JSON null) - confirm the model handles it + "min_samples_split":2, + "min_samples_leaf":1, + "max_features":"sqrt", + "bootstrap":True, + "random_state":42, + "use_start_clusters":False, + "use_trip_clusters":True + } + + logging.debug(f'~~~~ do nothing ~~~~') + eamur.update_trip_model( + user_id=self.unused_user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=forest_model_config + ) + + # user had no entries so their pipeline state should not have been set + # if it was set, curr_run_ts would not be None and the assertion below would fail + stage = ecwp.PipelineStages.TRIP_MODEL + pipeline_state = epq.get_current_state(self.unused_user_id, stage) + self.assertIsNone( + pipeline_state['curr_run_ts'], + "pipeline should not have a current timestamp for the test user") + + + def 
testRoundPredictForestModel(self): + """ + trains a forest model on the trips in the database, reloads the stored model, + and checks that each prediction is non-empty and has the expected structure + """ + + forest_model_config= { + "loc_feature" : "coordinates", + "radius": 500, + "size_thresh":1, + "purity_thresh":1.0, + "gamma":0.05, + "C":1, + "n_estimators":100, + "criterion":"gini", + "max_depth":'null', # NOTE(review): string 'null', not None (sample config uses JSON null) - confirm the model handles it + "min_samples_split":2, + "min_samples_leaf":1, + "max_features":"sqrt", + "bootstrap":True, + "random_state":42, + "use_start_clusters":False, + "use_trip_clusters":True + } + + logging.debug(f'(TRAIN) creating a model based on trips in database') + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=forest_model_config + ) + + logging.debug(f'(TEST) testing prediction of stored model') + test = esda.get_entries(key="analysis/confirmed_trip", user_id=self.user_id, time_query=None) + model = eamur._load_stored_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=forest_model_config + ) + + predictions_list = eamur.predict_labels_with_n( + trip_list = test, + model=model + ) + for prediction, n in predictions_list: + [logging.debug(p) for p in sorted(prediction, key=lambda r: r['p'], reverse=True)] + self.assertNotEqual(len(prediction), 0, "Prediction list should not be empty - model failed to generate any predictions") + self.assertIn('labels',prediction[0].keys()) + self.assertIn('p',prediction[0].keys()) + self.assertIsInstance(prediction[0], dict, "should be an instance of the dictionary class") + self.assertIsInstance(prediction[0]['labels'], dict, "should be an instance of the dictionary class") + self.assertIn('mode_confirm',prediction[0]['labels'].keys()) + 
self.assertIn('replaced_mode',prediction[0]['labels'].keys()) + self.assertIn('purpose_confirm',prediction[0]['labels'].keys()) \ No newline at end of file diff --git a/emission/tests/modellingTests/modellingTestAssets.py b/emission/tests/modellingTests/modellingTestAssets.py index 252b2ad34..2bb1a958e 100644 --- a/emission/tests/modellingTests/modellingTestAssets.py +++ b/emission/tests/modellingTests/modellingTestAssets.py @@ -166,7 +166,10 @@ def build_mock_trip( "type": "Point", "coordinates": destination }, - "user_input": labels + # duration and distance (below) are necessary values for the random forest model + "user_input": labels, + "duration": end_ts-start_ts, + "distance": ecc.calDistance(origin,destination) } return ecwe.Entry.create_fake_entry(user_id, key, data, write_ts=time.time())