From 66a266bc7598d3c1d313a3da7cf83dcdc491b473 Mon Sep 17 00:00:00 2001 From: Paolo Quadri Date: Tue, 8 Oct 2024 17:42:41 +0200 Subject: [PATCH 1/2] fix: remove empty strings when lifting properties (#1) --- .gitignore | 2 + tap_hubspot/__init__.py | 85 ++++++++++++++++++++--- tap_hubspot/schemas/form_submissions.json | 29 ++++++++ 3 files changed, 107 insertions(+), 9 deletions(-) create mode 100644 tap_hubspot/schemas/form_submissions.json diff --git a/.gitignore b/.gitignore index 675377ad..da9c3b5f 100644 --- a/.gitignore +++ b/.gitignore @@ -98,3 +98,5 @@ config.json .autoenv.zsh *~ env-vars* +.venv +.tool-versions diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py index 89dcaf9e..ecbc4179 100644 --- a/tap_hubspot/__init__.py +++ b/tap_hubspot/__init__.py @@ -1,23 +1,27 @@ #!/usr/bin/env python3 import datetime -import pytz import itertools +import json import os import re import sys -import json + # pylint: disable=import-error,too-many-statements import attr import backoff +import pytz import requests import singer import singer.messages -from singer import metrics -from singer import metadata -from singer import utils -from singer import (transform, - UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING, - Transformer, _transform_datetime) +from singer import ( + UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING, + Transformer, + _transform_datetime, + metadata, + metrics, + transform, + utils, +) LOGGER = singer.get_logger() SESSION = requests.Session() @@ -97,6 +101,7 @@ class StateFields: "email_events": "/email/public/v1/events", "contact_lists": "/contacts/v1/lists", "forms": "/forms/v2/forms", + "form_submissions": "/form-integrations/v1/submissions/forms/{form_guid}", "workflows": "/automation/v3/workflows", "owners": "/crm/v3/owners/", @@ -375,7 +380,7 @@ def request(url, params=None): def lift_properties_and_versions(record): for key, value in record.get('properties', {}).items(): computed_key = "property_{}".format(key) - record[computed_key] = value + record[computed_key] = value if value != '' else None if isinstance(value, dict): versions = value.get('versions') if versions: @@ -983,6 +988,67 @@ def sync_contact_lists(STATE, ctx): return STATE +def sync_form_submissions_by_form_id(STATE, form_guid): + schema = load_schema("form_submissions") + bookmark_key = 'last_max_submitted_at' + form_state_key = f"form_submissions_{form_guid}" + + singer.write_schema("form_submissions", schema, ['guid', 'submittedAt', 'pageUrl'], [bookmark_key]) + end = utils.strptime_to_utc(get_start(STATE, form_state_key, bookmark_key)) + max_bk_value = end + up_to_date = False + + + LOGGER.info("_sync_form_submissions_by_form_id for guid %s ending at %s", form_guid, end) + + url = get_url("form_submissions", form_guid=form_guid) + path = 'results' + params = { + 'limit': 50 + } + with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee: + while not up_to_date: + form_offset = singer.get_offset(STATE, form_state_key) + + if form_offset and form_offset.get('after') is not None: + params['after'] = form_offset.get('after') + data = request(url, params).json() + for row in data[path]: + if len(row) == 0: + continue + + submitted_at = utils.strptime_with_tz( + _transform_datetime(row['submittedAt'], UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING)) + + if submitted_at > max_bk_value: + max_bk_value = submitted_at + + # since this stream returns in reverse order check to see if we've reached the data already loaded + if submitted_at < end: + STATE = singer.clear_offset(STATE, form_state_key) + up_to_date = True + LOGGER.info("Reached the end of new form submissions") + break + + record = { + 'guid': form_guid, + 'submittedAt': row['submittedAt'], + 'pageUrl': row['pageUrl'], + 'values': row['values'] + } + record = bumble_bee.transform(record, schema) + singer.write_record("form_submissions", record, 'form_submissions', time_extracted=utils.now()) + if 'paging' in data: + STATE = singer.set_offset(STATE, form_state_key, 'after', data['paging']['next']['after']) + singer.write_state(STATE) + else: + STATE = singer.clear_offset(STATE, form_state_key) + singer.write_state(STATE) + LOGGER.info("No more submissions for this form") + break + STATE = singer.write_bookmark(STATE, form_state_key, bookmark_key, max_bk_value.strftime("%Y-%m-%d %H:%M:%S")) + return STATE + def sync_forms(STATE, ctx): catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE)) mdata = metadata.to_map(catalog.get('metadata')) @@ -1003,6 +1069,7 @@ def sync_forms(STATE, ctx): # store the current sync start in the state and not move the bookmark past this value. sync_start_time = utils.now() for row in data: + STATE = sync_form_submissions_by_form_id(STATE, row['guid']) record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata) if record[bookmark_key] >= start: diff --git a/tap_hubspot/schemas/form_submissions.json b/tap_hubspot/schemas/form_submissions.json new file mode 100644 index 00000000..9800763c --- /dev/null +++ b/tap_hubspot/schemas/form_submissions.json @@ -0,0 +1,29 @@ +{ + "type": "object", + "properties": { + "guid": { + "type": ["null", "string"] + }, + "submittedAt": { + "type": ["null", "string"], + "format": "date-time" + }, + "values": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "type": ["null", "string"] + }, + "value": { + "type": ["null", "string"] + } + } + } + }, + "pageUrl": { + "type": ["null", "string"] + } + } +} From 794ca359bf86f60b240267c0aaf42b93d922bec9 Mon Sep 17 00:00:00 2001 From: Paolo Quadri Date: Tue, 8 Oct 2024 17:45:34 +0200 Subject: [PATCH 2/2] chore: indent --- tap_hubspot/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py index ecbc4179..ec7e9d3d 100644 --- a/tap_hubspot/__init__.py +++ b/tap_hubspot/__init__.py @@ -101,7 +101,7 @@ class StateFields: "email_events": "/email/public/v1/events", "contact_lists": "/contacts/v1/lists", "forms": "/forms/v2/forms", - "form_submissions": "/form-integrations/v1/submissions/forms/{form_guid}", + "form_submissions": "/form-integrations/v1/submissions/forms/{form_guid}", "workflows": "/automation/v3/workflows", "owners": "/crm/v3/owners/",