source-zendesk-support: fix Groups and Organizations streams #2007
@@ -10,7 +10,7 @@
 from abc import ABC
 from collections import deque
 from concurrent.futures import Future, ProcessPoolExecutor
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, UTC
 from functools import partial
 from math import ceil
 from pickle import PickleError, dumps
@@ -74,6 +74,18 @@ def to_int(s):
     return s


+def _s_to_dt_str(s: int) -> str:
+    """
+    Converts a UNIX timestamp in seconds to a date-time formatted string.
+    """
+    return datetime.fromtimestamp(s, tz=UTC).isoformat(' ')
+
+
+def _dt_str_to_s(dt_str: str) -> int:
+    """
+    Converts a date-time formatted string to a UNIX timestamp in seconds.
+    """
+    return int(datetime.fromisoformat(dt_str).timestamp())
+
+
 class SourceZendeskException(Exception):
     """default exception of custom SourceZendesk logic"""
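For context, the two helpers round-trip a seconds-resolution epoch through an ISO-style string. A minimal standalone sketch (assuming Python 3.11+ for the `UTC` alias; `timezone.utc` is the equivalent on older versions):

```python
from datetime import datetime, UTC  # UTC alias requires Python 3.11+

def _s_to_dt_str(s: int) -> str:
    """Convert a UNIX timestamp in seconds to a date-time formatted string."""
    return datetime.fromtimestamp(s, tz=UTC).isoformat(" ")

def _dt_str_to_s(dt_str: str) -> int:
    """Convert a date-time formatted string back to a UNIX timestamp in seconds."""
    return int(datetime.fromisoformat(dt_str).timestamp())

ts = 1577836800                    # 2020-01-01 00:00:00 UTC
dt_str = _s_to_dt_str(ts)          # "2020-01-01 00:00:00+00:00"
assert _dt_str_to_s(dt_str) == ts  # lossless round trip at second resolution
```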
@@ -507,6 +519,74 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
             yield record


+class SourceZendeskSupportIncrementalTimeExportStream(SourceZendeskSupportFullRefreshStream, IncrementalMixin):
+    """
+    Incremental time-based export streams.
+    API docs: https://developer.zendesk.com/documentation/ticketing/managing-tickets/using-the-incremental-export-api/#time-based-incremental-exports
+    Airbyte Incremental Stream Docs: https://docs.airbyte.com/connector-development/cdk-python/incremental-stream for some background.
+
+    Uses IncrementalMixin's state setter & getter to persist cursors between requests.
+
+    Note: Incremental time-based export streams theoretically can get stuck in a loop if
+    1000+ resources are updated at the exact same time.
+    """
+    state_checkpoint_interval = 1000
+    _cursor_value = ""
+
+    @property
+    def cursor_field(self) -> str:
+        """Name of the field associated with the state"""
+        return "updated_at"
+
+    @property
+    def state(self) -> Mapping[str, Any]:
+        return {self.cursor_field: self._cursor_value}
+
+    @state.setter
+    def state(self, value: Mapping[str, Any]):
+        self._cursor_value = value[self.cursor_field]
+
+    def path(self, **kwargs) -> str:
+        return f"incremental/{self.response_list_name}"
+
+    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        if self._ignore_pagination:
+            return None
+
+        response_json = response.json()
+
+        pagination_complete = response_json.get(END_OF_STREAM_KEY, False)
+        if pagination_complete:
+            return None
+
+        next_start_time = response_json.get("end_time", None)
+        if next_start_time:
+            return {"start_time": next_start_time}
+
+    def request_params(
+        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None,
+    ) -> MutableMapping[str, Any]:
+        if next_page_token:
+            return next_page_token
+
+        start_time_state = self.state.get(self.cursor_field, None)
+        if start_time_state:
+            return {"start_time": _dt_str_to_s(start_time_state)}
+        else:
+            return {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())}
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        response_json = response.json()
+
+        records = response_json.get(self.response_list_name, [])
+        for record in records:
+            yield record
+
+        cursor = response_json.get("end_time", None)
+        if cursor and len(records) != 0:
+            self.state = {self.cursor_field: _s_to_dt_str(cursor)}
+
+
 class SourceZendeskSupportIncrementalCursorExportStream(SourceZendeskIncrementalExportStream, IncrementalMixin):
     """
     Incremental cursor export for Users and Tickets streams
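To make the paging contract of the new class concrete, here is a minimal sketch of the time-based export loop (hypothetical code, not part of the connector; `fetch_page` stands in for an HTTP GET against `/api/v2/incremental/<resource>`): each response's `end_time` becomes the next request's `start_time` until the API signals end of stream.

```python
from typing import Callable, Iterator, Mapping

def read_time_based_export(fetch_page: Callable[[int], Mapping], start_time: int) -> Iterator[Mapping]:
    """Hypothetical driver for Zendesk's time-based incremental export."""
    while True:
        # e.g. {"organizations": [...], "end_time": 1700000123, "end_of_stream": False}
        page = fetch_page(start_time)
        yield from page.get("organizations", [])
        if page.get("end_of_stream"):   # END_OF_STREAM_KEY in the connector
            break
        start_time = page["end_time"]   # becomes the next start_time token
```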
@@ -654,6 +734,17 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
             yield event


+class Organizations(SourceZendeskSupportIncrementalTimeExportStream):
+    """
+    API docs: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-organization-export
+
+    Note: This stream theoretically can get stuck in a loop if 1000+ organizations are
+    updated at the exact same time. I don't anticipate this will be an issue, but we
+    should keep this in mind if we notice an Organizations stream is stuck.
+    """
+    response_list_name = "organizations"
+
+
 class OrganizationMemberships(SourceZendeskSupportCursorPaginationStream):
     """OrganizationMemberships stream: https://developer.zendesk.com/api-reference/ticketing/organizations/organization_memberships/"""

Review comment (on the Organizations docstring note):
Not a blocker, but per the API docs it kind of implies that this won't actually be possible, since it sounds like it'll keep returning results until one of them has a different timestamp?

Author reply:
Yep, the API docs do make it seem like it wouldn't be an issue. But we have seen other streams that used this time-based incremental export method get stuck in a loop when 1000+ resources were updated at the same time (ex: we had to move the …). I don't remember the exact number of tickets that were updated at the same time in that instance, but I think it was multiple thousands. I suspect there's a limit to how much higher that 1000-item limit can go. So if more resources are updated in the same second than 1000 plus whatever the flex amount is, the stream will get stuck.
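To illustrate the failure mode discussed in this thread (a hypothetical simulation, not connector code): if every record in a full page shares one update second, the response's `end_time` equals the request's `start_time`, so the cursor never advances.

```python
def simulate(pages: dict, start_time: int, max_requests: int = 5) -> None:
    """pages maps a start_time to a (records, end_time) response."""
    for _ in range(max_requests):
        records, end_time = pages[start_time]
        if end_time == start_time:
            print(f"stuck: {len(records)} records, cursor pinned at {end_time}")
            return
        start_time = end_time
    print("completed without looping")

# 1000+ resources all updated at t=60: every request with start_time=60
# returns the same full page whose end_time is still 60.
simulate({0: (["r"] * 1000, 60), 60: (["r"] * 1000, 60)}, start_time=0)
```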
@@ -718,10 +809,6 @@ class Users(SourceZendeskSupportIncrementalCursorExportStream):
     response_list_name: str = "users"


-class Organizations(SourceZendeskSupportStream):
-    """Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""
-
-
 class Tickets(SourceZendeskSupportIncrementalCursorExportStream):
     """Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-cursor-based"""
@@ -756,6 +843,17 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:

 class Groups(SourceZendeskSupportStream):
     """Groups stream: https://developer.zendesk.com/api-reference/ticketing/groups/groups/"""
+    def request_params(
+        self,
+        stream_state: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None,
+        **kwargs
+    ) -> MutableMapping[str, Any]:
+        params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
+        # Zendesk by default excludes deleted groups. To include deleted groups in the API response, we have to
+        # use the exclude_deleted query param.
+        params.update({"exclude_deleted": False})
+        return params


 class GroupMemberships(SourceZendeskSupportCursorPaginationStream):
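For reference, a hypothetical standalone request showing the effect of the new parameter (the subdomain and credentials are placeholders, and the parent class normally supplies the base params):

```python
import requests

# exclude_deleted=False asks Zendesk to include deleted groups in the listing.
resp = requests.get(
    "https://example-subdomain.zendesk.com/api/v2/groups.json",
    params={"exclude_deleted": False},  # serialized onto the query string
    auth=("user@example.com/token", "<api_token>"),
)
for group in resp.json().get("groups", []):
    print(group["id"], group["name"], group.get("deleted"))
```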
Review comment (on the timestamp helper functions):
I think this is actually going to return seconds: the float result has the integer part as seconds and the fractional part as microseconds. We have similar helpers elsewhere that account for this to get millisecond epoch times.

Author reply:
You're right, thanks for catching that. I was wrong in that function description: Zendesk's timestamps are in seconds, not milliseconds. I'll update the function description to say "...to a UNIX timestamp in seconds".

Also, the reason I stored the state as a date-time formatted string instead of an actual `datetime` object is that when I retrieved the state after restarting the task, it came back as the ISO-formatted string rather than a `datetime` object. I'll update the helper function names to make it clearer that they accept/return a string, not a `datetime` object.
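A quick check of the behavior described above (a minimal sketch; the epoch value is arbitrary): `datetime.timestamp()` returns a float whose integer part is seconds, so truncating with `int()` yields a seconds epoch, and a millisecond epoch would need an explicit `* 1000`.

```python
from datetime import datetime, UTC

dt = datetime.fromtimestamp(1700000000.5, tz=UTC)
print(dt.timestamp())              # 1700000000.5  -> integer part is seconds
print(int(dt.timestamp()))         # 1700000000    -> what _dt_str_to_s returns
print(int(dt.timestamp() * 1000))  # 1700000000500 -> a millisecond epoch instead
```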