From cce16ee18d5e4470d626d71e2027b30a2b6ed18b Mon Sep 17 00:00:00 2001 From: Will Baker Date: Mon, 4 Nov 2024 13:10:29 -0500 Subject: [PATCH] source-hubspot-native: search API breaks out of cycles with more than 10k records There is an undocumented failure of the search API when trying to page beyond 10,000 results. It's also possible that there are more than 10,000 records modified at the same time. Previously the only way out of this was to re-backfill the binding. This adds a cycle-breaker routine that walks ids in ascending order that were all modified at the exact same time to avoid looping forever. --- .../source_hubspot_native/api.py | 109 +++++++++++++++++- 1 file changed, 104 insertions(+), 5 deletions(-) diff --git a/source-hubspot-native/source_hubspot_native/api.py b/source-hubspot-native/source_hubspot_native/api.py index c0db2e71e..aa1d15b3f 100644 --- a/source-hubspot-native/source_hubspot_native/api.py +++ b/source-hubspot-native/source_hubspot_native/api.py @@ -485,10 +485,8 @@ async def fetch_search_objects( Multiple records can have the same "last modified" property value, and indeed large runs of these may have the same value. So a "new" search will - inevitably grab some records again, and deduplication is handled here. - Technically it is possible for this strategy to not work at all if there are - more than 10,000 records with the same modification time, but we'll cross - our fingers and hope that is impossible in practice. + inevitably grab some records again, and deduplication is handled here via + the set. ''' url = f"{HUB}/crm/v3/objects/{object_name}/search" @@ -560,6 +558,26 @@ async def fetch_search_objects( # the search API, so a new search must be initiated to complete the # request. cursor = None + + if since == max_updated: + log.info( + "cycle detected for lastmodifieddate, fetching all ids for records modified at that instant", + {"object_name": object_name, "instant": since}, + ) + output_items.update(await fetch_search_objects_modified_at( + object_name, log, http, max_updated, last_modified_property_name + )) + + # HubSpot APIs use millisecond resolution, so move the time + # cursor forward by that minimum amount now that we know we have + # all of the records modified at the common `max_updated` time. + max_updated = max_updated + timedelta(milliseconds=1) + + if until and max_updated > until: + # Unlikely edge case, but this would otherwise result in an + # error from the API. + break + since = max_updated log.info( "initiating a new search to satisfy requested time range", @@ -573,12 +591,93 @@ async def fetch_search_objects( }, ) - # Sort newest to oldest to match the convention of most of "fetch ID" # functions. return sorted(list(output_items), reverse=True), None +async def fetch_search_objects_modified_at( + object_name: str, + log: Logger, + http: HTTPSession, + modified: datetime, + last_modified_property_name: str = "hs_lastmodifieddate" +) -> set[tuple[datetime, str]]: + ''' + Fetch all of the ids of the given object that were modified at the given + time. Used exclusively for breaking out of cycles in the search API + resulting from more than 10,000 records being modified at the same time, + which is unfortunately a thing that can happen. + + To simplify the pagination strategy, the actual `paging` result isn't used + at all other than to see when we have reached the end, and the search query + just always asks for ids larger than what it had previously seen. + ''' + + url = f"{HUB}/crm/v3/objects/{object_name}/search" + limit = 200 + output_items: set[tuple[datetime, str]] = set() + id_cursor: int | None = None + round = 0 + + while True: + filters: list[dict[str, Any]] = [{ + "propertyName": last_modified_property_name, + "operator": "EQ", + "value": modified.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + }] + + if id_cursor: + filters.append({ + "propertyName": "hs_object_id", + "operator": "GT", + "value": id_cursor, + }) + + input = { + "filters": filters, + "sorts": [ + { + "propertyName": "hs_object_id", + "direction": "ASCENDING" + } + ], + "limit": limit, + } + + # Log every 10,000 returned records, since there are 200 per page. + if round % 50 == 0: + log.info( + "fetching ids for records modified at instant", + { + "object_name": object_name, + "instant": modified, + "count": len(output_items), + "remaining": result.total, + } + ) + + result: SearchPageResult[CustomObjectSearchResult] = SearchPageResult[CustomObjectSearchResult].model_validate_json( + await http.request(log, url, method="POST", json=input) + ) + + for r in result.results: + if id_cursor and r.id <= id_cursor: + # This should _really_ never happen, but HubSpot is weird so if + # it does I want to know about it and will come back later and + # figure it out. + raise Exception(f"unexpected id order: {r.id} <= {id_cursor}") + id_cursor = r.id + output_items.add((r.properties.hs_lastmodifieddate, str(r.id))) + + if not result.paging: + break + + round += 1 + + return output_items + + def fetch_recent_custom_objects( object_name: str, log: Logger, http: HTTPSession, since: datetime, until: datetime | None ) -> AsyncGenerator[tuple[datetime, str, CustomObject], None]: