Skip to content

Commit

Permalink
feat: add form submissions (#2)
Browse files Browse the repository at this point in the history
* fix: remove empty strings when lifting properties (#1)

* chore: indent
  • Loading branch information
pquadri authored Oct 8, 2024
1 parent 5cb8649 commit 5ad53d1
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,5 @@ config.json
*~
env-vars*
.venv
.tool-versions

83 changes: 75 additions & 8 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
#!/usr/bin/env python3
import datetime
import pytz
import itertools
import json
import os
import re
import sys
import json

# pylint: disable=import-error,too-many-statements
import attr
import backoff
import pytz
import requests
import singer
import singer.messages
from singer import metrics
from singer import metadata
from singer import utils
from singer import (transform,
UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
Transformer, _transform_datetime)
from singer import (
UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
Transformer,
_transform_datetime,
metadata,
metrics,
transform,
utils,
)

LOGGER = singer.get_logger()
SESSION = requests.Session()
Expand Down Expand Up @@ -97,6 +101,7 @@ class StateFields:
"email_events": "/email/public/v1/events",
"contact_lists": "/contacts/v1/lists",
"forms": "/forms/v2/forms",
"form_submissions": "/form-integrations/v1/submissions/forms/{form_guid}",
"workflows": "/automation/v3/workflows",
"owners": "/crm/v3/owners/",

Expand Down Expand Up @@ -983,6 +988,67 @@ def sync_contact_lists(STATE, ctx):

return STATE

def sync_form_submissions_by_form_id(STATE, form_guid):
schema = load_schema("form_submissions")
bookmark_key = 'last_max_submitted_at'
form_state_key = f"form_submissions_{form_guid}"

singer.write_schema("form_submissions", schema, ['guid', 'submittedAt', 'pageUrl'], [bookmark_key])
end = utils.strptime_to_utc(get_start(STATE, form_state_key, bookmark_key))
max_bk_value = end
up_to_date = False


LOGGER.info("_sync_form_submissions_by_form_id for guid %s ending at %s", form_guid, end)

url = get_url("form_submissions", form_guid=form_guid)
path = 'results'
params = {
'limit': 50
}
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
while not up_to_date:
form_offset = singer.get_offset(STATE, form_state_key)

if form_offset and form_offset.get('after') is not None:
params['after'] = form_offset.get('after')
data = request(url, params).json()
for row in data[path]:
if len(row) == 0:
continue

submitted_at = utils.strptime_with_tz(
_transform_datetime(row['submittedAt'], UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING))

if submitted_at > max_bk_value:
max_bk_value = submitted_at

# since this stream returns in reverse order check to see if we've reached the data already loaded
if submitted_at < end:
STATE = singer.clear_offset(STATE, form_state_key)
up_to_date = True
LOGGER.info("Reached the end of new form submissions")
break

record = {
'guid': form_guid,
'submittedAt': row['submittedAt'],
'pageUrl': row['pageUrl'],
'values': row['values']
}
record = bumble_bee.transform(record, schema)
singer.write_record("form_submissions", record, 'form_submissions', time_extracted=utils.now())
if 'paging' in data:
STATE = singer.set_offset(STATE, form_state_key, 'after', data['paging']['next']['after'])
singer.write_state(STATE)
else:
STATE = singer.clear_offset(STATE, form_state_key)
singer.write_state(STATE)
LOGGER.info("No more submissions for this form")
break
STATE = singer.write_bookmark(STATE, form_state_key, bookmark_key, max_bk_value.strftime("%Y-%m-%d %H:%M:%S"))
return STATE

def sync_forms(STATE, ctx):
catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
mdata = metadata.to_map(catalog.get('metadata'))
Expand All @@ -1003,6 +1069,7 @@ def sync_forms(STATE, ctx):
# store the current sync start in the state and not move the bookmark past this value.
sync_start_time = utils.now()
for row in data:
STATE = sync_form_submissions_by_form_id(STATE, row['guid'])
record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)

if record[bookmark_key] >= start:
Expand Down
29 changes: 29 additions & 0 deletions tap_hubspot/schemas/form_submissions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"type": "object",
"properties": {
"guid": {
"type": ["null", "string"]
},
"submittedAt": {
"type": ["null", "string"],
"format": "date-time"
},
"values": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": ["null", "string"]
},
"value": {
"type": ["null", "string"]
}
}
}
},
"pageUrl": {
"type": ["null", "string"]
}
}
}

0 comments on commit 5ad53d1

Please sign in to comment.