Skip to content

Commit

Permalink
Support incremental syncs using search endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Jan 3, 2024
1 parent 578b9e9 commit 5c69a69
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 67 deletions.
148 changes: 141 additions & 7 deletions tap_hubspot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
from __future__ import annotations

import sys
import requests
import datetime

from typing import Any, Callable

import requests
from singer_sdk import typing as th
from singer_sdk.pagination import BaseAPIPaginator
from singer_sdk.streams import RESTStream
from singer_sdk import typing as th

if sys.version_info >= (3, 8):
from functools import cached_property
else:
from cached_property import cached_property

from singer_sdk.authenticators import BearerTokenAuthenticator
from singer_sdk.streams.core import REPLICATION_INCREMENTAL
from tap_hubspot.auth import HubSpotOAuthAuthenticator

_Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest]
Expand Down Expand Up @@ -100,6 +103,17 @@ def get_next_page_token(
next_page_token = None
return next_page_token

def prepare_request(
    self,
    context: dict | None,
    next_page_token: _TToken | None,
) -> requests.PreparedRequest:
    """Build the prepared request, redirecting to the search endpoint if needed.

    When the stream qualifies for an incremental search, the stream's path and
    HTTP method are switched to the search endpoint before the request is
    built by the parent class.

    Args:
        context: Stream partition or context dictionary.
        next_page_token: Token, page number or any request argument to request
            the next page of data.

    Returns:
        The prepared request produced by the parent implementation.
    """
    use_search = self._is_incremental_search(context)
    if use_search:
        # Search endpoints are queried with POST rather than GET.
        self.path, self.rest_method = self.incremental_path, "POST"
    return super().prepare_request(context, next_page_token)

def get_url_params(
self,
context: dict | None,
Expand All @@ -115,14 +129,72 @@ def get_url_params(
A dictionary of URL query parameters.
"""
params: dict = {}
params["limit"] = 100
if next_page_token:
params["after"] = next_page_token
if self.replication_key:
params["sort"] = "asc"
params["order_by"] = self.replication_key

return params

def _is_incremental_search(self, context: dict | None) -> bool:
    """Tell whether requests for this context should use the search endpoint.

    The search endpoint is used only when all of the following hold:
    the stream defines a non-empty ``incremental_path``, the stream is
    replicated incrementally, and a starting replication key value is
    available for the given context.

    Args:
        context: Stream partition or context dictionary.

    Returns:
        True if the incremental search endpoint should be used, else False.
    """
    # Cheap attribute checks come first so the state lookup only runs
    # for streams that can actually use the search endpoint.
    if not getattr(self, "incremental_path", None):
        return False
    if self.replication_method != REPLICATION_INCREMENTAL:
        return False
    return bool(self.get_starting_replication_key_value(context))

def prepare_request_payload(
    self,
    context: dict | None,
    next_page_token: _TToken | None,
) -> dict | None:
    """Prepare the data payload for the REST API request.

    Outside of an incremental search an empty payload is returned. For an
    incremental search the payload carries the filter on the modification
    timestamp, the sort order, the page size and, when present, the
    pagination cursor.

    Args:
        context: Stream partition or context dictionary.
        next_page_token: Token, page number or any request argument to request
            the next page of data.

    Returns:
        The request body as a dictionary (empty when no search is performed).

    Raises:
        ValueError: If the starting replication key value cannot be parsed as
            a timestamp.
    """
    if not self._is_incremental_search(context):
        return {}

    # Only filter in case we have a value to filter on
    # https://developers.hubspot.com/docs/api/crm/search
    start_value = self.get_starting_replication_key_value(context)
    if isinstance(start_value, datetime.datetime):
        start = start_value
    else:
        text = str(start_value)
        try:
            start = datetime.datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%fZ")
        except ValueError:
            # Accept values without fractional seconds or with an explicit
            # offset, e.g. "2024-01-01T00:00:00Z" / "2024-01-01T00:00:00+00:00".
            if text.endswith("Z"):
                text = text[:-1] + "+00:00"
            start = datetime.datetime.fromisoformat(text)

    # A naive datetime would be interpreted in the machine's local timezone
    # when converted to an epoch, shifting the filter by the local UTC offset.
    # The trailing "Z" means UTC, so pin it explicitly.
    if start.tzinfo is None:
        start = start.replace(tzinfo=datetime.timezone.utc)

    # The SDK rounds up when handling datetimes sometimes so we need to
    # subtract a second to be safe.
    start -= datetime.timedelta(seconds=1)

    # Timestamps need to be in milliseconds
    # https://legacydocs.hubspot.com/docs/faq/how-should-timestamps-be-formatted-for-hubspots-apis
    # Integer timedelta division avoids float rounding of the millisecond.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    epoch_millis = (start - epoch) // datetime.timedelta(milliseconds=1)

    body = {
        "filterGroups": [
            {
                "filters": [
                    {
                        # NOTE(review): the filter property is hard-coded
                        # while the sort uses self.replication_key; confirm
                        # both are meant to name the same property.
                        "propertyName": "lastmodifieddate",
                        "operator": "GTE",
                        "value": str(epoch_millis),
                    }
                ]
            }
        ],
        "sorts": [
            {
                # This is inside the properties object
                "propertyName": self.replication_key,
                "direction": "ASCENDING",
            }
        ],
        # HubSpot allows at most 100 records per request; the default is 10.
        "limit": 100,
    }
    if next_page_token:
        body["after"] = next_page_token
    return body

class DynamicHubspotStream(HubspotStream):
"""DynamicHubspotStream"""

Expand All @@ -142,15 +214,52 @@ def schema(self) -> dict:
hs_props.append(
th.Property(name, self._get_datatype(type))
)
return th.PropertiesList(
schema = th.PropertiesList(
th.Property("id", th.StringType),
th.Property(
"properties", th.ObjectType(*hs_props),
),
th.Property("createdAt", th.StringType),
th.Property("updatedAt", th.StringType),
th.Property("createdAt", th.DateTimeType),
th.Property("updatedAt", th.DateTimeType),
th.Property("archived", th.BooleanType),
).to_dict()
)
if self.replication_key:
schema.append(
th.Property(
self.replication_key,
th.DateTimeType,
)
)
return schema.to_dict()

def post_process(
    self,
    row: dict,
    context: dict | None = None,  # noqa: ARG002
) -> dict | None:
    """Copy the replication key from ``properties`` to the top level of the row.

    When the stream has a replication key, its value is read from the row's
    nested ``properties`` object and stored under the same name at the top
    level of the row. If ``properties`` is missing or empty, or does not
    contain the replication key, the top-level value is set to ``None``.

    Args:
        row: Individual record in the stream.
        context: Stream partition or context dictionary.

    Returns:
        The same row dict, updated in place.
    """
    if self.replication_key:
        # A missing property is treated like a missing "properties" object
        # (None) instead of raising KeyError.
        properties = row.get("properties") or {}
        row[self.replication_key] = properties.get(self.replication_key)
    return row

def _get_available_properties(self) -> dict[str, str]:
session = requests.Session()
Expand Down Expand Up @@ -180,4 +289,29 @@ def get_url_params(
params = super().get_url_params(context, next_page_token)
if self.hs_properties:
params["properties"] = ",".join(self.hs_properties)
if self._is_incremental_search(context):
return {}
return params

def prepare_request_payload(
    self,
    context: dict | None,
    next_page_token: _TToken | None,
) -> dict | None:
    """Extend the parent payload with the property names to fetch.

    The parent implementation builds the base payload. For an incremental
    search, the names of this stream's HubSpot properties are added under the
    ``properties`` key; otherwise the parent payload is returned untouched.

    Args:
        context: Stream partition or context dictionary.
        next_page_token: Token, page number or any request argument to request
            the next page of data.

    Returns:
        The payload from the parent class, with ``properties`` added when an
        incremental search is being performed.
    """
    payload = super().prepare_request_payload(context, next_page_token)
    if not self._is_incremental_search(context):
        return payload
    payload["properties"] = [*self.hs_properties.keys()]
    return payload
Loading

0 comments on commit 5c69a69

Please sign in to comment.