source-genesys: only use the jobs endpoint for retrieving conversations
The code for retrieving conversations in real-time from the query
endpoint remains in the previous commit in case we want to add this
functionality later, once the CDK supports a delayed stream / a second
cursor.
Alex-Bair committed Nov 12, 2024
1 parent 20f5a23 commit 0e1b88a
Showing 4 changed files with 29 additions and 189 deletions.
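
This commit standardizes the connector on Genesys Cloud's asynchronous analytics jobs endpoint. For orientation, below is a minimal, synchronous sketch of that flow (submit a job, poll until it completes, page through results with a cursor). It is not the connector's code: the domain, the bearer token, the status/results paths, the FULFILLED state, and the jobId / state / cursor field names are assumptions based on the Genesys Cloud analytics docs and the response models referenced in the diff.

import time
import requests

# Hypothetical values; the real connector derives these from its endpoint config and OAuth flow.
BASE = "https://api.mypurecloud.com"
HEADERS = {"Authorization": "Bearer <access token>"}

def fetch_conversations(interval: str) -> list[dict]:
    # 1. Submit an asynchronous analytics job covering the ISO-8601 interval.
    job = requests.post(
        f"{BASE}/api/v2/analytics/conversations/details/jobs",
        headers=HEADERS,
        json={"interval": interval},
    ).json()
    job_id = job["jobId"]

    # 2. Poll the job until it has completed.
    while True:
        status = requests.get(
            f"{BASE}/api/v2/analytics/conversations/details/jobs/{job_id}",
            headers=HEADERS,
        ).json()
        if status["state"] == "FULFILLED":
            break
        time.sleep(5)

    # 3. Page through the results; each page returns a cursor for the next one.
    conversations: list[dict] = []
    params: dict[str, str] = {}
    while True:
        page = requests.get(
            f"{BASE}/api/v2/analytics/conversations/details/jobs/{job_id}/results",
            headers=HEADERS,
            params=params,
        ).json()
        conversations.extend(page.get("conversations", []))
        if not page.get("cursor"):
            break
        params = {"cursor": page["cursor"]}
    return conversations

# Example: one interval from the config start date through the present.
# fetch_conversations("2024-11-01T00:00:00Z/2024-11-12T00:00:00Z")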
177 changes: 16 additions & 161 deletions source-genesys/source_genesys/api.py
@@ -10,25 +10,18 @@
User,
UserResponse,
Conversation,
ConversationQueryResponse,
CreateJobResponse,
CheckJobStatusResponse,
JobResultsResponse,
)

COMMON_API = "https://api"
DATETIME_STRING_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# Some of the /v2/analytics/*/query endpoints only allow queries covering intervals of 7 days or fewer.
MAX_QUERY_DATE_WINDOW = 7
MAX_QUERY_PAGE_SIZE = 100


def _dt_to_str(dt: datetime) -> str:
return dt.strftime(DATETIME_STRING_FORMAT)

def _str_to_dt(date: str) -> datetime:
return datetime.fromisoformat(date)


async def snapshot_users(
http: HTTPSession,
@@ -68,113 +61,25 @@ async def fetch_conversations(
log_cursor: LogCursor,
) -> AsyncGenerator[Conversation | LogCursor, None]:
"""
There is not a single endpoint that can be used to retrieve updated conversations in real-time,
so `fetch_conversations` relies on two separate endpoints:
/api/v2/analytics/conversations/details/query - provides created conversations in real-time
/api/v2/analytics/conversations/details/jobs - provides updated conversations with a delay
Submits an asynchronous job for all conversations
that have ended between start_date and the present.
"""
assert isinstance(log_cursor, datetime)

most_recent_created_cursor = log_cursor
updated_cursor = log_cursor

async for record in fetch_created_conversations(http, domain, log, log_cursor):
record_dt = record.conversationStart
if record_dt > log_cursor:
most_recent_created_cursor = record.conversationStart
async for record in _perform_conversation_job(
http, domain, log, log_cursor
):
# Async analytics jobs return any conversations that started or ended within the requested date window.
# The stream's cursor should be updated to the record's most recent datetime.
most_recent_conversation_dt = record.conversationEnd or record.conversationStart
if most_recent_conversation_dt > log_cursor:
updated_cursor = max(updated_cursor, most_recent_conversation_dt)
yield record

# If any real-time documents were yielded, check the past two days for conversations that have ended.
if most_recent_created_cursor != log_cursor:
async for record in fetch_updated_conversations(http, domain, log, log_cursor - timedelta(days=1)):
yield record

yield most_recent_created_cursor


async def fetch_created_conversations(
http: HTTPSession,
domain: str,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[Conversation, None]:
"""
Retrieves created conversations from the log_cursor to the present.
API docs - https://developer.genesys.cloud/routing/conversations/conversations-apis#post-api-v2-analytics-conversations-details-query
"""
assert isinstance(log_cursor, datetime)
url = f"{COMMON_API}.{domain}/api/v2/analytics/conversations/details/query"

lower_bound = log_cursor
now = datetime.now(tz=UTC)

# Iterate from our last seen cursor value to the present in 7 day windows.
while lower_bound < now:
upper_bound = min(lower_bound + timedelta(days=MAX_QUERY_DATE_WINDOW), now)
pageNumber = 1

# Paginate through this date window's results.
while True:
# This endpoint returns results in ascending order of the conversationStart field by default,
# so we don't need to specify any ordering options.
body = {
"interval": f"{_dt_to_str(lower_bound)}/{_dt_to_str(upper_bound)}",
"paging": {
"pageSize": MAX_QUERY_PAGE_SIZE,
"pageNumber": pageNumber,
}
}

response = ConversationQueryResponse.model_validate_json(
await http.request(log, url, method="POST", json=body)
)

conversations = response.conversations

# If there are no more results to page through, move on to the next date window.
if conversations is None:
break

for conversation in conversations:
yield conversation

pageNumber += 1

lower_bound = upper_bound


def _build_conversation_job_body(
start_date: datetime,
end_date: datetime,
exclude_ongoing_conversations: bool = True,
) -> dict[str, Any]:
body = {
"interval": f"{_dt_to_str(start_date)}/{_dt_to_str(end_date)}",
}

if exclude_ongoing_conversations:
filter_condition = {
# Order results in ascending order of when they completed.
"orderBy": "conversationEnd",
# Only include conversations that have ended.
"conversationFilters": [
{
"predicates": [
{
"type": "dimension",
"dimension": "conversationEnd",
"operator": "exists"
}
],
"type": "and"
}
]
}

body.update(filter_condition)

return body
if updated_cursor > log_cursor:
yield updated_cursor


async def _perform_conversation_job(
@@ -183,7 +88,6 @@ async def _perform_conversation_job(
log: Logger,
start_date: datetime,
end_date: datetime = datetime.now(tz=UTC),
exclude_ongoing_conversations: bool = True,
) -> AsyncGenerator[Conversation, None]:
"""
Requests the Genesys API to perform an async job & paginates through the results.
@@ -192,7 +96,9 @@
"""
# Submit job.
url = f"{COMMON_API}.{domain}/api/v2/analytics/conversations/details/jobs"
body = _build_conversation_job_body(start_date, end_date, exclude_ongoing_conversations)
body = {
"interval": f"{_dt_to_str(start_date)}/{_dt_to_str(end_date)}",
}

response = CreateJobResponse.model_validate_json(
await http.request(log, url, method="POST", json=body)
@@ -247,54 +153,3 @@ async def _perform_conversation_job(
params.update({
"cursor": cursor,
})


async def fetch_updated_conversations(
http: HTTPSession,
domain: str,
log: Logger,
start_date: datetime,
) -> AsyncGenerator[Conversation, None]:
"""
Submits an asynchronous job for all conversations
that have ended between start_date and the present.
"""
async for result in _perform_conversation_job(
http,
domain,
log,
start_date,
):
yield result


async def backfill_conversations(
http: HTTPSession,
domain: str,
log: Logger,
page_cursor: PageCursor | None,
cutoff: LogCursor,
) -> AsyncGenerator[Conversation | PageCursor, None]:
"""
Submits an asynchronous job for all conversations from the page_cursor (config start date) and the cutoff.
"""
assert isinstance(page_cursor, str)
assert isinstance(cutoff, datetime)

start_date = _str_to_dt(page_cursor)

# Since cutoff is shifted backwards 2 days, it's possible for the cutoff to be before the start date.
# In this situation, incremental replication will cover everything that should be backfilled, and the
# backfill can safely return.
if start_date > cutoff:
return

async for result in _perform_conversation_job(
http,
domain,
log,
start_date,
end_date=cutoff,
exclude_ongoing_conversations=False,
):
yield result
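
The cursor handling in the new fetch_conversations above (advance the cursor to a record's conversationEnd, falling back to conversationStart for conversations still in progress, and only yield it when it has moved) can be illustrated with a small self-contained sketch; the dict-based records here are stand-ins for the connector's Conversation model, not its actual types.

from datetime import datetime, timezone

def advance_cursor(cursor: datetime, records: list[dict]) -> datetime:
    # A record's most recent timestamp is its end time if it has ended,
    # otherwise its start time; the cursor only ever moves forward.
    for record in records:
        most_recent = record.get("conversationEnd") or record["conversationStart"]
        if most_recent > cursor:
            cursor = max(cursor, most_recent)
    return cursor

cursor = datetime(2024, 11, 1, tzinfo=timezone.utc)
records = [
    # Ongoing conversation: only conversationStart is set.
    {"conversationStart": datetime(2024, 11, 2, tzinfo=timezone.utc)},
    # Ended conversation: conversationEnd wins.
    {"conversationStart": datetime(2024, 11, 3, tzinfo=timezone.utc),
     "conversationEnd": datetime(2024, 11, 4, tzinfo=timezone.utc)},
]
print(advance_cursor(cursor, records))  # 2024-11-04 00:00:00+00:00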
15 changes: 6 additions & 9 deletions source-genesys/source_genesys/models.py
@@ -81,15 +81,12 @@ class UserResponse(BaseModel, extra="forbid"):
class Conversation(BaseDocument, extra="allow"):
conversationId: str
conversationStart: AwareDatetime


class EndedConversation(Conversation):
conversationEnd: AwareDatetime


class ConversationQueryResponse(BaseModel, extra="forbid"):
conversations: list[Conversation] | None = None # If there are no conversations in the requested range, the conversations field is omitted from the response.
totalHits: int
# Conversations that have not ended do not have a conversationEnd field.
conversationEnd: AwareDatetime = Field(
default=None,
# Don't schematize the default value.
json_schema_extra=lambda x: x.pop('default') # type: ignore
)


class CreateJobResponse(BaseModel, extra="forbid"):
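A standalone sketch of the models.py pattern above, assuming Pydantic v2: conversationEnd gets a None default so ongoing conversations still validate, and a json_schema_extra callable strips that default from the emitted JSON schema, which is what produces the snapshot change shown further down.

from pydantic import AwareDatetime, BaseModel, Field

class Conversation(BaseModel, extra="allow"):
    conversationId: str
    conversationStart: AwareDatetime
    # Ongoing conversations omit conversationEnd; the callable receives the
    # field's JSON schema dict and removes the None default from it.
    conversationEnd: AwareDatetime = Field(
        default=None,
        json_schema_extra=lambda schema: schema.pop("default", None),
    )

print(Conversation.model_json_schema()["properties"]["conversationEnd"])
# {'format': 'date-time', 'title': 'Conversationend', 'type': 'string'}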
21 changes: 2 additions & 19 deletions source-genesys/source_genesys/resources.py
@@ -1,4 +1,4 @@
from datetime import timedelta, datetime, UTC
from datetime import timedelta
import functools
from logging import Logger

@@ -18,17 +18,9 @@
from .api import (
snapshot_users,
fetch_conversations,
backfill_conversations,
_dt_to_str,
COMMON_API,
)

# The backing data set for Genesys' asynchronous analytics jobs is not updated in real-time.
# It can take hours to a day for the data to be available. We shift the cutoff between backfills
& incremental replication two days to ensure we're only backfilling over a date range where
# data is available.
ASYNC_JOB_DATA_AVAILABILITY_DELAY = 2


def update_oauth2spec_with_domain(config: EndpointConfig):
"""
@@ -120,24 +112,15 @@ def open(
http,
config.genesys_cloud_domain,
),
fetch_page=functools.partial(
backfill_conversations,
http,
config.genesys_cloud_domain,
)
)

backfill_start = _dt_to_str(config.start_date)
cutoff = datetime.now(tz=UTC) - timedelta(days=ASYNC_JOB_DATA_AVAILABILITY_DELAY)

return common.Resource(
name='conversations',
key=["/conversationId"],
model=Conversation,
open=open,
initial_state=ResourceState(
inc=ResourceState.Incremental(cursor=cutoff),
backfill=ResourceState.Backfill(next_page=backfill_start, cutoff=cutoff)
inc=ResourceState.Incremental(cursor=config.start_date),
),
initial_config=ResourceConfig(
name='conversations', interval=timedelta(minutes=5)
@@ -49,6 +49,11 @@
"format": "date-time",
"title": "Conversationstart",
"type": "string"
},
"conversationEnd": {
"format": "date-time",
"title": "Conversationend",
"type": "string"
}
},
"required": [