source-genesys: new connector
This is a minimal connector for Genesys, containing just the `users`
and `conversations` streams.

`users` is a simple snapshot since there is no way to request only
updated users.

`conversations` is incremental and leverages two different endpoints. A
real-time endpoint is used to fetch created conversations incrementally.
This real-time endpoint is not suitable for fetching updates (its time
ranges can only filter on when conversations were created), so we also
use the delayed, asynchronous analytics job endpoint to fetch
conversations that have ended in the last day. This is not meant to
capture all ended conversations; there are scenarios where we would
miss these updates (ex: no conversations are created for a while but
existing conversations end), and a backfill will be required to fully
capture all ended conversations.
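
In sketch form, each incremental sweep implemented in
`source_genesys/api.py` (shown in full below) does roughly the
following (names condensed for illustration):

    cursor = log_cursor
    # Real-time query: conversations created since the cursor.
    for conversation in query_created_conversations(since=log_cursor):
        cursor = max(cursor, conversation.conversationStart)
        yield conversation
    # Delayed analytics job: conversations that ended in the last day.
    if cursor != log_cursor:
        yield from job_for_ended_conversations(since=log_cursor - one_day)
    yield cursor  # checkpoint the new cursor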
Alex-Bair committed Nov 11, 2024
1 parent 38dff0c commit 542a715
Showing 13 changed files with 2,323 additions and 0 deletions.
1 change: 1 addition & 0 deletions source-genesys/VERSION
@@ -0,0 +1 @@
v1
1,375 changes: 1,375 additions & 0 deletions source-genesys/poetry.lock

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions source-genesys/pyproject.toml
@@ -0,0 +1,20 @@
[tool.poetry]
name = "source_genesys"
version = "0.1.0"
description = ""
authors = ["Alex Bair <[email protected]>"]

[tool.poetry.dependencies]
estuary-cdk = {path="../estuary-cdk", develop = true}
pydantic = "^2"
python = "^3.12"

[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
mypy = "^1.8.0"
pytest = "^7.4.3"
pytest-insta = "^0.3.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
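
Note that `estuary-cdk` is consumed as a local path dependency with
`develop = true`, so the connector always builds against the in-repo
copy of the CDK.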
63 changes: 63 additions & 0 deletions source-genesys/source_genesys/__init__.py
@@ -0,0 +1,63 @@
from logging import Logger
from typing import Callable, Awaitable

from estuary_cdk.flow import (
ConnectorSpec,
)
from estuary_cdk.capture import (
BaseCaptureConnector,
Request,
Task,
common,
request,
response,
)
from estuary_cdk.http import HTTPMixin

from .resources import all_resources, validate_credentials
from .models import (
ConnectorState,
EndpointConfig,
ResourceConfig,
)


class Connector(
BaseCaptureConnector[EndpointConfig, ResourceConfig, ConnectorState],
HTTPMixin,
):
def request_class(self):
return Request[EndpointConfig, ResourceConfig, ConnectorState]

async def spec(self, _: request.Spec, logger: Logger) -> ConnectorSpec:
return ConnectorSpec(
configSchema=EndpointConfig.model_json_schema(),
documentationUrl="https://go.estuary.dev/source-genesys",
resourceConfigSchema=ResourceConfig.model_json_schema(),
resourcePathPointers=ResourceConfig.PATH_POINTERS,
)

async def discover(
self, log: Logger, discover: request.Discover[EndpointConfig]
) -> response.Discovered[ResourceConfig]:
resources = await all_resources(log, self, discover.config)
return common.discovered(resources)

async def validate(
self,
log: Logger,
validate: request.Validate[EndpointConfig, ResourceConfig],
) -> response.Validated:
await validate_credentials(log, self, validate.config)
resources = await all_resources(log, self, validate.config)
resolved = common.resolve_bindings(validate.bindings, resources)
return common.validated(resolved)

async def open(
self,
log: Logger,
open: request.Open[EndpointConfig, ResourceConfig, ConnectorState],
) -> tuple[response.Opened, Callable[[Task], Awaitable[None]]]:
resources = await all_resources(log, self, open.capture.config)
resolved = common.resolve_bindings(open.capture.bindings, resources)
return common.open(open, resolved)
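
These four handlers map onto the capture protocol's spec, discover,
validate, and open requests; the CDK's `BaseCaptureConnector` routes
each incoming request to the matching method.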
4 changes: 4 additions & 0 deletions source-genesys/source_genesys/__main__.py
@@ -0,0 +1,4 @@
import asyncio
import source_genesys

asyncio.run(source_genesys.Connector().serve())
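
With this entrypoint in place, the connector can be run directly with
`python -m source_genesys`.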
300 changes: 300 additions & 0 deletions source-genesys/source_genesys/api.py
@@ -0,0 +1,300 @@
import asyncio

from datetime import datetime, timedelta, UTC
from logging import Logger
from typing import AsyncGenerator, Any
from estuary_cdk.capture.common import LogCursor, PageCursor
from estuary_cdk.http import HTTPSession, RateLimiter

from .models import (
User,
UserResponse,
Conversation,
ConversationQueryResponse,
CreateJobResponse,
CheckJobStatusResponse,
JobResultsResponse,
)

# The Genesys Cloud domain from the endpoint config is appended to this
# prefix, e.g. "https://api.mypurecloud.com".
COMMON_API = "https://api"
DATETIME_STRING_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# Some of the /v2/analytics/*/query endpoints only allow queries covering intervals of 7 days or fewer.
MAX_QUERY_DATE_WINDOW = 7
MAX_QUERY_PAGE_SIZE = 100


def _dt_to_str(dt: datetime) -> str:
return dt.strftime(DATETIME_STRING_FORMAT)

def _str_to_dt(date: str) -> datetime:
return datetime.fromisoformat(date)


async def snapshot_users(
http: HTTPSession,
domain: str,
log: Logger,
) -> AsyncGenerator[User, None]:
"""
API docs - https://developer.genesys.cloud/useragentman/users/#get-api-v2-users
"""
url = f"{COMMON_API}.{domain}/api/v2/users"

params = {
"pageSize": 500,
"pageNumber": 1,
"sortOrder": "ASC",
}

lastPageNumber = 1
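    # pageCount isn't known until the first response arrives, so assume a
    # single page to start and update lastPageNumber after each request.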

while params["pageNumber"] <= lastPageNumber:
response = UserResponse.model_validate_json(
await http.request(log, url, params=params)
)

users = response.entities
for user in users:
yield user

params["pageNumber"] += 1
lastPageNumber = response.pageCount


async def fetch_conversations(
http: HTTPSession,
domain: str,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[Conversation | LogCursor, None]:
"""
There is not a single endpoint that can be used to retrieve updated conversations in real-time,
so `fetch_conversations` relies on two separate endpoints:
/api/v2/analytics/conversations/details/query - provides created conversations in real-time
    /api/v2/analytics/conversations/details/jobs - provides updated conversations with a delay
"""
assert isinstance(log_cursor, datetime)

most_recent_created_cursor = log_cursor

    # fetch_created_conversations yields records in ascending order of
    # conversationStart, so the cursor ends at the newest record's start time.
    async for record in fetch_created_conversations(http, domain, log, log_cursor):
        record_dt = record.conversationStart
        if record_dt > log_cursor:
            most_recent_created_cursor = record_dt
        yield record

    # If any real-time documents were yielded, look back one day before the
    # previous cursor for conversations that have ended.
if most_recent_created_cursor != log_cursor:
async for record in fetch_updated_conversations(http, domain, log, log_cursor - timedelta(days=1)):
yield record

    # Yield the updated cursor so progress is checkpointed.
    yield most_recent_created_cursor


async def fetch_created_conversations(
http: HTTPSession,
domain: str,
log: Logger,
log_cursor: LogCursor,
) -> AsyncGenerator[Conversation, None]:
"""
Retrieves created conversations from the log_cursor to the present.
API docs - https://developer.genesys.cloud/routing/conversations/conversations-apis#post-api-v2-analytics-conversations-details-query
"""
assert isinstance(log_cursor, datetime)
url = f"{COMMON_API}.{domain}/api/v2/analytics/conversations/details/query"

lower_bound = log_cursor
now = datetime.now(tz=UTC)

# Iterate from our last seen cursor value to the present in 7 day windows.
while lower_bound < now:
upper_bound = min(lower_bound + timedelta(days=MAX_QUERY_DATE_WINDOW), now)
pageNumber = 1

# Paginate through this date window's results.
while True:
# This endpoint returns results in ascending order of the conversationStart field by default,
# so we don't need to specify any ordering options.
body = {
"interval": f"{_dt_to_str(lower_bound)}/{_dt_to_str(upper_bound)}",
"paging": {
"pageSize": MAX_QUERY_PAGE_SIZE,
"pageNumber": pageNumber,
}
}

response = ConversationQueryResponse.model_validate_json(
await http.request(log, url, method="POST", json=body)
)

conversations = response.conversations

# If there are no more results to page through, move on to the next date window.
if conversations is None:
break

for conversation in conversations:
yield conversation

pageNumber += 1

lower_bound = upper_bound


def _build_conversation_job_body(
start_date: datetime,
end_date: datetime,
exclude_ongoing_conversations: bool = True,
) -> dict[str, Any]:
body = {
"interval": f"{_dt_to_str(start_date)}/{_dt_to_str(end_date)}",
}

if exclude_ongoing_conversations:
filter_condition = {
# Order results in ascending order of when they completed.
"orderBy": "conversationEnd",
# Only include conversations that have ended.
"conversationFilters": [
{
"predicates": [
{
"type": "dimension",
"dimension": "conversationEnd",
"operator": "exists"
}
],
"type": "and"
}
]
}

body.update(filter_condition)

return body
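
# For reference, the body built for a bounded, ended-conversations-only job
# looks like this (illustrative dates):
#
#   {
#       "interval": "2024-11-01T00:00:00Z/2024-11-02T00:00:00Z",
#       "orderBy": "conversationEnd",
#       "conversationFilters": [{"predicates": [...], "type": "and"}],
#   }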


async def _perform_conversation_job(
http: HTTPSession,
domain: str,
log: Logger,
start_date: datetime,
    end_date: datetime | None = None,
    exclude_ongoing_conversations: bool = True,
) -> AsyncGenerator[Conversation, None]:
    """
    Requests the Genesys API to perform an async job & paginates through the results.
    API docs - https://developer.genesys.cloud/routing/conversations/conversations-apis#post-api-v2-analytics-conversations-details-jobs
    """
    # Default end_date at call time; a datetime.now() default argument would
    # only be evaluated once, when the module is imported.
    if end_date is None:
        end_date = datetime.now(tz=UTC)

    # Submit job.
    url = f"{COMMON_API}.{domain}/api/v2/analytics/conversations/details/jobs"
    body = _build_conversation_job_body(start_date, end_date, exclude_ongoing_conversations)

response = CreateJobResponse.model_validate_json(
await http.request(log, url, method="POST", json=body)
)

job_id = response.jobId

# Query job status.
url = f"{COMMON_API}.{domain}/api/v2/analytics/conversations/details/jobs/{job_id}"

state = "QUEUED"
rate_limiter = RateLimiter()
# 3 seconds anecdotally seems like the minimum time for a job to complete.
rate_limiter.delay = 3.0
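    # Poll until the job reaches a terminal state, sleeping between checks.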
while state != "FULFILLED":
delay = rate_limiter.delay
await asyncio.sleep(delay)

response = CheckJobStatusResponse.model_validate_json(
await http.request(log, url)
)

state = response.state

        match state:
            case "FAILED":
                raise RuntimeError(f"Conversations job failed. Please inspect the error and update job details: {response.errorMessage}")
            case "CANCELLED" | "EXPIRED":
                raise RuntimeError(f"Conversations job status is {state}. Job request body was: {body}")

rate_limiter.update(delay, True)

# Paginate through results.
url = f"{COMMON_API}.{domain}/api/v2/analytics/conversations/details/jobs/{job_id}/results"
params: dict[str, str | int] = {
"pageSize": 1000,
}

while True:
response = JobResultsResponse.model_validate_json(
await http.request(log, url, params=params)
)

conversations = response.conversations
for conversation in conversations:
yield conversation

cursor = response.cursor
if not cursor:
break

params.update({
"cursor": cursor,
})


async def fetch_updated_conversations(
http: HTTPSession,
domain: str,
log: Logger,
start_date: datetime,
) -> AsyncGenerator[Conversation, None]:
"""
Submits an asynchronous job for all conversations
that have ended between start_date and the present.
"""
async for result in _perform_conversation_job(
http,
domain,
log,
start_date,
):
yield result


async def backfill_conversations(
http: HTTPSession,
domain: str,
log: Logger,
page_cursor: PageCursor | None,
cutoff: LogCursor,
) -> AsyncGenerator[Conversation | PageCursor, None]:
"""
    Submits an asynchronous job for all conversations between the page_cursor (the config start date) and the cutoff.
"""
assert isinstance(page_cursor, str)
assert isinstance(cutoff, datetime)

start_date = _str_to_dt(page_cursor)

# Since cutoff is shifted backwards 2 days, it's possible for the cutoff to be before the start date.
# In this situation, incremental replication will cover everything that should be backfilled, and the
# backfill can safely return.
if start_date > cutoff:
return

async for result in _perform_conversation_job(
http,
domain,
log,
start_date,
end_date=cutoff,
exclude_ongoing_conversations=False,
):
yield result
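
Together, `backfill_conversations` covers everything from the configured
start date up to the cutoff once (including ongoing conversations), while
`fetch_conversations` picks up from the cutoff forward; as noted in the
commit message, some late updates can still be missed between backfills.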