diff --git a/tap_hubspot/client.py b/tap_hubspot/client.py index 15920f9..77c386c 100644 --- a/tap_hubspot/client.py +++ b/tap_hubspot/client.py @@ -5,6 +5,7 @@ import sys import requests import datetime +import pytz from typing import Any, Callable @@ -163,36 +164,45 @@ def prepare_request_payload( # Only filter in case we have a value to filter on # https://developers.hubspot.com/docs/api/crm/search ts = datetime.datetime.strptime(self.get_starting_replication_key_value(context), "%Y-%m-%dT%H:%M:%S.%fZ") + ts = pytz.utc.localize(ts) + if next_page_token: + # Hubspot wont return more than 10k records so when we hit 10k we + # need to reset our epoch to most recent and not send the next_page_token + if int(next_page_token) + 100 >= 10000: + ts = self.get_replication_key_signpost(context) + else: + body["after"] = next_page_token # The SDK rounds up when handling datetimes sometimes so we need to subtract a second to be safe ts = ts - datetime.timedelta(seconds=1) - # TODO: Hubspot seems to convert our epoch to local timezone when filtering so we need to offset - # before making the request. - body = { - "filterGroups": [ - { - "filters": [ - { - "propertyName": "lastmodifieddate", - "operator": "GTE", - # Timestamps need to be in milliseconds - # https://legacydocs.hubspot.com/docs/faq/how-should-timestamps-be-formatted-for-hubspots-apis - "value": str(int(ts.timestamp() * 1000)), - } - ] - } - ], - "sorts": [ - { - # This is inside the properties object - "propertyName": self.replication_key, - "direction": "ASCENDING", - } - ], - # Hubspot sets a limit of most 100 per request. Default is 10 - "limit": 100, - } - if next_page_token: - body["after"] = next_page_token + epoch_ts = str(int(ts.timestamp() * 1000)) + + body.update( + { + "filterGroups": [ + { + "filters": [ + { + "propertyName": self.replication_key, + "operator": "GTE", + # Timestamps need to be in milliseconds + # https://legacydocs.hubspot.com/docs/faq/how-should-timestamps-be-formatted-for-hubspots-apis + "value": epoch_ts, + } + ] + } + ], + "sorts": [ + { + # This is inside the properties object + "propertyName": self.replication_key, + "direction": "ASCENDING", + } + ], + # Hubspot sets a limit of most 100 per request. Default is 10 + "limit": 100, + } + ) + return body class DynamicHubspotStream(HubspotStream): @@ -254,7 +264,7 @@ def post_process( Returns: The resulting record dict, or `None` if the record should be excluded. """ - if self.replication_key: + if self._is_incremental_search(context): val = None if props := row.get("properties"): val = props[self.replication_key] @@ -286,11 +296,12 @@ def get_url_params( Returns: A dictionary of URL query parameters. """ + if self._is_incremental_search(context): + return {} params = super().get_url_params(context, next_page_token) if self.hs_properties: params["properties"] = ",".join(self.hs_properties) - if self._is_incremental_search(context): - return {} + return params def prepare_request_payload(