-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add form submissions endpoint #96
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,6 +88,7 @@ class StateFields: | |
"email_events": "/email/public/v1/events", | ||
"contact_lists": "/contacts/v1/lists", | ||
"forms": "/forms/v2/forms", | ||
"form_submissions": "/form-integrations/v1/submissions/forms/{form_guid}", | ||
"workflows": "/automation/v3/workflows", | ||
"owners": "/owners/v2/owners", | ||
} | ||
|
@@ -767,6 +768,73 @@ def sync_deal_pipelines(STATE, ctx): | |
singer.write_state(STATE) | ||
return STATE | ||
|
||
def sync_form_submissions(STATE, ctx): | ||
data = request(get_url("forms")).json() | ||
|
||
for row in data: | ||
STATE = _sync_form_submissions_by_form_id(STATE, row['guid']) | ||
singer.write_state(STATE) | ||
|
||
return STATE | ||
|
||
def _sync_form_submissions_by_form_id(STATE, form_guid): | ||
schema = load_schema("form_submissions") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like the |
||
bookmark_key = 'last_max_submitted_at' | ||
|
||
singer.write_schema("form_submissions", schema, ['guid', 'submittedAt', 'pageUrl'], [bookmark_key]) | ||
end = utils.strptime_with_tz(get_start(STATE, form_guid, bookmark_key)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
max_bk_value = end | ||
up_to_date = False | ||
|
||
LOGGER.info("_sync_form_submissions_by_form_id for guid %s ending at %s", form_guid, end) | ||
|
||
url = get_url("form_submissions", form_guid=form_guid) | ||
path = 'results' | ||
params = { | ||
'count': 50 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be limit? I don't see a count parameter available. |
||
} | ||
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee: | ||
while up_to_date == False: | ||
form_offset = singer.get_offset(STATE, form_guid) | ||
|
||
if bool(form_offset) and form_offset.get('after') != None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
params['after'] = form_offset.get('after') | ||
data = request(url, params).json() | ||
for row in data[path]: | ||
if len(row) == 0: | ||
continue | ||
|
||
submitted_at = utils.strptime_with_tz( | ||
_transform_datetime(row['submittedAt'], UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING)) | ||
|
||
if submitted_at > max_bk_value: | ||
max_bk_value = submitted_at | ||
|
||
if submitted_at <= end: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could use a comment documenting that the stream is returned in reverse order. (It just tripped me up reading it heh) |
||
STATE = singer.clear_offset(STATE, form_guid) | ||
up_to_date = True | ||
LOGGER.info("Reached the end of new form submissions") | ||
break | ||
|
||
record = { | ||
'guid': form_guid, | ||
'submittedAt': row['submittedAt'], | ||
'pageUrl': row['pageUrl'], | ||
'values': row['values'] | ||
} | ||
record = bumble_bee.transform(record, schema) | ||
singer.write_record("form_submissions", record, 'form_submissions', time_extracted=utils.now()) | ||
if 'paging' in data: | ||
STATE = singer.set_offset(STATE, form_guid, 'after', data['paging']['next']['after']) | ||
singer.write_state(STATE) | ||
else: | ||
STATE = singer.clear_offset(STATE, form_guid) | ||
singer.write_state(STATE) | ||
LOGGER.info("No more submissions for this form") | ||
break | ||
STATE = singer.write_bookmark(STATE, form_guid, bookmark_key, max_bk_value.strftime("%Y-%m-%d %H:%M:%S")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a subtle edge case with this replication strategy that I'm working on right now in #98. If you expect this stream to be high in volume, it could take awhile to sync and there's a kind of race condition with how the updates come in that can cause some to get skipped. #91 has a short illustration of how this can play out. The best solution I have right now that's resilient to tap failures is storing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, the bookmark should probably not be written to state until after all forms have been checked, to limit the strange edge cases that can come up if the tap is interrupted. This can return the |
||
return STATE | ||
|
||
@attr.s | ||
class Stream(object): | ||
tap_stream_id = attr.ib() | ||
|
@@ -779,6 +847,7 @@ class Stream(object): | |
# Do these first as they are incremental | ||
Stream('subscription_changes', sync_subscription_changes, ['timestamp', 'portalId', 'recipient'], 'startTimestamp', 'INCREMENTAL'), | ||
Stream('email_events', sync_email_events, ['id'], 'startTimestamp', 'INCREMENTAL'), | ||
Stream('form_submissions', sync_form_submissions, ['guid', 'submittedAt', 'pageUrl'], 'submittedAt', 'INCREMENTAL'), | ||
|
||
# Do these last as they are full table | ||
Stream('forms', sync_forms, ['guid'], 'updatedAt', 'FULL_TABLE'), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docs say this can only be used for some types of forms. Can/Should that be checked here to prevent possible errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use the endpoint
/forms/v2/forms
to get the list of forms to iterate through.By default non-marketing forms are filtered out of this endpoint
according to the docs so I think we're okay.