From 5ad53d1102ddef8809a4def008ed232626ff694a Mon Sep 17 00:00:00 2001 From: Paolo Quadri Date: Tue, 8 Oct 2024 18:12:24 +0200 Subject: [PATCH] feat: add form submissions (#2) * fix: remove empty strings when lifting properties (#1) * chore: indent --- .gitignore | 2 + tap_hubspot/__init__.py | 83 ++++++++++++++++++++--- tap_hubspot/schemas/form_submissions.json | 29 ++++++++ 3 files changed, 106 insertions(+), 8 deletions(-) create mode 100644 tap_hubspot/schemas/form_submissions.json diff --git a/.gitignore b/.gitignore index 91431767..467b7a3b 100644 --- a/.gitignore +++ b/.gitignore @@ -99,3 +99,5 @@ config.json *~ env-vars* .venv +.tool-versions + diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py index 41227dde..ec7e9d3d 100644 --- a/tap_hubspot/__init__.py +++ b/tap_hubspot/__init__.py @@ -1,23 +1,27 @@ #!/usr/bin/env python3 import datetime -import pytz import itertools +import json import os import re import sys -import json + # pylint: disable=import-error,too-many-statements import attr import backoff +import pytz import requests import singer import singer.messages -from singer import metrics -from singer import metadata -from singer import utils -from singer import (transform, - UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING, - Transformer, _transform_datetime) +from singer import ( + UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING, + Transformer, + _transform_datetime, + metadata, + metrics, + transform, + utils, +) LOGGER = singer.get_logger() SESSION = requests.Session() @@ -97,6 +101,7 @@ class StateFields: "email_events": "/email/public/v1/events", "contact_lists": "/contacts/v1/lists", "forms": "/forms/v2/forms", + "form_submissions": "/form-integrations/v1/submissions/forms/{form_guid}", "workflows": "/automation/v3/workflows", "owners": "/crm/v3/owners/", @@ -983,6 +988,67 @@ def sync_contact_lists(STATE, ctx): return STATE +def sync_form_submissions_by_form_id(STATE, form_guid): + schema = load_schema("form_submissions") + bookmark_key = 'last_max_submitted_at' + form_state_key = f"form_submissions_{form_guid}" + + singer.write_schema("form_submissions", schema, ['guid', 'submittedAt', 'pageUrl'], [bookmark_key]) + end = utils.strptime_to_utc(get_start(STATE, form_state_key, bookmark_key)) + max_bk_value = end + up_to_date = False + + + LOGGER.info("_sync_form_submissions_by_form_id for guid %s ending at %s", form_guid, end) + + url = get_url("form_submissions", form_guid=form_guid) + path = 'results' + params = { + 'limit': 50 + } + with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee: + while not up_to_date: + form_offset = singer.get_offset(STATE, form_state_key) + + if form_offset and form_offset.get('after') is not None: + params['after'] = form_offset.get('after') + data = request(url, params).json() + for row in data[path]: + if len(row) == 0: + continue + + submitted_at = utils.strptime_with_tz( + _transform_datetime(row['submittedAt'], UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING)) + + if submitted_at > max_bk_value: + max_bk_value = submitted_at + + # since this stream returns in reverse order check to see if we've reached the data already loaded + if submitted_at < end: + STATE = singer.clear_offset(STATE, form_state_key) + up_to_date = True + LOGGER.info("Reached the end of new form submissions") + break + + record = { + 'guid': form_guid, + 'submittedAt': row['submittedAt'], + 'pageUrl': row['pageUrl'], + 'values': row['values'] + } + record = bumble_bee.transform(record, schema) + singer.write_record("form_submissions", record, 'form_submissions', time_extracted=utils.now()) + if 'paging' in data: + STATE = singer.set_offset(STATE, form_state_key, 'after', data['paging']['next']['after']) + singer.write_state(STATE) + else: + STATE = singer.clear_offset(STATE, form_state_key) + singer.write_state(STATE) + LOGGER.info("No more submissions for this form") + break + STATE = singer.write_bookmark(STATE, form_state_key, bookmark_key, max_bk_value.strftime("%Y-%m-%d %H:%M:%S")) + return STATE + def sync_forms(STATE, ctx): catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE)) mdata = metadata.to_map(catalog.get('metadata')) @@ -1003,6 +1069,7 @@ def sync_forms(STATE, ctx): # store the current sync start in the state and not move the bookmark past this value. sync_start_time = utils.now() for row in data: + STATE = sync_form_submissions_by_form_id(STATE, row['guid']) record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata) if record[bookmark_key] >= start: diff --git a/tap_hubspot/schemas/form_submissions.json b/tap_hubspot/schemas/form_submissions.json new file mode 100644 index 00000000..9800763c --- /dev/null +++ b/tap_hubspot/schemas/form_submissions.json @@ -0,0 +1,29 @@ +{ + "type": "object", + "properties": { + "guid": { + "type": ["null", "string"] + }, + "submittedAt": { + "type": ["null", "string"], + "format": "date-time" + }, + "values": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "type": ["null", "string"] + }, + "value": { + "type": ["null", "string"] + } + } + } + }, + "pageUrl": { + "type": ["null", "string"] + } + } +}