From 7ae93dd836f83311f817a3ae6c3de2e723e15ba5 Mon Sep 17 00:00:00 2001
From: Shankari
Date: Fri, 3 Jan 2025 21:50:48 -0800
Subject: [PATCH] Munge incoming entries to avoid keys with dots

Around Dec 21st 2024, it looks like firebase changed the format of their push
notifications to add some metadata to the `additionalData` field. This
metadata has keys with dots. Since we also use the `additionalData` field to
pass in the survey or popup message for custom push notifications, we store
the entire `additionalData` object in the notification stat.

When this notification stat is pushed up to the server, it cannot be stored in
the database, since MongoDB/DocumentDB do not support keys with dots.
https://stackoverflow.com/questions/66369545/documentdb-updateone-fails-with-163-name-is-not-valid-for-storage

While trying to save the entry, we get the following error:

```
Traceback (most recent call last):
  File "/usr/src/app/emission/net/api/bottle.py", line 997, in _handle
    out = route.call(**args)
  File "/usr/src/app/emission/net/api/bottle.py", line 1998, in wrapper
    rv = callback(*a, **ka)
  File "/usr/src/app/emission/net/api/cfc_webapp.py", line 249, in putIntoCache
    return usercache.sync_phone_to_server(user_uuid, from_phone)
  File "/usr/src/app/emission/net/api/usercache.py", line 54, in sync_phone_to_server
    result = usercache_db.update_one(update_query,
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/collection.py", line 1041, in update_one
    self._update_retryable(
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/collection.py", line 836, in _update_retryable
    return self.__database.client._retryable_write(
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1476, in _retryable_write
    return self._retry_with_session(retryable, func, s, None)
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1349, in _retry_with_session
    return self._retry_internal(retryable, func, session, bulk)
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/_csot.py", line 105, in csot_wrapper
    return func(self, *args, **kwargs)
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1390, in _retry_internal
    return func(session, sock_info, retryable)
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/collection.py", line 817, in _update
    return self._update(
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/collection.py", line 782, in _update
    _check_write_command_response(result)
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/helpers.py", line 217, in _check_write_command_response
    _raise_last_write_error(write_errors)
  File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pymongo/helpers.py", line 190, in _raise_last_write_error
    raise WriteError(error.get("errmsg"), error.get("code"), error)
pymongo.errors.WriteError: Name is not valid for storage, full error: {'index': 0, 'code': 163, 'errmsg': 'Name is not valid for storage'}
```

This is bad because the error interrupts the processing of the incoming data
and causes the `/usercache/put` call to fail. The phone keeps retrying the
upload and keeps failing, so the pipeline never makes progress, and deployers
are not able to see newly processed data in their admin dashboards.
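For reference, the `additionalData` blob on these notifications now looks
roughly like this (keys and values copied from the test fixture added below);
`google.c.sender.id`, `google.c.fid` and `gcm.message_id` are the dotted keys
that the database rejects:

```
{"additionalData": {"google.c.sender.id": "766801492494",
                    "google.c.fid": "cW3YWb11wds",
                    "gcm.message_id": "1735934360053131",
                    "notId": "1735934357036293",
                    "payload": 1735934357036293,
                    "content-available": 1,
                    "coldstart": False,
                    "foreground": False}}
```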
To fix this, and to make the ingestion code more robust in general, we check
the incoming data for keys with dots and munge them (replacing `.` with `_`).
This fixes the immediate failure, and also ensures that we don't hit the same
error if other dotted keys show up in incoming data in the future.

Testing done:
- Added a new unit test that invokes the function directly
- Added a new integration test that creates entries and calls
  `sync_phone_to_server` on them

Both tests pass
---
 emission/net/api/usercache.py                 | 17 +++++
 .../TestBuiltinUserCacheHandlerInput.py       | 72 +++++++++++++++++++
 2 files changed, 89 insertions(+)

diff --git a/emission/net/api/usercache.py b/emission/net/api/usercache.py
index b2066b243..c16e62acc 100644
--- a/emission/net/api/usercache.py
+++ b/emission/net/api/usercache.py
@@ -29,6 +29,19 @@ def sync_server_to_phone(uuid):
     # logging.debug("retrievedData = %s" % retrievedData)
     return retrievedData
 
+def _remove_dots(entry_doc):
+    for key in entry_doc:
+        # print(f"Checking {key=}")
+        if isinstance(entry_doc[key], dict):
+            # print(f"Found dict for {key=}, recursing")
+            _remove_dots(entry_doc[key])
+        if '.' in key:
+            munged_key = key.replace(".", "_")
+            logging.info(f"Found {key=} with dot, munged to {munged_key=}")
+            # Get and delete in one swoop
+            # https://stackoverflow.com/a/11277439
+            entry_doc[munged_key] = entry_doc.pop(key, None)
+
 def sync_phone_to_server(uuid, data_from_phone):
     """
     Puts the blob from the phone into the cache
@@ -44,6 +57,10 @@ def sync_phone_to_server(uuid, data_from_phone):
 
         if "ts" in data["data"] and ecc.isMillisecs(data["data"]["ts"]):
             data["data"]["ts"] = old_div(float(data["data"]["ts"]), 1000)
+
+        # mongodb/documentDB don't support field names with `.`
+        # let's convert them all to `_`
+        _remove_dots(data)
 
         # logging.debug("After updating with UUId, we get %s" % data)
         document = {'$set': data}
diff --git a/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py b/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py
index 3d024be06..8c4c82ad7 100644
--- a/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py
+++ b/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py
@@ -14,6 +14,7 @@
 import uuid
 import attrdict as ad
 import time
+import copy
 import geojson as gj # This change should be removed in the next server update, by which time hopefully the new geojson version will incorporate the long-term fix for their default precision
 # See - jazzband/geojson#177
@@ -273,6 +274,77 @@ def testTwoLongTermCalls(self):
         self.assertEqual(edb.get_timeseries_db().estimated_document_count(), 120)
         self.assertEqual(edb.get_timeseries_error_db().estimated_document_count(), 0)
 
+    def testRemoteDots(self):
+        test_template = {"ts":1735934360.256,
+                         "client_app_version":"1.9.6",
+                         "name":"open_notification",
+                         "client_os_version":"15.5",
+                         "reading":{
+                            "additionalData":{
+                                "google.c.sender.id":"766801492494",
+                                "coldstart":False,
+                                "notId":"1735934357036293",
+                                "payload":1735934357036293,
+                                "content-available":1,
+                                "foreground":False,
+                                "google.c.fid":"cW3YWb11wds",
+                                "gcm.message_id":"1735934360053131"}}}
+        test_1 = copy.copy(test_template)
+        self.assertEqual(len(test_1["reading"]["additionalData"]), 8)
+        self.assertIn("google.c.sender.id",
+            test_1["reading"]["additionalData"])
+        self.assertIn("google.c.fid",
+            test_1["reading"]["additionalData"])
+        self.assertIn("gcm.message_id",
+            test_1["reading"]["additionalData"])
+        mauc._remove_dots(test_1)
+        self.assertEqual(len(test_1["reading"]["additionalData"]), 8)
+        self.assertIn("google_c_sender_id",
+            test_1["reading"]["additionalData"])
+        self.assertIn("google_c_fid",
+            test_1["reading"]["additionalData"])
+        self.assertIn("gcm_message_id",
+            test_1["reading"]["additionalData"])
+        self.assertNotIn("google.c.sender.id",
+            test_1["reading"]["additionalData"])
+        self.assertNotIn("google.c.fid",
+            test_1["reading"]["additionalData"])
+        self.assertNotIn("gcm.message_id",
+            test_1["reading"]["additionalData"])
+
+        metadata_template = {'plugin': 'none',
+                             'write_ts': self.curr_ts - 25,
+                             'time_zone': u'America/Los_Angeles',
+                             'platform': u'ios',
+                             'key': u'stats/client_time',
+                             'read_ts': self.curr_ts - 27,
+                             'type': u'message'}
+
+        # there are 30 entries in the setup function
+        self.assertEqual(len(self.uc1.getMessage()), 30)
+
+        three_entries_with_dots = []
+        for i in range(3):
+            curr_md = copy.copy(metadata_template)
+            curr_md['write_ts'] = self.curr_ts - 25 + i
+            three_entries_with_dots.append({
+                'user_id': self.testUserUUID1,
+                'data': copy.copy(test_template),
+                'metadata': curr_md})
+
+        print(f"AFTER {[e.get('metadata', None) for e in three_entries_with_dots]}")
+
+        mauc.sync_phone_to_server(self.testUserUUID1, three_entries_with_dots)
+        # we have munged, so these new entries should also be saved
+        # and we should have 33 entries in the usercache
+        self.assertEqual(len(self.uc1.getMessage()), 33)
+        self.assertEqual(len(list(self.ts1.find_entries())), 0)
+        enuah.UserCacheHandler.getUserCacheHandler(self.testUserUUID1).moveToLongTerm()
+        # since they were munged before saving into the usercache,
+        # there should be no errors while copying to the timeseries
+        self.assertEqual(len(self.uc1.getMessage()), 0)
+        self.assertEqual(len(list(self.ts1.find_entries())), 33)
+
 if __name__ == '__main__':
     import emission.tests.common as etc