From 0e1b88a37d756dec7f671cb6789f8a2f82ea525d Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Mon, 11 Nov 2024 16:39:12 -0500 Subject: [PATCH] source-genesys: only use the jobs endpoint for retrieving conversations The code for retrieving conversations in real-time from the query endpoint is left in the previous commit in case we want to add this functionality later when the CDK has the capability to have a delayed stream / a second cursor. --- source-genesys/source_genesys/api.py | 177 ++---------------- source-genesys/source_genesys/models.py | 15 +- source-genesys/source_genesys/resources.py | 21 +-- .../snapshots__discover__capture.stdout.json | 5 + 4 files changed, 29 insertions(+), 189 deletions(-) diff --git a/source-genesys/source_genesys/api.py b/source-genesys/source_genesys/api.py index a2058df8e..d2258d7dc 100644 --- a/source-genesys/source_genesys/api.py +++ b/source-genesys/source_genesys/api.py @@ -10,7 +10,6 @@ User, UserResponse, Conversation, - ConversationQueryResponse, CreateJobResponse, CheckJobStatusResponse, JobResultsResponse, @@ -18,17 +17,11 @@ COMMON_API = "https://api" DATETIME_STRING_FORMAT = "%Y-%m-%dT%H:%M:%SZ" -# Some of the /v2/analytics/*/query endpoints only allow queries covering intervals of 7 days or fewer. -MAX_QUERY_DATE_WINDOW = 7 -MAX_QUERY_PAGE_SIZE = 100 def _dt_to_str(dt: datetime) -> str: return dt.strftime(DATETIME_STRING_FORMAT) -def _str_to_dt(date: str) -> datetime: - return datetime.fromisoformat(date) - async def snapshot_users( http: HTTPSession, @@ -68,113 +61,25 @@ async def fetch_conversations( log_cursor: LogCursor, ) -> AsyncGenerator[Conversation | LogCursor, None]: """ - There is not a single endpoint that can be used to retrieve updated conversations in real-time, - so `fetch_conversations` relies on two separate endpoints: - - /api/v2/analytics/conversations/details/query - provides created conversations in real-time - /api/v2/analytics/conversations/details/jobs - provides updated converversations with a delay + Submits an asynchronous job for all conversations + that have ended between start_date and the present. """ assert isinstance(log_cursor, datetime) - most_recent_created_cursor = log_cursor + updated_cursor = log_cursor - async for record in fetch_created_conversations(http, domain, log, log_cursor): - record_dt = record.conversationStart - if record_dt > log_cursor: - most_recent_created_cursor = record.conversationStart + async for record in _perform_conversation_job( + http, domain, log, log_cursor + ): + # Async analytics jobs return any conversations that started or ended within the requested date window. + # The stream's cursor should be updated to the record's most recent datetime. + most_recent_conversation_dt = record.conversationEnd or record.conversationStart + if most_recent_conversation_dt > log_cursor: + updated_cursor = max(updated_cursor, most_recent_conversation_dt) yield record - # If any real-time documents were yielded, check the past two days for conversations that have ended. - if most_recent_created_cursor != log_cursor: - async for record in fetch_updated_conversations(http, domain, log, log_cursor - timedelta(days=1)): - yield record - - yield most_recent_created_cursor - - -async def fetch_created_conversations( - http: HTTPSession, - domain: str, - log: Logger, - log_cursor: LogCursor, -) -> AsyncGenerator[Conversation, None]: - """ - Retrieves created conversations from the log_cursor to the present. - - API docs - https://developer.genesys.cloud/routing/conversations/conversations-apis#post-api-v2-analytics-conversations-details-query - """ - assert isinstance(log_cursor, datetime) - url = f"{COMMON_API}.{domain}/api/v2/analytics/conversations/details/query" - - lower_bound = log_cursor - now = datetime.now(tz=UTC) - - # Iterate from our last seen cursor value to the present in 7 day windows. - while lower_bound < now: - upper_bound = min(lower_bound + timedelta(days=MAX_QUERY_DATE_WINDOW), now) - pageNumber = 1 - - # Paginate through this date window's results. - while True: - # This endpoint returns results in ascending order of the conversationStart field by default, - # so we don't need to specify any ordering options. - body = { - "interval": f"{_dt_to_str(lower_bound)}/{_dt_to_str(upper_bound)}", - "paging": { - "pageSize": MAX_QUERY_PAGE_SIZE, - "pageNumber": pageNumber, - } - } - - response = ConversationQueryResponse.model_validate_json( - await http.request(log, url, method="POST", json=body) - ) - - conversations = response.conversations - - # If there are no more results to page through, move on to the next date window. - if conversations is None: - break - - for conversation in conversations: - yield conversation - - pageNumber += 1 - - lower_bound = upper_bound - - -def _build_conversation_job_body( - start_date: datetime, - end_date: datetime, - exclude_ongoing_conversations: bool = True, -) -> dict[str, Any]: - body = { - "interval": f"{_dt_to_str(start_date)}/{_dt_to_str(end_date)}", - } - - if exclude_ongoing_conversations: - filter_condition = { - # Order results in ascending order of when they completed. - "orderBy": "conversationEnd", - # Only include conversations that have ended. - "conversationFilters": [ - { - "predicates": [ - { - "type": "dimension", - "dimension": "conversationEnd", - "operator": "exists" - } - ], - "type": "and" - } - ] - } - - body.update(filter_condition) - - return body + if updated_cursor > log_cursor: + yield updated_cursor async def _perform_conversation_job( @@ -183,7 +88,6 @@ async def _perform_conversation_job( log: Logger, start_date: datetime, end_date: datetime = datetime.now(tz=UTC), - exclude_ongoing_conversations: bool = True, ) -> AsyncGenerator[Conversation, None]: """ Requests the Genesys API to perform an async job & paginates through the results. @@ -192,7 +96,9 @@ async def _perform_conversation_job( """ # Submit job. url = f"{COMMON_API}.{domain}/api/v2/analytics/conversations/details/jobs" - body = _build_conversation_job_body(start_date, end_date, exclude_ongoing_conversations) + body = { + "interval": f"{_dt_to_str(start_date)}/{_dt_to_str(end_date)}", + } response = CreateJobResponse.model_validate_json( await http.request(log, url, method="POST", json=body) @@ -247,54 +153,3 @@ async def _perform_conversation_job( params.update({ "cursor": cursor, }) - - -async def fetch_updated_conversations( - http: HTTPSession, - domain: str, - log: Logger, - start_date: datetime, -) -> AsyncGenerator[Conversation, None]: - """ - Submits an asynchronous job for all conversations - that have ended between start_date and the present. - """ - async for result in _perform_conversation_job( - http, - domain, - log, - start_date, - ): - yield result - - -async def backfill_conversations( - http: HTTPSession, - domain: str, - log: Logger, - page_cursor: PageCursor | None, - cutoff: LogCursor, -) -> AsyncGenerator[Conversation | PageCursor, None]: - """ - Submits an asynchronous job for all conversations from the page_cursor (config start date) and the cutoff. - """ - assert isinstance(page_cursor, str) - assert isinstance(cutoff, datetime) - - start_date = _str_to_dt(page_cursor) - - # Since cutoff is shifted backwards 2 days, it's possible for the cutoff to be before the start date. - # In this situation, incremental replication will cover everything that should be backfilled, and the - # backfill can safely return. - if start_date > cutoff: - return - - async for result in _perform_conversation_job( - http, - domain, - log, - start_date, - end_date=cutoff, - exclude_ongoing_conversations=False, - ): - yield result diff --git a/source-genesys/source_genesys/models.py b/source-genesys/source_genesys/models.py index 76582809f..c9a6f996f 100644 --- a/source-genesys/source_genesys/models.py +++ b/source-genesys/source_genesys/models.py @@ -81,15 +81,12 @@ class UserResponse(BaseModel, extra="forbid"): class Conversation(BaseDocument, extra="allow"): conversationId: str conversationStart: AwareDatetime - - -class EndedConversation(Conversation): - conversationEnd: AwareDatetime - - -class ConversationQueryResponse(BaseModel, extra="forbid"): - conversations: list[Conversation] | None = None # If there are no conversations in the requested range, the conversations field is omitted from the response. - totalHits: int + # Conversations that have not ended do not have a conversationEnd field. + conversationEnd: AwareDatetime = Field( + default=None, + # Don't schematize the default value. + json_schema_extra=lambda x: x.pop('default') # type: ignore + ) class CreateJobResponse(BaseModel, extra="forbid"): diff --git a/source-genesys/source_genesys/resources.py b/source-genesys/source_genesys/resources.py index 8de48e938..8dac1178b 100644 --- a/source-genesys/source_genesys/resources.py +++ b/source-genesys/source_genesys/resources.py @@ -1,4 +1,4 @@ -from datetime import timedelta, datetime, UTC +from datetime import timedelta import functools from logging import Logger @@ -18,17 +18,9 @@ from .api import ( snapshot_users, fetch_conversations, - backfill_conversations, - _dt_to_str, COMMON_API, ) -# The backing data set for Genesys' asynchronous analytics jobs is not updated in real-time. -# It can take hours to a day for the data to be available. We shift the cutoff between backfills -# & incremental replication two days to ensure we're only backfilling over a data range where -# data is available. -ASYNC_JOB_DATA_AVAILABILITY_DELAY = 2 - def update_oauth2spec_with_domain(config: EndpointConfig): """ @@ -120,24 +112,15 @@ def open( http, config.genesys_cloud_domain, ), - fetch_page=functools.partial( - backfill_conversations, - http, - config.genesys_cloud_domain, - ) ) - backfill_start = _dt_to_str(config.start_date) - cutoff = datetime.now(tz=UTC) - timedelta(days=ASYNC_JOB_DATA_AVAILABILITY_DELAY) - return common.Resource( name='conversations', key=["/conversationId"], model=Conversation, open=open, initial_state=ResourceState( - inc=ResourceState.Incremental(cursor=cutoff), - backfill=ResourceState.Backfill(next_page=backfill_start, cutoff=cutoff) + inc=ResourceState.Incremental(cursor=config.start_date), ), initial_config=ResourceConfig( name='conversations', interval=timedelta(minutes=5) diff --git a/source-genesys/tests/snapshots/snapshots__discover__capture.stdout.json b/source-genesys/tests/snapshots/snapshots__discover__capture.stdout.json index 10b99865f..18b7d6486 100644 --- a/source-genesys/tests/snapshots/snapshots__discover__capture.stdout.json +++ b/source-genesys/tests/snapshots/snapshots__discover__capture.stdout.json @@ -49,6 +49,11 @@ "format": "date-time", "title": "Conversationstart", "type": "string" + }, + "conversationEnd": { + "format": "date-time", + "title": "Conversationend", + "type": "string" } }, "required": [