Skip to content

Commit

Permalink
source-hubspot-native: search API breaks out of cycles with more than…
Browse files Browse the repository at this point in the history
… 10k records

There is an undocumented failure of the search API when trying to page beyond
10,000 results. It's also possible that there are more than 10,000 records
modified at the same time.

Previously the only way out of this was to re-backfill the binding. This adds a
cycle-breaker routine that walks ids in ascending order that were all modified
at the exact same time to avoid looping forever.
  • Loading branch information
williamhbaker committed Nov 4, 2024
1 parent 6575dbd commit cce16ee
Showing 1 changed file with 104 additions and 5 deletions.
109 changes: 104 additions & 5 deletions source-hubspot-native/source_hubspot_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,8 @@ async def fetch_search_objects(
Multiple records can have the same "last modified" property value, and
indeed large runs of these may have the same value. So a "new" search will
inevitably grab some records again, and deduplication is handled here.
Technically it is possible for this strategy to not work at all if there are
more than 10,000 records with the same modification time, but we'll cross
our fingers and hope that is impossible in practice.
inevitably grab some records again, and deduplication is handled here via
the set.
'''

url = f"{HUB}/crm/v3/objects/{object_name}/search"
Expand Down Expand Up @@ -560,6 +558,26 @@ async def fetch_search_objects(
# the search API, so a new search must be initiated to complete the
# request.
cursor = None

if since == max_updated:
log.info(
"cycle detected for lastmodifieddate, fetching all ids for records modified at that instant",
{"object_name": object_name, "instant": since},
)
output_items.update(await fetch_search_objects_modified_at(
object_name, log, http, max_updated, last_modified_property_name
))

# HubSpot APIs use millisecond resolution, so move the time
# cursor forward by that minimum amount now that we know we have
# all of the records modified at the common `max_updated` time.
max_updated = max_updated + timedelta(milliseconds=1)

if until and max_updated > until:
# Unlikely edge case, but this would otherwise result in an
# error from the API.
break

since = max_updated
log.info(
"initiating a new search to satisfy requested time range",
Expand All @@ -573,12 +591,93 @@ async def fetch_search_objects(
},
)


# Sort newest to oldest to match the convention of most of "fetch ID"
# functions.
return sorted(list(output_items), reverse=True), None


async def fetch_search_objects_modified_at(
object_name: str,
log: Logger,
http: HTTPSession,
modified: datetime,
last_modified_property_name: str = "hs_lastmodifieddate"
) -> set[tuple[datetime, str]]:
'''
Fetch all of the ids of the given object that were modified at the given
time. Used exclusively for breaking out of cycles in the search API
resulting from more than 10,000 records being modified at the same time,
which is unfortunately a thing that can happen.
To simplify the pagination strategy, the actual `paging` result isn't used
at all other than to see when we have reached the end, and the search query
just always asks for ids larger than what it had previously seen.
'''

url = f"{HUB}/crm/v3/objects/{object_name}/search"
limit = 200
output_items: set[tuple[datetime, str]] = set()
id_cursor: int | None = None
round = 0

while True:
filters: list[dict[str, Any]] = [{
"propertyName": last_modified_property_name,
"operator": "EQ",
"value": modified.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
}]

if id_cursor:
filters.append({
"propertyName": "hs_object_id",
"operator": "GT",
"value": id_cursor,
})

input = {
"filters": filters,
"sorts": [
{
"propertyName": "hs_object_id",
"direction": "ASCENDING"
}
],
"limit": limit,
}

# Log every 10,000 returned records, since there are 200 per page.
if round % 50 == 0:
log.info(
"fetching ids for records modified at instant",
{
"object_name": object_name,
"instant": modified,
"count": len(output_items),
"remaining": result.total,
}
)

result: SearchPageResult[CustomObjectSearchResult] = SearchPageResult[CustomObjectSearchResult].model_validate_json(
await http.request(log, url, method="POST", json=input)
)

for r in result.results:
if id_cursor and r.id <= id_cursor:
# This should _really_ never happen, but HubSpot is weird so if
# it does I want to know about it and will come back later and
# figure it out.
raise Exception(f"unexpected id order: {r.id} <= {id_cursor}")
id_cursor = r.id
output_items.add((r.properties.hs_lastmodifieddate, str(r.id)))

if not result.paging:
break

round += 1

return output_items


def fetch_recent_custom_objects(
object_name: str, log: Logger, http: HTTPSession, since: datetime, until: datetime | None
) -> AsyncGenerator[tuple[datetime, str, CustomObject], None]:
Expand Down

0 comments on commit cce16ee

Please sign in to comment.