From 5c69a696e476aa3565d5baa1debebcea3bfaaef3 Mon Sep 17 00:00:00 2001 From: pnadolny13 Date: Wed, 3 Jan 2024 11:19:44 -0500 Subject: [PATCH] support incremental syncs using search endpoints --- tap_hubspot/client.py | 148 +++++++++++++++++++++++++++++++++++++++-- tap_hubspot/streams.py | 79 ++++++---------------- 2 files changed, 160 insertions(+), 67 deletions(-) diff --git a/tap_hubspot/client.py b/tap_hubspot/client.py index 8ada9df..15920f9 100644 --- a/tap_hubspot/client.py +++ b/tap_hubspot/client.py @@ -3,12 +3,14 @@ from __future__ import annotations import sys +import requests +import datetime + from typing import Any, Callable -import requests +from singer_sdk import typing as th from singer_sdk.pagination import BaseAPIPaginator from singer_sdk.streams import RESTStream -from singer_sdk import typing as th if sys.version_info >= (3, 8): from functools import cached_property @@ -16,6 +18,7 @@ from cached_property import cached_property from singer_sdk.authenticators import BearerTokenAuthenticator +from singer_sdk.streams.core import REPLICATION_INCREMENTAL from tap_hubspot.auth import HubSpotOAuthAuthenticator _Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest] @@ -100,6 +103,17 @@ def get_next_page_token( next_page_token = None return next_page_token + def prepare_request( + self, + context: dict | None, + next_page_token: _TToken | None, + ) -> requests.PreparedRequest: + if self._is_incremental_search(context): + # Search endpoints use POST request + self.path = self.incremental_path + self.rest_method = "POST" + return super().prepare_request(context, next_page_token) + def get_url_params( self, context: dict | None, @@ -115,14 +129,72 @@ def get_url_params( A dictionary of URL query parameters. 
""" params: dict = {} + params["limit"] = 100 if next_page_token: params["after"] = next_page_token if self.replication_key: params["sort"] = "asc" params["order_by"] = self.replication_key - return params + def _is_incremental_search(self, context): + return self.replication_method == REPLICATION_INCREMENTAL and self.get_starting_replication_key_value(context) and hasattr(self, "incremental_path") and self.incremental_path + + def prepare_request_payload( + self, + context: dict | None, + next_page_token: _TToken | None, + ) -> dict | None: + """Prepare the data payload for the REST API request. + + A dict is always returned; the search filter, sort and limit are only added for incremental syncs. + + Developers may override this method if the API requires a custom payload along + with the request. (This is generally not required for APIs which use the + HTTP 'GET' method.) + + Args: + context: Stream partition or context dictionary. + next_page_token: Token, page number or any request argument to request the + next page of data. + """ + body = {} + if self._is_incremental_search(context): + # Only filter in case we have a value to filter on + # https://developers.hubspot.com/docs/api/crm/search + ts = datetime.datetime.strptime(self.get_starting_replication_key_value(context), "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=datetime.timezone.utc) + # The SDK rounds up when handling datetimes sometimes so we need to subtract a second to be safe + ts = ts - datetime.timedelta(seconds=1) + # NOTE: tzinfo is set to UTC above because .timestamp() treats a naive datetime as local time, + # which would shift the epoch filter by the machine's UTC offset. 
+ body = { + "filterGroups": [ + { + "filters": [ + { + "propertyName": self.replication_key, + "operator": "GTE", + # Timestamps need to be in milliseconds + # https://legacydocs.hubspot.com/docs/faq/how-should-timestamps-be-formatted-for-hubspots-apis + "value": str(int(ts.timestamp() * 1000)), + } + ] + } + ], + "sorts": [ + { + # This is inside the properties object + "propertyName": self.replication_key, + "direction": "ASCENDING", + } + ], + # Hubspot sets a limit of at most 100 per request. Default is 10 + "limit": 100, + } + if next_page_token: + body["after"] = next_page_token + return body + class DynamicHubspotStream(HubspotStream): """DynamicHubspotStream""" @@ -142,15 +214,52 @@ def schema(self) -> dict: hs_props.append( th.Property(name, self._get_datatype(type)) ) - return th.PropertiesList( + schema = th.PropertiesList( th.Property("id", th.StringType), th.Property( "properties", th.ObjectType(*hs_props), ), - th.Property("createdAt", th.StringType), - th.Property("updatedAt", th.StringType), + th.Property("createdAt", th.DateTimeType), + th.Property("updatedAt", th.DateTimeType), th.Property("archived", th.BooleanType), - ).to_dict() + ) + if self.replication_key: + schema.append( + th.Property( + self.replication_key, + th.DateTimeType, + ) + ) + return schema.to_dict() + + def post_process( + self, + row: dict, + context: dict | None = None, # noqa: ARG002 + ) -> dict | None: + """As needed, append or transform raw data to match expected structure. + + Optional. This method gives developers an opportunity to "clean up" the results + prior to returning records to the downstream tap - for instance: cleaning, + renaming, or appending properties to the raw record result returned from the + API. + + Developers may also return `None` from this method to filter out + invalid or not-applicable records from the stream. + + Args: + row: Individual record in the stream. + context: Stream partition or context dictionary. 
+ + Returns: + The resulting record dict, or `None` if the record should be excluded. + """ + if self.replication_key: + # A record without the property gets None instead of raising KeyError + props = row.get("properties") or {} + val = props.get(self.replication_key) + row[self.replication_key] = val + return row def _get_available_properties(self) -> dict[str, str]: session = requests.Session() @@ -180,4 +289,29 @@ def get_url_params( params = super().get_url_params(context, next_page_token) if self.hs_properties: params["properties"] = ",".join(self.hs_properties) + if self._is_incremental_search(context): + return {} return params + + def prepare_request_payload( + self, + context: dict | None, + next_page_token: _TToken | None, + ) -> dict | None: + """Prepare the data payload for the REST API request. + + Extends the base payload with the list of properties to return for incremental searches. + + Developers may override this method if the API requires a custom payload along + with the request. (This is generally not required for APIs which use the + HTTP 'GET' method.) + + Args: + context: Stream partition or context dictionary. + next_page_token: Token, page number or any request argument to request the + next page of data. + """ + body = super().prepare_request_payload(context, next_page_token) + if self._is_incremental_search(context): + body["properties"] = list(self.hs_properties.keys()) + return body diff --git a/tap_hubspot/streams.py b/tap_hubspot/streams.py index 7863c11..3da4009 100644 --- a/tap_hubspot/streams.py +++ b/tap_hubspot/streams.py @@ -35,8 +35,9 @@ class ContactStream(DynamicHubspotStream): name = "contacts" path = "/objects/contacts" + incremental_path = "/objects/contacts/search" primary_keys = ["id"] - replication_key = "updatedAt" + replication_key = "lastmodifieddate" replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. 
@@ -66,8 +67,6 @@ class UsersStream(HubspotStream): name = "users" path = "/users" primary_keys = ["id"] - replication_key = "id" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -104,8 +103,6 @@ class OwnersStream(HubspotStream): name = "owners" path = "/owners" primary_keys = ["id"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -145,8 +142,6 @@ class TicketPipelineStream(HubspotStream): name = "ticket_pipelines" path = "/pipelines/tickets" primary_keys = ["createdAt"] - replication_key = "createdAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -207,8 +202,6 @@ class DealPipelineStream(HubspotStream): name = "deal_pipelines" path = "/pipelines/deals" primary_keys = ["createdAt"] - replication_key = "createdAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -269,8 +262,6 @@ class EmailSubscriptionStream(HubspotStream): name = "email_subscriptions" path = "/subscriptions" primary_keys = ["id"] - replication_key = "id" - replication_method = "INCREMENTAL" records_jsonpath = "$[subscriptionDefinitions][*]" # Or override `parse_response`. schema = PropertiesList( @@ -312,8 +303,6 @@ class PropertyTicketStream(HubspotStream): name = "property_tickets" path = "/properties/tickets" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. 
schema = PropertiesList( @@ -382,8 +371,6 @@ class PropertyDealStream(HubspotStream): name = "property_deals" path = "/properties/deals" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -453,8 +440,6 @@ class PropertyContactStream(HubspotStream): name = "property_contacts" path = "/properties/contacts" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -523,8 +508,6 @@ class PropertyCompanyStream(HubspotStream): name = "property_companies" path = "/properties/company" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -593,8 +576,6 @@ class PropertyProductStream(HubspotStream): name = "property_products" path = "/properties/product" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -663,8 +644,6 @@ class PropertyLineItemStream(HubspotStream): name = "property_line_items" path = "/properties/line_item" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -733,8 +712,6 @@ class PropertyEmailStream(HubspotStream): name = "property_emails" path = "/properties/email" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. 
schema = PropertiesList( @@ -803,8 +780,6 @@ class PropertyPostalMailStream(HubspotStream): name = "property_postal_mails" path = "/properties/postal_mail" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -873,8 +848,6 @@ class PropertyCallStream(HubspotStream): name = "property_calls" path = "/properties/call" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -943,8 +916,6 @@ class PropertyMeetingStream(HubspotStream): name = "property_meetings" path = "/properties/meeting" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -1013,8 +984,6 @@ class PropertyTaskStream(HubspotStream): name = "property_tasks" path = "/properties/task" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -1083,8 +1052,6 @@ class PropertyCommunicationStream(HubspotStream): name = "property_communications" path = "/properties/communication" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -1153,8 +1120,6 @@ class PropertyNotesStream(HubspotStream): name = "properties" path = "/properties/notes" primary_keys = ["label"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. 
schema = PropertiesList( @@ -1261,8 +1226,9 @@ class CompanyStream(DynamicHubspotStream): name = "companies" path = "/objects/companies" + incremental_path = "/objects/companies/search" primary_keys = ["id"] - replication_key = "updatedAt" + replication_key = "lastmodifieddate" replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. @@ -1290,9 +1256,10 @@ class DealStream(DynamicHubspotStream): name = "deals" path = "/objects/deals" + incremental_path = "/objects/deals/search" primary_keys = ["id"] - replication_key = "updatedAt" + replication_key = "lastmodifieddate" replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. @property @@ -1320,8 +1287,6 @@ class FeedbackSubmissionsStream(HubspotStream): name = "feedback_submissions" path = "/objects/feedback_submissions" primary_keys = ["id"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -1369,8 +1334,6 @@ class LineItemStream(HubspotStream): name = "line_items" path = "/objects/line_items" primary_keys = ["id"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -1418,8 +1381,6 @@ class ProductStream(HubspotStream): name = "products" path = "/objects/products" primary_keys = ["id"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -1467,8 +1428,6 @@ class TicketStream(HubspotStream): name = "tickets" path = "/objects/tickets" primary_keys = ["id"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. 
schema = PropertiesList( @@ -1515,8 +1474,6 @@ class QuoteStream(HubspotStream): name = "quotes" path = "/objects/quotes" primary_keys = ["id"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -1564,8 +1521,6 @@ class GoalStream(HubspotStream): name = "goals" path = "/objects/goal_targets" primary_keys = ["id"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -1611,8 +1566,9 @@ class CallStream(DynamicHubspotStream): name = "calls" path = "/objects/calls" + incremental_path = "/objects/calls/search" primary_keys = ["id"] - replication_key = "updatedAt" + replication_key = "lastmodifieddate" replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. @@ -1639,9 +1595,10 @@ class CommunicationStream(DynamicHubspotStream): """ name = "communications" - path = "/objects/Communications" + path = "/objects/communications" + incremental_path = "/objects/communications/search" primary_keys = ["id"] - replication_key = "updatedAt" + replication_key = "lastmodifieddate" replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. @@ -1670,8 +1627,6 @@ class EmailStream(HubspotStream): name = "emails" path = "/objects/emails" primary_keys = ["id"] - replication_key = "updatedAt" - replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. schema = PropertiesList( @@ -1724,8 +1679,9 @@ class MeetingStream(DynamicHubspotStream): name = "meetings" path = "/objects/meetings" + incremental_path = "/objects/meetings/search" primary_keys = ["id"] - replication_key = "updatedAt" + replication_key = "lastmodifieddate" replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. 
@@ -1753,8 +1709,9 @@ class NoteStream(DynamicHubspotStream): name = "notes" path = "/objects/notes" + incremental_path = "/objects/notes/search" primary_keys = ["id"] - replication_key = "updatedAt" + replication_key = "lastmodifieddate" replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. @@ -1782,8 +1739,9 @@ class PostalMailStream(DynamicHubspotStream): name = "postal_mail" path = "/objects/postal_mail" + incremental_path = "/objects/postal_mail/search" primary_keys = ["id"] - replication_key = "updatedAt" + replication_key = "lastmodifieddate" replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`. @@ -1811,8 +1769,9 @@ class TaskStream(DynamicHubspotStream): name = "tasks" path = "/objects/tasks" + incremental_path = "/objects/tasks/search" primary_keys = ["id"] - replication_key = "updatedAt" + replication_key = "lastmodifieddate" replication_method = "INCREMENTAL" records_jsonpath = "$[results][*]" # Or override `parse_response`.