Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store limited number of models and clear older ones #948

Merged
merged 14 commits
Feb 10, 2024
7 changes: 5 additions & 2 deletions .docker/setup_config.sh
shankari marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@ source setup/setup.sh
## 10/02 - Mukul
## - Above comments talk about manually updating cryptography to version 40
## - I have upgraded to 41.0.4 as per latest vulnerability fixes.
conda install -c conda-forge cryptography=41.0.4 wheel=0.40.0
conda install -c conda-forge cryptography=41.0.7 wheel=0.40.0

## Remove the old, unused packages to avoid tripping up the checker
rm -rf /root/miniconda-23.1.0/pkgs/cryptography-38.0.4-py39h9ce1e76_0
rm -rf /root/miniconda-23.1.0/pkgs/wheel-0.37.1-pyhd3eb1b0_0
rm -rf /root/miniconda-23.5.2/pkgs/cryptography-39.0.1-py39h9ce1e76_2
rm -rf /root/miniconda-23.5.2/pkgs/certifi-2023.5.7-py39h06a4308_0
rm -rf /root/miniconda-23.5.2/pkgs/conda-23.5.2-py39h06a4308_0/lib/python3.9/site-packages/tests/
rm -rf /root/miniconda-23.5.2/pkgs/conda-23.5.2-py39h06a4308_0/lib/python3.9/site-packages/tests
rm -rf /root/miniconda-23.5.2/pkgs/urllib3-1.26.16-py39h06a4308_0
rm -rf /root/miniconda-23.5.2/pkgs/urllib3-1.26.17-pyhd8ed1ab_0
rm -rf /root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/urllib3-1.26.17.dist-info
rm -rf /root/miniconda-23.5.2/lib/python3.9/site-packages/urllib3-1.26.16.dist-info
rm -rf /root/miniconda-23.5.2/lib/python3.9/site-packages/tests

# Clean up the conda install
conda clean -t
find /root/miniconda-*/pkgs -wholename \*info/test\* -type d | xargs rm -rf
find ~/miniconda-23.5.2 -name \*tests\* -path '*/site-packages/*' | grep ".*/site-packages/tests" | xargs rm -rf

if [ -d "webapp/www/" ]; then
cp /index.html webapp/www/index.html
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
shankari marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# python 3
FROM ubuntu:jammy-20231004
FROM ubuntu:jammy-20231211.1

MAINTAINER K. Shankari ([email protected])

Expand Down
12 changes: 9 additions & 3 deletions emission/analysis/classification/inference/labels/inferrers.py
shankari marked this conversation as resolved.
Show resolved Hide resolved
shankari marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import copy
import time
import arrow
from uuid import UUID
shankari marked this conversation as resolved.
Show resolved Hide resolved

