source-zendesk-support: fix Groups and Organizations streams #2007

Merged 2 commits on Oct 2, 2024
108 changes: 103 additions & 5 deletions source-zendesk-support/source_zendesk_support/streams.py
@@ -10,7 +10,7 @@
from abc import ABC
from collections import deque
from concurrent.futures import Future, ProcessPoolExecutor
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
from functools import partial
from math import ceil
from pickle import PickleError, dumps
@@ -74,6 +74,18 @@ def to_int(s):
return s


def _s_to_dt_str(s: int) -> str:
"""
Converts a UNIX timestamp in seconds to a date-time formatted string.
"""
return datetime.fromtimestamp(s, tz=UTC).isoformat(' ')

def _dt_str_to_s(dt_str: str) -> int:
"""
Converts a date-time formatted string to a UNIX timestamp in seconds.
"""
return int(datetime.fromisoformat(dt_str).timestamp())
Member commented:
I think this is actually going to return seconds - the float result has the integer part as seconds, and the fractional part as microseconds. We have similar helpers elsewhere that account for this to get millisecond epoch times.

Contributor (Author) replied:

You're right, thanks for catching that. I was wrong in that function description. Zendesk's timestamps are in seconds, not milliseconds. I'll update the function description to say "...to a UNIX timestamp in seconds".

Also, the reason I stored the state as a date-time formatted string instead of an actual datetime object is that when I retrieved the state after restarting the task, it came back as the ISO-formatted string rather than a datetime object. I'll update the helper function names to make it clearer that they accept and return a string, not a datetime object.
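
(Editor's note: a minimal runnable sketch of the round-trip these two helpers perform, copying them from the diff above. It assumes Python 3.11+, where `UTC` is importable from `datetime`.)

```python
# Sketch: round-tripping the helpers from the diff above (assumes Python 3.11+).
from datetime import datetime, UTC

def _s_to_dt_str(s: int) -> str:
    """Converts a UNIX timestamp in seconds to a date-time formatted string."""
    return datetime.fromtimestamp(s, tz=UTC).isoformat(' ')

def _dt_str_to_s(dt_str: str) -> int:
    """Converts a date-time formatted string to a UNIX timestamp in seconds."""
    return int(datetime.fromisoformat(dt_str).timestamp())

print(_s_to_dt_str(1622505600))                   # 2021-06-01 00:00:00+00:00
print(_dt_str_to_s("2021-06-01 00:00:00+00:00"))  # 1622505600
```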


class SourceZendeskException(Exception):
"""default exception of custom SourceZendesk logic"""

@@ -507,6 +519,74 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield record


class SourceZendeskSupportIncrementalTimeExportStream(SourceZendeskSupportFullRefreshStream, IncrementalMixin):
"""
Incremental time-based export streams.
API docs: https://developer.zendesk.com/documentation/ticketing/managing-tickets/using-the-incremental-export-api/#time-based-incremental-exports
Airbyte Incremental Stream Docs: https://docs.airbyte.com/connector-development/cdk-python/incremental-stream for some background.

Uses IncrementalMixin's state setter & getter to persist cursors between requests.

Note: Incremental time-based export streams theoretically can get stuck in a loop if
1000+ resources are updated at the exact same time.
"""
state_checkpoint_interval = 1000
_cursor_value = ""

@property
def cursor_field(self) -> str:
"""Name of the field associated with the state"""
return "updated_at"

@property
def state(self) -> Mapping[str, Any]:
return {self.cursor_field: self._cursor_value}

@state.setter
def state(self, value: Mapping[str, Any]):
self._cursor_value = value[self.cursor_field]

def path(self, **kwargs) -> str:
return f"incremental/{self.response_list_name}"

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self._ignore_pagination:
return None

response_json = response.json()

pagination_complete = response_json.get(END_OF_STREAM_KEY, False)
if pagination_complete:
return None

next_start_time = response_json.get("end_time", None)
if next_start_time:
return {"start_time": next_start_time}

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
if next_page_token:
return next_page_token

start_time_state = self.state.get(self.cursor_field, None)
if start_time_state:
return {"start_time": _dt_str_to_s(start_time_state)}
else:
return {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())}

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()

records = response_json.get(self.response_list_name, [])
for record in records:
yield record

cursor = response_json.get("end_time", None)
if cursor and len(records) != 0:
self.state = {self.cursor_field: _s_to_dt_str(cursor)}
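
(Editor's note: a standalone sketch of the pagination loop this class implements, with a hypothetical `fake_pages` list standing in for successive API responses and assuming `END_OF_STREAM_KEY` is the API's `end_of_stream` flag; the real stream drives this through the Airbyte CDK rather than a plain loop.)

```python
# Sketch of the time-based export cursor flow (hypothetical response data).
END_OF_STREAM_KEY = "end_of_stream"

fake_pages = [
    {"organizations": [{"id": 1}], "end_time": 1622505700, END_OF_STREAM_KEY: False},
    {"organizations": [{"id": 2}], "end_time": 1622505800, END_OF_STREAM_KEY: True},
]

start_time = 1622505600  # initial cursor, seconds since epoch
for page in fake_pages:
    print(f"request start_time={start_time} -> {len(page['organizations'])} records")
    if page[END_OF_STREAM_KEY]:
        break  # mirrors next_page_token returning None on end_of_stream
    start_time = page["end_time"]  # next request resumes from end_time
```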


class SourceZendeskSupportIncrementalCursorExportStream(SourceZendeskIncrementalExportStream, IncrementalMixin):
"""
Incremental cursor export for Users and Tickets streams
@@ -654,6 +734,17 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield event


class Organizations(SourceZendeskSupportIncrementalTimeExportStream):
"""
API docs: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-organization-export

Note: This stream theoretically can get stuck in a loop if 1000+ organizations are
Member commented:
Not a blocker, but the API docs kind of imply this won't actually be possible, since it sounds like it'll keep returning results until one of them has a different timestamp?

... The 1000-item limit may be exceeded if items share the same timestamp ...

Contributor (Author) replied:

Yep, the API docs do make it seem like it wouldn't be an issue. But we have seen other streams that used this time-based incremental export method get stuck in a loop if 1000+ resources are updated at the same time (ex: we had to move the Tickets stream to a different pagination method because it was stuck requesting the same timestamp - PR was #1789).

I don't remember the exact number of tickets that were updated at the same time in that instance, but I think it was multiple thousands. I suspect there's a limit to how much higher that 1000-item limit can go. So if there are more resources updated in the same second than 1000 + whatever the flex amount is, the stream will get stuck.

updated at the exact same time. I don't anticipate this will be an issue, but we
should keep this in mind if we notice an Organizations stream is stuck.
"""
response_list_name = "organizations"
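
(Editor's note: a tiny illustration of the failure mode discussed in the thread above; the values and the `is_stuck` helper are hypothetical. The stream is stuck when a full page's `end_time` equals the `start_time` that was just requested, so the next request repeats the same window.)

```python
# Sketch: detecting the stuck-loop condition from the discussion (hypothetical values).
PAGE_LIMIT = 1000  # Zendesk's nominal per-page item limit

def is_stuck(requested_start_time: int, response_end_time: int, record_count: int) -> bool:
    # If 1000+ items share one timestamp, end_time may stop advancing,
    # so the identical request would be issued again on the next pass.
    return record_count >= PAGE_LIMIT and response_end_time == requested_start_time

print(is_stuck(1622505600, 1622505600, 1000))  # True: cursor cannot advance
print(is_stuck(1622505600, 1622505700, 1000))  # False: end_time moved forward
```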


class OrganizationMemberships(SourceZendeskSupportCursorPaginationStream):
"""OrganizationMemberships stream: https://developer.zendesk.com/api-reference/ticketing/organizations/organization_memberships/"""

@@ -718,10 +809,6 @@ class Users(SourceZendeskSupportIncrementalCursorExportStream):
response_list_name: str = "users"


class Organizations(SourceZendeskSupportStream):
"""Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""


class Tickets(SourceZendeskSupportIncrementalCursorExportStream):
"""Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-cursor-based"""

@@ -756,6 +843,17 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:

class Groups(SourceZendeskSupportStream):
"""Groups stream: https://developer.zendesk.com/api-reference/ticketing/groups/groups/"""
def request_params(
self,
stream_state: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
**kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
# Zendesk by default excludes deleted groups. To include deleted groups in the API response, we have to
# use the exclude_deleted query param.
params.update({"exclude_deleted": False})
return params
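
(Editor's note: roughly how the flag lands on the wire, per the updated unit-test expectation further down; the URL is illustrative, built from the fake subdomain used in the tests.)

```python
# Sketch: the merged Groups query params, URL-encoded by requests (illustrative URL).
import requests

req = requests.Request(
    "GET",
    "https://fake-subdomain.zendesk.com/api/v2/groups.json",
    params={"start_time": 1622505600, "exclude_deleted": False},
).prepare()
print(req.url)
# https://fake-subdomain.zendesk.com/api/v2/groups.json?start_time=1622505600&exclude_deleted=False
```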


class GroupMemberships(SourceZendeskSupportCursorPaginationStream):
@@ -75,6 +75,7 @@
"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
},
"created_at": "2024-07-26T15:41:00Z",
"deleted_at": null,
"details": "",
"domain_names": [],
"external_id": null,
6 changes: 3 additions & 3 deletions source-zendesk-support/tests/test_futures.py
@@ -15,7 +15,7 @@
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from requests.exceptions import ConnectionError
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import Macros, Organizations
from source_zendesk_support.streams import Macros, Organizations, Groups

STREAM_ARGS: dict = {
"subdomain": "fake-subdomain",
@@ -141,7 +141,7 @@ def test_sleep_time():
pages = 4

start = datetime.datetime.now()
stream = Organizations(**STREAM_ARGS)
stream = Groups(**STREAM_ARGS)
stream.page_size = page_size

def record_gen(start=0, end=100):
@@ -164,7 +164,7 @@ def record_gen(start=0, end=100):
{
"status_code": 200,
"headers": {},
"text": json.dumps({"organizations": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))})
"text": json.dumps({"groups": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))})
}
for page in range(pages)
]
27 changes: 10 additions & 17 deletions source-zendesk-support/tests/unit_test.py
@@ -291,7 +291,7 @@ def test_streams(self, expected_stream_cls):
(GroupMemberships, "group_memberships"),
(Groups, "groups"),
(Macros, "macros"),
(Organizations, "organizations"),
(Organizations, "incremental/organizations"),
(OrganizationMemberships, "organization_memberships"),
(SatisfactionRatings, "satisfaction_ratings"),
(SlaPolicies, "slas/policies.json"),
@@ -340,14 +340,12 @@ class TestSourceZendeskSupportStream:
"stream_cls",
[
(Macros),
(Organizations),
(Groups),
(SatisfactionRatings),
(TicketFields),
],
ids=[
"Macros",
"Organizations",
"Groups",
"SatisfactionRatings",
"TicketFields",
@@ -390,19 +388,12 @@ def test_url_base(self, stream_cls):
"stream_cls, current_state, last_record, expected",
[
(Macros, {}, {"updated_at": "2022-03-17T16:03:07Z"}, {"updated_at": "2022-03-17T16:03:07Z"}),
(
Organizations,
{"updated_at": "2022-03-17T16:03:07Z"},
{"updated_at": "2023-03-17T16:03:07Z"},
{"updated_at": "2023-03-17T16:03:07Z"},
),
(Groups, {}, {"updated_at": "2022-03-17T16:03:07Z"}, {"updated_at": "2022-03-17T16:03:07Z"}),
(SatisfactionRatings, {}, {"updated_at": "2022-03-17T16:03:07Z"}, {"updated_at": "2022-03-17T16:03:07Z"}),
(TicketFields, {}, {"updated_at": "2022-03-17T16:03:07Z"}, {"updated_at": "2022-03-17T16:03:07Z"}),
],
ids=[
"Macros",
"Organizations",
"Groups",
"SatisfactionRatings",
"TicketFields",
@@ -417,13 +408,11 @@ def test_get_updated_state(self, stream_cls, current_state, last_record, expected):
"stream_cls, expected",
[
(Macros, None),
(Organizations, None),
(Groups, None),
(TicketFields, None),
],
ids=[
"Macros",
"Organizations",
"Groups",
"TicketFields",
],
@@ -437,13 +426,11 @@ def test_next_page_token(self, stream_cls, expected):
"stream_cls, expected",
[
(Macros, {"start_time": 1622505600}),
(Organizations, {"start_time": 1622505600}),
(Groups, {"start_time": 1622505600}),
(Groups, {"start_time": 1622505600, "exclude_deleted": False}),
(TicketFields, {"start_time": 1622505600}),
],
ids=[
"Macros",
"Organizations",
"Groups",
"TicketFields",
],
@@ -682,11 +669,13 @@ def test_path(self, stream_cls, expected):
(Users),
(Tickets),
(TicketMetrics),
(Organizations),
],
ids=[
"Users",
"Tickets",
"TicketMetrics"
"TicketMetrics",
"Organizations",
],
)
def test_next_page_token(self, requests_mock, stream_cls):
@@ -703,11 +692,13 @@ def test_next_page_token(self, requests_mock, stream_cls):
(Users, {"start_time": 1622505600}),
(Tickets, {"start_time": 1622505600}),
(TicketMetrics, {"start_time": 1622505600, "include": "metric_sets"}),
(Organizations, {"start_time": 1622505600}),
],
ids=[
"Users",
"Tickets",
"TicketMetrics"
"TicketMetrics",
"Organizations",
],
)
def test_request_params(self, stream_cls, expected):
@@ -720,10 +711,12 @@ def test_request_params(self, stream_cls, expected):
[
(Users),
(Tickets),
(Organizations),
],
ids=[
"Users",
"Tickets",
"Organizations",
],
)
def test_parse_response(self, requests_mock, stream_cls):