Skip to content

Commit

Permalink
source-zendesk-support: switch Groups stream to use cursor pagination
Browse files Browse the repository at this point in the history
Previously, the `Groups` stream was inheriting from the
`BaseSourceZendeskSupportStream` class, meaning it paginated by
retrieving the total number of results from a `/count` endpoint,
calculated how many pages there should be based on that count, then
concurrently sent requests for those pages of results. When I added the
`exclude_deleted` param earlier to all `Groups` requests, I didn't
ensure that the `/count` endpoint would include deleted groups. It does
not, and it doesn't accept the `exclude_deleted` query param. So when
the connector was calculating how many pages there should be, it wasn't
including deleted groups in the total count, and we were missing results.

Instead of keeping `Groups` as a `BaseSourceZendeskSupportStream` that
uses this older method of pagination, I switched `Groups` to use cursor
pagination. The stream's behaviour and document schema remained the
same, only how it queries the Zendesk API changed. We should move
`Macros` and `TicketFields` off of `BaseSourceZendeskSupportStream` too,
but that can be done at a later time.
  • Loading branch information
Alex-Bair committed Oct 2, 2024
1 parent 4e0f52f commit 9955e11
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
9 changes: 5 additions & 4 deletions source-zendesk-support/source_zendesk_support/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,15 +841,16 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
yield record


class Groups(SourceZendeskSupportStream):
class Groups(SourceZendeskSupportCursorPaginationStream):
"""Groups stream: https://developer.zendesk.com/api-reference/ticketing/groups/groups/"""

def request_params(
self,
stream_state: Mapping[str, Any] = None,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
**kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
# Zendesk by default excludes deleted groups. To include deleted groups in the API response, we have to
# use the exclude_deleted query param.
params.update({"exclude_deleted": False})
Expand Down
4 changes: 2 additions & 2 deletions source-zendesk-support/tests/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def test_sleep_time():
pages = 4

start = datetime.datetime.now()
stream = Groups(**STREAM_ARGS)
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size

def record_gen(start=0, end=100):
Expand All @@ -164,7 +164,7 @@ def record_gen(start=0, end=100):
{
"status_code": 200,
"headers": {},
"text": json.dumps({"groups": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))})
"text": json.dumps({"macros": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))})
}
for page in range(pages)
]
Expand Down
16 changes: 8 additions & 8 deletions source-zendesk-support/tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,11 @@ class TestSourceZendeskSupportStream:
"stream_cls",
[
(Macros),
(Groups),
(SatisfactionRatings),
(TicketFields),
],
ids=[
"Macros",
"Groups",
"SatisfactionRatings",
"TicketFields",
],
Expand All @@ -365,15 +363,13 @@ def test_parse_response(self, requests_mock, stream_cls):
[
(Macros),
(Organizations),
(Groups),
(SatisfactionRatings),
(TicketFields),
(TicketMetrics),
],
ids=[
"Macros",
"Organizations",
"Groups",
"SatisfactionRatings",
"TicketFields",
"TicketMetrics",
Expand Down Expand Up @@ -408,12 +404,10 @@ def test_get_updated_state(self, stream_cls, current_state, last_record, expecte
"stream_cls, expected",
[
(Macros, None),
(Groups, None),
(TicketFields, None),
],
ids=[
"Macros",
"Groups",
"TicketFields",
],
)
Expand All @@ -426,12 +420,10 @@ def test_next_page_token(self, stream_cls, expected):
"stream_cls, expected",
[
(Macros, {"start_time": 1622505600}),
(Groups, {"start_time": 1622505600, "exclude_deleted": False}),
(TicketFields, {"start_time": 1622505600}),
],
ids=[
"Macros",
"Groups",
"TicketFields",
],
)
Expand Down Expand Up @@ -522,13 +514,15 @@ class TestSourceZendeskSupportCursorPaginationStream:
@pytest.mark.parametrize(
"stream_cls, current_state, last_record, expected",
[
(Groups, {}, {"updated_at": "2022-03-17T16:03:07Z"}, {"updated_at": "2022-03-17T16:03:07Z"}),
(GroupMemberships, {}, {"updated_at": "2022-03-17T16:03:07Z"}, {"updated_at": "2022-03-17T16:03:07Z"}),
(TicketForms, {}, {"updated_at": "2023-03-17T16:03:07Z"}, {"updated_at": "2023-03-17T16:03:07Z"}),
(TicketMetricEvents, {}, {"time": "2024-03-17T16:03:07Z"}, {"time": "2024-03-17T16:03:07Z"}),
(TicketAudits, {}, {"created_at": "2025-03-17T16:03:07Z"}, {"created_at": "2025-03-17T16:03:07Z"}),
(OrganizationMemberships, {}, {"updated_at": "2025-03-17T16:03:07Z"}, {"updated_at": "2025-03-17T16:03:07Z"}),
],
ids=[
"Groups",
"GroupMemberships",
"TicketForms",
"TicketMetricEvents",
Expand All @@ -544,6 +538,7 @@ def test_get_updated_state(self, stream_cls, current_state, last_record, expecte
@pytest.mark.parametrize(
"stream_cls, response, expected",
[
(Groups, {}, None),
(GroupMemberships, {}, None),
(TicketForms, {}, None),
(TicketMetricEvents, {}, None),
Expand All @@ -562,6 +557,7 @@ def test_get_updated_state(self, stream_cls, current_state, last_record, expecte
),
],
ids=[
"Groups",
"GroupMemberships",
"TicketForms",
"TicketMetricEvents",
Expand All @@ -581,13 +577,15 @@ def test_next_page_token(self, requests_mock, stream_cls, response, expected):
@pytest.mark.parametrize(
"stream_cls, expected",
[
(Groups, 1622505600),
(GroupMemberships, 1622505600),
(TicketForms, 1622505600),
(TicketMetricEvents, 1622505600),
(TicketAudits, 1622505600),
(OrganizationMemberships, 1622505600),
],
ids=[
"Groups",
"GroupMemberships",
"TicketForms",
"TicketMetricEvents",
Expand All @@ -603,6 +601,7 @@ def test_check_stream_state(self, stream_cls, expected):
@pytest.mark.parametrize(
"stream_cls, expected",
[
(Groups, {"page[size]": 100, "start_time": 1622505600, "exclude_deleted": False}),
(GroupMemberships, {"page[size]": 100, "start_time": 1622505600, "sort_by": "asc"}),
(TicketForms, {}),
(TicketMetricEvents, {"page[size]": 1000, "start_time": 1622505600}),
Expand All @@ -611,6 +610,7 @@ def test_check_stream_state(self, stream_cls, expected):
(OrganizationMemberships, {"page[size]": 100, "start_time": 1622505600})
],
ids=[
"Groups",
"GroupMemberships",
"TicketForms",
"TicketMetricEvents",
Expand Down

0 comments on commit 9955e11

Please sign in to comment.