import emission.analysis.modelling.tour_model_first_only.load_predict as lp
import emission.analysis.modelling.trip_model.run_model as eamur
Expand Down Expand Up @@ -171,14 +172,19 @@ def predict_cluster_confidence_discounting(trip_list, max_confidence=None, first
user_id_list = []
for trip in trip_list:
user_id_list.append(trip['user_id'])
assert user_id_list.count(user_id_list[0]) == len(user_id_list), "Multiple user_ids found for trip_list, expected unique user_id for all trips"
error_message = f"""
Multiple user_ids found for trip_list, expected unique user_id for all trips.
Unique user_ids count = {len(set(user_id_list))}
{set(user_id_list)}
"""
assert user_id_list.count(user_id_list[0]) == len(user_id_list), error_message
# Assertion successful, use unique user_id
user_id = user_id_list[0]

# load model
start_model_load_time = time.process_time()
model = eamur._load_stored_trip_model(user_id, model_type, model_storage)
print(f"{arrow.now()} Inside predict_labels_n: Model load time = {time.process_time() - start_model_load_time}")
logging.debug(f"{arrow.now()} Inside predict_cluster_confidence_discounting: Model load time = {time.process_time() - start_model_load_time}")

labels_n_list = eamur.predict_labels_with_n(trip_list, model)
predictions_list = []
Expand All @@ -192,4 +198,4 @@ def predict_cluster_confidence_discounting(trip_list, max_confidence=None, first
labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
predictions_list.append(labels)
return predictions_list
return predictions_list
4 changes: 2 additions & 2 deletions emission/analysis/modelling/trip_model/run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def predict_labels_with_n(
"""

predictions_list = []
print(f"{arrow.now()} Inside predict_labels_n: Predicting...")
logging.debug(f"{arrow.now()} Inside predict_labels_n: Predicting...")
start_predict_time = time.process_time()
for trip in trip_list:
if model is None:
Expand All @@ -118,7 +118,7 @@ def predict_labels_with_n(
else:
predictions, n = model.predict(trip)
predictions_list.append((predictions, n))
print(f"{arrow.now()} Inside predict_labels_n: Predictions complete for trip_list in time = {time.process_time() - start_predict_time}")
logging.debug(f"{arrow.now()} Inside predict_labels_n: Predictions complete for trip_list in time = {time.process_time() - start_predict_time}")
return predictions_list


Expand Down
6 changes: 6 additions & 0 deletions emission/storage/modifiable/abstract_model_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ def get_current_model(self, key:str) -> Optional[Dict]:
: return: the most recent database entry for this key
"""
pass

def trim_model_entries(self, key:str):
    """
    Remove older stored model entries for the given key, enforcing whatever
    per-user retention limit the concrete storage implementation defines.
    Abstract hook; concrete subclasses (e.g. BuiltinModelStorage) override it.

    :param key: the metadata key for the entries, used to identify the model type
    """
    pass
49 changes: 48 additions & 1 deletion emission/storage/modifiable/builtin_model_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

import emission.core.get_database as edb
import emission.storage.modifiable.abstract_model_storage as esma
import emission.storage.decorations.analysis_timeseries_queries as esda
shankari marked this conversation as resolved.
Show resolved Hide resolved

import emission.core.wrapper.entry as ecwe
import emission.core.wrapper.wrapperbase as ecwb

class BuiltinModelStorage(esma.ModelStorage):
# TODO: Discuss how to decide on model_count limit
K_MODEL_COUNT = 10

def __init__(self, user_id):
super(BuiltinModelStorage, self).__init__(user_id)
self.key_query = lambda key: {"metadata.key": key}
Expand All @@ -23,9 +27,13 @@ def upsert_model(self, key:str, model: ecwb.WrapperBase):
"""
logging.debug("upsert_doc called with key %s" % key)
entry = ecwe.Entry.create_entry(self.user_id, key, model)
## TODO: Cleanup old/obsolete models
shankari marked this conversation as resolved.
Show resolved Hide resolved
# Cleaning up older models, before inserting new model
self.trim_model_entries(key)
logging.debug("Inserting entry %s into model DB" % entry)
ins_result = edb.get_model_db().insert_one(entry)
## TODO: Cleanup old/obsolete models
new_model_count = edb.get_model_db().count_documents({"user_id": self.user_id})
logging.debug("New model count for user %s = %s" % (self.user_id, new_model_count))
shankari marked this conversation as resolved.
Show resolved Hide resolved
return ins_result.inserted_id

def get_current_model(self, key:str) -> Optional[Dict]:
Expand All @@ -47,4 +55,43 @@ def get_current_model(self, key:str) -> Optional[Dict]:
first_entry = result_list[0]
del first_entry["_id"]
return first_entry

def get_model_limit(self) -> int:
    """
    Return the maximum number of model entries retained per user,
    i.e. the class constant K_MODEL_COUNT.
    """
    return self.K_MODEL_COUNT

def trim_model_entries(self, key:str):
    """
    Delete older model entries for this user and metadata key so that, after
    the caller inserts one new model, at most K_MODEL_COUNT entries remain.

    :param key: the metadata key for the entries, used to identify the model type

    This function is called inside the model insertion function just before the
    model is inserted, to ensure older models are removed before inserting newer ones.

    The flow of model insertion function calls is:
    eamur.update_trip_model() -> eamums.save_model() -> esma.upsert_model() -> esma.trim_model_entries()
    """
    find_query = {"user_id": self.user_id, "metadata.key": key}
    result_it = edb.get_model_db().find(find_query).sort("metadata.write_ts", -1)
    result_list = list(result_it)

    # Count entries for this specific (user, key) pair. Counting by user_id
    # alone (as before) could report >= K_MODEL_COUNT while the per-key
    # result_list holds fewer than K entries, making the limit index below
    # raise an IndexError for users with models under multiple keys.
    current_model_count = len(result_list)
    logging.debug("Before trimming, model count for user %s = %s" % (self.user_id, current_model_count))

    if current_model_count >= self.K_MODEL_COUNT:
        # write_ts of the Kth most recent entry: delete everything at or
        # before it, leaving K-1 entries so the upcoming insert lands at
        # exactly K_MODEL_COUNT.
        write_ts_limit = result_list[self.K_MODEL_COUNT - 1]['metadata']['write_ts']
        logging.debug(f"Write ts limit = {write_ts_limit}")

        filter_clause = {
            "user_id" : self.user_id,
            "metadata.key" : key,
            "metadata.write_ts" : { "$lte" : write_ts_limit }
        }

        delete_result = edb.get_model_db().delete_many(filter_clause)

        if delete_result.deleted_count > 0:
            logging.debug(f"{delete_result.deleted_count} documents deleted successfully\n")
        else:
            logging.debug("No documents found or none deleted\n")

    new_model_count = edb.get_model_db().count_documents(find_query)
    logging.debug("After trimming, model count for user %s = %s" % (self.user_id, new_model_count))
shankari marked this conversation as resolved.
Show resolved Hide resolved
139 changes: 139 additions & 0 deletions emission/tests/storageTests/TestModelStorage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
# Standard imports
from future import standard_library
standard_library.install_aliases()
from builtins import *
import unittest
import datetime as pydt
import logging
import json
import pymongo
import uuid

# Our imports
import emission.core.get_database as edb
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.analysis.modelling.trip_model.model_storage as eamums
import emission.analysis.modelling.trip_model.model_type as eamumt
import emission.analysis.modelling.trip_model.run_model as eamur
import emission.storage.timeseries.abstract_timeseries as esta
import emission.tests.modellingTests.modellingTestAssets as etmm
from emission.storage.modifiable.builtin_model_storage import BuiltinModelStorage as esmb

# Test imports
import emission.tests.common as etc

class TestModelStorage(unittest.TestCase):
    '''
    Verifies that the model store trims older entries. The fixture setup and
    the model-building loop are adapted from the mock-trip machinery in
    emission.tests.modellingTests.TestRunGreedyModel.py
    '''
    def setUp(self):
        logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
            level=logging.DEBUG)

        # configuration for randomly-generated test data
        user_id = 'TestRunGreedyModel-TestData'
        self.user_id = user_id
        self.origin = (-105.1705977, 39.7402654,)
        self.destination = (-105.1755606, 39.7673075)
        self.min_trips = 14
        self.total_trips = 100
        # bins must have at least self.min_trips similar trips by default
        self.clustered_trips = 33
        # let's make a few that don't have a label, but invariant
        # $clustered_trips * $has_label_percent > self.min_trips
        # must be correct or else this test could fail under some random test cases.
        self.has_label_percent = 0.9

        # for a negative test, below
        self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl'

        # test data can be saved between test invocations, check if data exists before generating
        ts = esta.TimeSeries.get_time_series(user_id)
        existing_trips = list(ts.find_entries(["analysis/confirmed_trip"]))
        if existing_trips:
            return

        # generate test data for the database
        logging.debug(f"inserting mock Confirmedtrips into database")

        # generate labels with a known sample weight that we can rely on in the test
        label_data = {
            "mode_confirm": ['ebike', 'bike'],
            "purpose_confirm": ['happy-hour', 'dog-park'],
            "replaced_mode": ['walk'],
            "mode_weights": [0.9, 0.1],
            "purpose_weights": [0.1, 0.9]
        }

        mock_trips = etmm.generate_mock_trips(
            user_id=user_id,
            trips=self.total_trips,
            origin=self.origin,
            destination=self.destination,
            trip_part='od',
            label_data=label_data,
            within_threshold=self.clustered_trips,
            threshold=0.004,  # ~400m
            has_label_p=self.has_label_percent
        )

        ts.bulk_insert(mock_trips)

        # confirm data write did not fail
        written = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None)
        if len(written) != self.total_trips:
            logging.debug(f'test invariant failed after generating test data')
            self.fail()
        logging.debug(f'found {self.total_trips} trips in database')

    def tearDown(self):
        # drop everything this test created, across all three collections
        for collection in (edb.get_analysis_timeseries_db(),
                           edb.get_model_db(),
                           edb.get_pipeline_state_db()):
            collection.delete_many({'user_id': self.user_id})

    def testTrimModelEntries(self):
        """
        Builds and inserts 20 models — more than the K_MODEL_COUNT (= 10)
        limit defined in emission.storage.modifiable.builtin_model_storage —
        and checks after each insert that the stored count never exceeds
        that limit.

        Model creation goes through the high-level training API
        run_model.py:update_trip_model(), using the default greedy
        similarity binning model for clustering.
        """

        # pass along debug model configuration
        greedy_model_config = {
            "metric": "od_similarity",
            "similarity_threshold_meters": 500,
            "apply_cutoff": False,
            "clustering_way": 'origin-destination',
            "incremental_evaluation": False
        }

        logging.debug(f'(TRAIN) creating a model based on trips in database')
        for round_num in range(20):
            logging.debug(f"Creating dummy model no. {round_num}")
            eamur.update_trip_model(
                user_id=self.user_id,
                model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
                model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
                min_trips=self.min_trips,
                model_config=greedy_model_config
            )
            stored_count = edb.get_model_db().count_documents({"user_id": self.user_id})
            # count grows by one per insert until it saturates at the limit
            self.assertEqual(stored_count, min(round_num + 1, esmb.K_MODEL_COUNT))

if __name__ == '__main__':
    # `emission.tests.common` is already imported at module level as `etc`;
    # the previous local re-import here was redundant and has been removed.
    etc.configLogging()
    unittest.main()
Loading