estuary-cdk: FetchPageFn is also an AsyncGenerator
Much like FetchChangesFn, FetchPageFn is now an AsyncGenerator which
yields documents, checkpoint-able PageCursors, or completes an
iteration of a resource.

This refactoring is consistent with recent updates to FetchChangesFn,
and is motivated by use cases where the fetched "page" could be quite
large and itself composed of multiple concurrent data fetches,
implemented as constituent AsyncGenerator instances.

By using an AsyncGenerator, such an implementation can yield documents
from those concurrent fetches as soon as data arrives, so that data is
spilled to disk and memory pressure stays low. Then, when the
scatter / gather data fetch completes, the entire "page" is
checkpointed.
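
The scatter / gather shape described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the CDK's implementation: `merge`, `_next_or_end`, `fetch_part`, and the cursor value `"next-page"` are hypothetical names, and the real FetchPageFn also receives a Logger and a cutoff LogCursor, which are omitted here. Documents are yielded in arrival order, and the cursor is yielded only after every constituent fetch has finished.

```python
import asyncio
from typing import Any, AsyncGenerator, AsyncIterator, Optional, Union

_END = object()  # Sentinel: a constituent generator is exhausted.


async def _next_or_end(gen: AsyncIterator[Any]) -> Any:
    """Await the next item of `gen`, or return _END once it is exhausted."""
    try:
        return await gen.__anext__()
    except StopAsyncIteration:
        return _END


async def merge(*gens: AsyncIterator[Any]) -> AsyncGenerator[Any, None]:
    """Yield items from all `gens` as soon as each one arrives."""
    # Keep exactly one in-flight "next item" task per generator.
    pending = {asyncio.create_task(_next_or_end(g)): g for g in gens}
    try:
        while pending:
            done, _ = await asyncio.wait(
                set(pending), return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                gen = pending.pop(task)
                item = task.result()
                if item is _END:
                    continue
                # Re-arm this generator, then hand the item downstream.
                pending[asyncio.create_task(_next_or_end(gen))] = gen
                yield item
    finally:
        # If the consumer stops early, cancel whatever is still in flight.
        for task in pending:
            task.cancel()


async def fetch_part(start: int, count: int, delay: float) -> AsyncGenerator[dict, None]:
    """Hypothetical constituent fetch emitting `count` documents."""
    for i in range(start, start + count):
        await asyncio.sleep(delay)
        yield {"id": i}


async def fetch_page(page: Optional[str]) -> AsyncGenerator[Union[dict, str], None]:
    """Simplified FetchPageFn-like generator (Logger and cutoff omitted)."""
    async for doc in merge(fetch_part(0, 3, 0.01), fetch_part(100, 3, 0.015)):
        yield doc
    # Only now, with every constituent fetch complete, checkpoint the "page".
    yield "next-page"


async def collect() -> list:
    return [item async for item in fetch_page(None)]


items = asyncio.run(collect())
```

Because each generator has at most one pending item at a time, the consumer's pace bounds how much data is buffered in memory.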
jgraettinger authored and jshearer committed Mar 8, 2024
1 parent e1be996 commit ee34ea7
Showing 2 changed files with 47 additions and 22 deletions.
57 changes: 40 additions & 17 deletions estuary-cdk/estuary_cdk/capture/common.py
@@ -174,22 +174,36 @@ class ConnectorState(GenericModel, Generic[_BaseResourceState], extra="forbid"):
FetchSnapshotFn = Callable[[Logger], AsyncGenerator[_BaseDocument, None]]
"""
FetchSnapshotFn is a function which fetches a complete snapshot of a resource.
Snapshot resources are typically "small" -- they fit easily on disk -- and are
gathered in a single shot. Its content is digested to determine if it has
changed since the last snapshot. If it hasn't, the snapshot is discarded and
not emitted by the connector.
"""

 FetchPageFn = Callable[
     [Logger, PageCursor, LogCursor],
-    Awaitable[tuple[Iterable[_BaseDocument], PageCursor]],
+    AsyncGenerator[_BaseDocument | PageCursor, None],
 ]
 """
-FetchPageFn is a function which fetches a page of documents.
-It takes a PageCursor for the next page and an upper-bound "cutoff" LogCursor.
-It returns documents and an updated PageCursor, where None indicates
-no further pages remain.
-The "cutoff" LogCursor represents the log position at which incremental
-replication started, and should be used to filter returned documents
-which were modified at-or-after the cutoff, as such documents are
+FetchPageFn fetches available checkpoints since the provided last PageCursor.
+It will typically fetch just one page, though it may fetch multiple pages.
+The argument PageCursor is None if a new iteration is being started.
+Otherwise it is the last PageCursor yielded by FetchPageFn.
+The argument LogCursor is the "cutoff" log position at which incremental
+replication started, and should be used to suppress documents which were
+modified at-or-after the cutoff, as such documents are
 already observed through incremental replication.
+Checkpoints consist of a yielded sequence of documents followed by a
+non-None PageCursor, which checkpoints those preceding documents,
+or by simply returning if the iteration is complete.
+It's an error if FetchPageFn yields a PageCursor of None.
+Instead, mark the end of the sequence by yielding documents and then
+returning without yielding a final PageCursor.
 """

 FetchChangesFn = Callable[
@@ -521,7 +535,7 @@ async def _binding_backfill_task(
         bindingStateV1={binding.stateKey: ResourceState(backfill=state)}
     )

-    if state.next_page:
+    if state.next_page is not None:
         task.log.info(f"resuming backfill", state)
     else:
         task.log.info(f"beginning backfill", state)
@@ -530,14 +544,23 @@ async def _binding_backfill_task(
         # Yield to the event loop to prevent starvation.
         await asyncio.sleep(0)

-        page, next_page = await fetch_page(task.log, state.next_page, state.cutoff)
-        for doc in page:
-            task.captured(binding_index, doc)
+        # Track if fetch_page returns without having yielded a PageCursor.
+        done = True

-        if next_page is not None:
-            state.next_page = next_page
-            task.checkpoint(connector_state)
-        else:
+        async for item in fetch_page(task.log, state.next_page, state.cutoff):
+            if isinstance(item, BaseDocument):
+                task.captured(binding_index, item)
+                done = True
+            elif item is None:
+                raise RuntimeError(
+                    f"Implementation error: FetchPageFn yielded PageCursor None. To represent end-of-sequence, yield documents and return without a final PageCursor."
+                )
+            else:
+                state.next_page = item
+                task.checkpoint(connector_state)
+                done = False
+
+        if done:
             break

     connector_state = ConnectorState(
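
To make the `done` bookkeeping in the loop above easier to follow, here is a minimal, self-contained sketch of the same control flow. It is a simplification rather than the CDK code: plain dicts stand in for documents, lists stand in for `task.captured` and `task.checkpoint`, the hypothetical `PAGES` table replaces a real API, and `fetch_page` takes only the cursor (the Logger and cutoff arguments are omitted).

```python
import asyncio
from typing import AsyncGenerator, Optional, Union

# Hypothetical paged data set: cursor -> (document ids, next cursor or None).
PAGES = {
    None: (["a", "b"], "p2"),
    "p2": (["c"], "p3"),
    "p3": (["d"], None),
}


async def fetch_page(page: Optional[str]) -> AsyncGenerator[Union[dict, str], None]:
    """Yield one page of documents, then a cursor if more pages remain."""
    ids, next_page = PAGES[page]
    for doc_id in ids:
        yield {"id": doc_id}
    if next_page is not None:
        yield next_page
    # On the final page we simply return without yielding a cursor.


async def backfill() -> tuple:
    captured: list = []     # stand-in for task.captured(...)
    checkpoints: list = []  # stand-in for task.checkpoint(...)
    next_page: Optional[str] = None

    while True:
        # Stays True unless the last thing fetch_page yields is a cursor.
        done = True
        async for item in fetch_page(next_page):
            if isinstance(item, dict):
                captured.append(item)
                done = True
            elif item is None:
                raise RuntimeError("fetch_page yielded a PageCursor of None")
            else:
                next_page = item
                checkpoints.append(item)
                done = False
        if done:
            break

    return captured, checkpoints


captured, checkpoints = asyncio.run(backfill())
```

Documents yielded after the last cursor reset `done` to True, so a generator that ends with trailing documents and no cursor terminates the backfill, which matches the end-of-sequence rule in the FetchPageFn docstring.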
12 changes: 7 additions & 5 deletions source-hubspot-native/source_hubspot_native/api.py
@@ -49,7 +49,7 @@ async def fetch_page(
     log: Logger,
     page: str | None,
     cutoff: datetime,
-) -> tuple[Iterable[CRMObject], str | None]:
+) -> AsyncGenerator[CRMObject | str, None]:

     url = f"{HUB}/crm/v3/objects/{cls.NAME}"
     properties = await fetch_properties(log, cls, http)
@@ -69,10 +69,12 @@ async def fetch_page(
         await http.request(log, url, method="GET", params=input)
     )

-    return (
-        (doc for doc in result.results if doc.updatedAt < cutoff),
-        result.paging.next.after if result.paging else None,
-    )
+    for doc in result.results:
+        if doc.updatedAt < cutoff:
+            yield doc
+
+    if result.paging:
+        yield result.paging.next.after


 async def fetch_batch(
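
The cutoff filtering in the connector change above can be exercised in isolation with a small sketch like the one below. Everything here is assumed for illustration: `Doc`, `Page`, `FAKE_API`, and `drain` are hypothetical stand-ins, no HTTP request is made, and the Logger argument is dropped. The generator has the same shape as the new `fetch_page`: documents older than the cutoff first, then the next cursor if one exists.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import AsyncGenerator, List, Optional, Union


@dataclass
class Doc:
    id: int
    updatedAt: datetime


@dataclass
class Page:
    results: List[Doc]
    next_after: Optional[str]  # stand-in for result.paging.next.after


def _day(d: int) -> datetime:
    return datetime(2024, 3, d, tzinfo=timezone.utc)


# Hypothetical API responses keyed by the page cursor.
FAKE_API = {
    None: Page([Doc(1, _day(1)), Doc(2, _day(9))], "2"),
    "2": Page([Doc(3, _day(2))], None),
}


async def fetch_page(
    page: Optional[str], cutoff: datetime
) -> AsyncGenerator[Union[Doc, str], None]:
    result = FAKE_API[page]
    for doc in result.results:
        # Documents at or after the cutoff are left to incremental replication.
        if doc.updatedAt < cutoff:
            yield doc
    if result.next_after is not None:
        yield result.next_after


async def drain(cutoff: datetime) -> List[int]:
    """Follow cursors until fetch_page returns without yielding one."""
    ids: List[int] = []
    page: Optional[str] = None
    while True:
        cursor: Optional[str] = None
        async for item in fetch_page(page, cutoff):
            if isinstance(item, Doc):
                ids.append(item.id)
            else:
                cursor = item
        if cursor is None:
            return ids
        page = cursor


backfilled_ids = asyncio.run(drain(cutoff=_day(8)))
```

The `drain` helper relies on this particular generator yielding at most one cursor, as its final item; the CDK loop shown earlier handles the more general case.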