Add form submissions endpoint #96

Open · wants to merge 2 commits into master
Changes from 1 commit
1 change: 1 addition & 0 deletions README.md
@@ -14,6 +14,7 @@ This tap:
- [Email Events](http://developers.hubspot.com/docs/methods/email/get_events)
- [Engagements](https://developers.hubspot.com/docs/methods/engagements/get-all-engagements)
- [Forms](http://developers.hubspot.com/docs/methods/forms/v2/get_forms)
- [Form Submissions](https://developers.hubspot.com/docs/methods/forms/get-submissions-for-a-form)
- [Keywords](http://developers.hubspot.com/docs/methods/keywords/get_keywords)
- [Owners](http://developers.hubspot.com/docs/methods/owners/get_owners)
- [Subscription Changes](http://developers.hubspot.com/docs/methods/email/get_subscriptions_timeline)
69 changes: 69 additions & 0 deletions tap_hubspot/__init__.py
@@ -88,6 +88,7 @@ class StateFields:
"email_events": "/email/public/v1/events",
"contact_lists": "/contacts/v1/lists",
"forms": "/forms/v2/forms",
"form_submissions": "/form-integrations/v1/submissions/forms/{form_guid}",
"workflows": "/automation/v3/workflows",
"owners": "/owners/v2/owners",
}
@@ -767,6 +768,73 @@ def sync_deal_pipelines(STATE, ctx):
singer.write_state(STATE)
return STATE

def sync_form_submissions(STATE, ctx):
data = request(get_url("forms")).json()

for row in data:
STATE = _sync_form_submissions_by_form_id(STATE, row['guid'])
Contributor: The docs say this can only be used for some types of forms. Can/should that be checked here to prevent possible errors?

Author: I use the endpoint /forms/v2/forms to get the list of forms to iterate through. By default, non-marketing forms are filtered out of that endpoint according to the docs, so I think we're okay.

singer.write_state(STATE)

return STATE

def _sync_form_submissions_by_form_id(STATE, form_guid):
schema = load_schema("form_submissions")
Contributor: Looks like the schemas/form_submissions.json file needs to be added to the PR?
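For reference, a minimal sketch of what schemas/form_submissions.json could look like, inferred from the fields this sync writes (guid, submittedAt, pageUrl, values); the exact shape of values is an assumption:

{
  "type": "object",
  "properties": {
    "guid": {"type": "string"},
    "submittedAt": {"type": "string", "format": "date-time"},
    "pageUrl": {"type": ["null", "string"]},
    "values": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": {"type": ["null", "string"]},
          "value": {"type": ["null", "string"]}
        }
      }
    }
  }
}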

bookmark_key = 'last_max_submitted_at'

singer.write_schema("form_submissions", schema, ['guid', 'submittedAt', 'pageUrl'], [bookmark_key])
end = utils.strptime_with_tz(get_start(STATE, form_guid, bookmark_key))
Contributor: utils.strptime_to_utc is the recommended function to use for consistency (unless you need the original timezone).
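For example, assuming utils here is singer.utils, the line above would become:

end = utils.strptime_to_utc(get_start(STATE, form_guid, bookmark_key))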

max_bk_value = end
up_to_date = False

LOGGER.info("_sync_form_submissions_by_form_id for guid %s ending at %s", form_guid, end)

url = get_url("form_submissions", form_guid=form_guid)
path = 'results'
params = {
'count': 50
Contributor: Should this be limit? I don't see a count parameter available.
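If the endpoint does take limit instead (worth confirming against the HubSpot docs before changing it), the params would be:

params = {
    'limit': 50
}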

}
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
while up_to_date == False:
form_offset = singer.get_offset(STATE, form_guid)

if bool(form_offset) and form_offset.get('after') != None:
Contributor: bool(form_offset) can just be changed to form_offset.
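The suggested simplification, relying on dict truthiness (and swapping != None for the more idiomatic is not None):

if form_offset and form_offset.get('after') is not None:
    params['after'] = form_offset.get('after')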

params['after'] = form_offset.get('after')
data = request(url, params).json()
for row in data[path]:
if len(row) == 0:
continue

submitted_at = utils.strptime_with_tz(
_transform_datetime(row['submittedAt'], UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING))

if submitted_at > max_bk_value:
max_bk_value = submitted_at

if submitted_at <= end:
Contributor: This could use a comment documenting that the stream is returned in reverse order. (It just tripped me up reading it, heh.)
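For example (wording is just a suggestion):

# Submissions are returned in reverse chronological order, so the first
# record at or before the bookmark means everything older is already synced.
if submitted_at <= end: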

STATE = singer.clear_offset(STATE, form_guid)
up_to_date = True
LOGGER.info("Reached the end of new form submissions")
break

record = {
'guid': form_guid,
'submittedAt': row['submittedAt'],
'pageUrl': row['pageUrl'],
'values': row['values']
}
record = bumble_bee.transform(record, schema)
singer.write_record("form_submissions", record, 'form_submissions', time_extracted=utils.now())
if 'paging' in data:
STATE = singer.set_offset(STATE, form_guid, 'after', data['paging']['next']['after'])
singer.write_state(STATE)
else:
STATE = singer.clear_offset(STATE, form_guid)
singer.write_state(STATE)
LOGGER.info("No more submissions for this form")
break
STATE = singer.write_bookmark(STATE, form_guid, bookmark_key, max_bk_value.strftime("%Y-%m-%d %H:%M:%S"))
Contributor: There's a subtle edge case with this replication strategy that I'm working on right now in #98. If you expect this stream to be high in volume, it could take a while to sync, and there's a kind of race condition in how the updates come in that can cause some to get skipped. #91 has a short illustration of how this can play out.

The best solution I have right now that's resilient to tap failures is storing the current_sync_start in the state and making sure the bookmark doesn't get set above that value (see #98). It might be worth trying that pattern here, too.
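A rough sketch of that pattern; get_current_sync_start and write_current_sync_start are illustrative helpers built on singer's bookmark utilities, not existing tap code:

import singer
from singer import utils

def get_current_sync_start(state, tap_stream_id):
    # Resume from a stored sync start if the previous run was interrupted.
    value = singer.get_bookmark(state, tap_stream_id, 'current_sync_start')
    return utils.strptime_to_utc(value) if value else None

def write_current_sync_start(state, tap_stream_id, start):
    # Persist the wall-clock start of this sync (or clear it when start is None).
    value = utils.strftime(start) if start is not None else None
    return singer.write_bookmark(state, tap_stream_id, 'current_sync_start', value)

The sync would then record its start time up front and cap the bookmark at that value:

sync_start = get_current_sync_start(STATE, 'form_submissions') or utils.now()
STATE = write_current_sync_start(STATE, 'form_submissions', sync_start)
# ... iterate submissions, tracking max_bk_value as above ...
# Never bookmark past the start of this sync, so records that land
# mid-sync are re-fetched on the next run instead of being skipped.
new_bookmark = min(max_bk_value, sync_start)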

Contributor: Also, the bookmark should probably not be written to state until after all forms have been checked, to limit the strange edge cases that can come up if the tap is interrupted. This function can return max_bk_value and accept it as a parameter to maintain it over the whole stream sync. My comment above assumed that would be the case.
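One way that could look, sketched under the assumption that the per-form helper accepts and returns the running maximum rather than writing the bookmark itself (the stream-level bookmark key and signature are illustrative):

def sync_form_submissions(STATE, ctx):
    data = request(get_url("forms")).json()
    bookmark_key = 'last_max_submitted_at'
    max_bk_value = utils.strptime_to_utc(
        get_start(STATE, "form_submissions", bookmark_key))

    for row in data:
        # Thread the running maximum through every form so nothing is
        # bookmarked until the whole stream has been checked.
        STATE, max_bk_value = _sync_form_submissions_by_form_id(
            STATE, row['guid'], max_bk_value)

    # Write the bookmark once, after all forms have synced.
    STATE = singer.write_bookmark(STATE, "form_submissions", bookmark_key,
                                  utils.strftime(max_bk_value))
    singer.write_state(STATE)
    return STATE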

return STATE

@attr.s
class Stream(object):
tap_stream_id = attr.ib()
@@ -779,6 +847,7 @@ class Stream(object):
# Do these first as they are incremental
Stream('subscription_changes', sync_subscription_changes, ['timestamp', 'portalId', 'recipient'], 'startTimestamp', 'INCREMENTAL'),
Stream('email_events', sync_email_events, ['id'], 'startTimestamp', 'INCREMENTAL'),
Stream('form_submissions', sync_form_submissions, ['guid', 'submittedAt', 'pageUrl'], 'submittedAt', 'INCREMENTAL'),

# Do these last as they are full table
Stream('forms', sync_forms, ['guid'], 'updatedAt', 'FULL_TABLE'),