Skip to content

Commit

Permalink
estuary-cdk: FetchChangesFn contract now allows for incremental strea…
Browse files Browse the repository at this point in the history
…ming

Rework the contract to allow implementations to yield checkpoints at
times of their choosing. This allows for more ergonomic handling of
long-lived push streams of documents.
  • Loading branch information
jgraettinger committed Mar 1, 2024
1 parent 9c94e07 commit 571ab9e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 26 deletions.
70 changes: 53 additions & 17 deletions estuary-cdk/estuary_cdk/capture/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,19 +198,34 @@ class ConnectorState(BaseModel, Generic[_BaseResourceState], extra="forbid"):
AsyncGenerator[_BaseDocument | LogCursor, None],
]
"""
FetchChangesFn is a function which fetches available documents since the LogCursor.
It yields Documents until no changes remain, then yields one updated LogCursor
which reflects progress made against processing the log, and then returns.
FetchChangesFn fetches available checkpoints since the provided last LogCursor.
If no documents are available at all, then it yields no documents and its
argument LogCursor. It does NOT wait indefinitely for more documents.
Checkpoints consist of a yielded sequence of documents followed by a LogCursor,
where the LogCursor checkpoints those preceding documents.
Implementations may block for brief periods to await documents, such as while
awaiting a server response, but should not block forever as it prevents the
Yielded LogCursors MUST be strictly increasing relative to the argument
LogCursor and also to previously yielded LogCursors.
It's an error if FetchChangesFn yields documents, and then returns without
yielding a final LogCursor. NOTE(johnny): if needed, we could extend the
contract to allow an explicit "roll back" sentinel.
FetchChangesFn yields until no further checkpoints are readily available,
and then returns. If no checkpoints are available at all,
it returns yields nothing and returns.
Implementations may block for brief periods to await checkpoints, such as while
awaiting a server response, but MUST NOT block forever as it prevents the
connector from exiting.
Implementations should NOT sleep or implement their own coarse rate limit.
Instead, configure a resource `interval` to enable periodic polling.
Implementations MAY return early, such as if it's convenient to fetch only
a next page of recent changes. If an implementation yields any checkpoints,
then it is immediately re-invoked.
Otherwise if it returns without yielding a checkpoint, then
`ResourceConfig.interval` is respected between invocations.
Implementations SHOULD NOT sleep or implement their own coarse rate limit
(use `ResourceConfig.interval`).
"""


Expand Down Expand Up @@ -540,21 +555,42 @@ async def _binding_incremental_task(

while True:

next_cursor = state.cursor
count = 0
checkpoints = 0
pending = False

async for item in fetch_changes(state.cursor, task.logger):
if isinstance(item, BaseDocument):
task.captured(binding_index, item)
count += 1
pending = True
else:
next_cursor = item

if count != 0 or next_cursor != state.cursor:
state.cursor = next_cursor
task.checkpoint(connector_state)
# Ensure LogCursor types match and that they're strictly increasing.
is_larger = False
if isinstance(item, int) and isinstance(state.cursor, int):
is_larger = item > state.cursor
elif isinstance(item, datetime) and isinstance(state.cursor, datetime):
is_larger = item > state.cursor
else:
raise RuntimeError(
f"Implementation error: FetchChangesFn yielded LogCursor {item} of a different type than the last LogCursor {state.cursor}",
)

if not is_larger:
raise RuntimeError(
f"Implementation error: FetchChangesFn yielded LogCursor {item} which is not greater than the last LogCursor {state.cursor}",
)

state.cursor = item
task.checkpoint(connector_state)
checkpoints += 1
pending = False

if pending:
raise RuntimeError(
"Implementation error: FetchChangesFn yielded a documents without a final LogCursor",
)

if count != 0:
if checkpoints:
continue # Immediately fetch subsequent changes.

# At this point we've fully caught up with the log and are idle.
Expand Down
3 changes: 2 additions & 1 deletion source-hubspot-native/source_hubspot_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ async def fetch_changes(
for doc in documents.results:
yield doc

yield recent[-1][0] if recent else log_cursor
if recent:
yield recent[-1][0]


async def fetch_recent_companies(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@
"properties": {
"address": "244 5th Avenue",
"address2": "1277",
"annualrevenue": "10000000",
"annualrevenue": "1000000",
"city": "New York",
"country": "United States",
"createdate": "2024-02-08T02:36:27.739Z",
Expand All @@ -1152,7 +1152,7 @@
"hs_analytics_source_data_2": "sample-contact",
"hs_created_by_user_id": "63843671",
"hs_date_entered_lead": "2024-02-08T02:36:27.739Z",
"hs_lastmodifieddate": "2024-02-28T22:41:14.697Z",
"hs_lastmodifieddate": "2024-02-29T17:12:01.335Z",
"hs_num_blockers": "0",
"hs_num_child_companies": "0",
"hs_num_contacts_with_buying_roles": "0",
Expand Down Expand Up @@ -1206,6 +1206,13 @@
}
],
"annualrevenue": [
{
"sourceId": "userId:63843671",
"sourceType": "CRM_UI",
"timestamp": "2024-02-29T17:12:01.335000Z",
"updatedByUserId": 63843671,
"value": "1000000"
},
{
"sourceType": "COMPANY_INSIGHTS",
"timestamp": "2024-02-19T21:39:53.367000Z",
Expand Down Expand Up @@ -1416,6 +1423,13 @@
}
],
"hs_lastmodifieddate": [
{
"sourceId": "userId:63843671",
"sourceType": "CRM_UI",
"timestamp": "2024-02-29T17:12:01.335000Z",
"updatedByUserId": 63843671,
"value": "2024-02-29T17:12:01.335Z"
},
{
"sourceId": "userId:63843671",
"sourceType": "CRM_UI",
Expand Down Expand Up @@ -1544,11 +1558,6 @@
"timestamp": "2024-02-16T17:37:34.730000Z",
"updatedByUserId": 63843671,
"value": "2024-02-16T17:37:34.730Z"
},
{
"sourceType": "API",
"timestamp": "2024-02-15T18:06:45.989000Z",
"value": "2024-02-15T18:06:45.989Z"
}
],
"hs_num_blockers": [
Expand Down Expand Up @@ -2104,7 +2113,7 @@
}
]
},
"updatedAt": "2024-02-28T22:41:14.697000Z"
"updatedAt": "2024-02-29T17:12:01.335000Z"
}
],
[
Expand Down

0 comments on commit 571ab9e

Please sign in to comment.