Skip to content

Commit

Permalink
estuary-cdk: incremental tasks are idle if their cursor is very recent
Browse files Browse the repository at this point in the history
Some resources always have _some_ level of activity every time they're
checked, which would ordinarily prevent the resource from ever being
considered "caught up", preventing the connector from exiting.

If the LogCursor is a date-time, then additionally consider the task to
be idle if the LogCursor is less-than `interval` old (where interval is
configured on the ResourceConfig), and sleep until the cursor is
`interval` old before issuing a successive fetch.
  • Loading branch information
jgraettinger committed Mar 6, 2024
1 parent 6d32139 commit ae3ece9
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions estuary-cdk/estuary_cdk/capture/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,15 +598,32 @@ async def _binding_incremental_task(
"Implementation error: FetchChangesFn yielded a documents without a final LogCursor",
)

if checkpoints:
continue # Immediately fetch subsequent changes.
sleep_for : timedelta = binding.resourceConfig.interval

if not checkpoints:
# We're idle. Sleep for the full back-off interval.
sleep_for = binding.resourceConfig.interval

elif isinstance(state.cursor, datetime):
lag = (datetime.now(tz=UTC) - state.cursor)

if lag > binding.resourceConfig.interval:
# We're not idle. Attempt to fetch the next changes.
continue
else:
# We're idle. Sleep until the cursor is `interval` old.
sleep_for = binding.resourceConfig.interval - lag
else:
# We're not idle. Attempt to fetch the next changes.
continue

task.log.debug("incremental task is idle", {"sleep_for": sleep_for, "cursor": state.cursor})

# At this point we've fully caught up with the log and are idle.
try:
if not task.stopping.event.is_set():
await asyncio.wait_for(
task.stopping.event.wait(),
timeout=binding.resourceConfig.interval.total_seconds(),
task.stopping.event.wait(), timeout=sleep_for.total_seconds()
)

task.log.debug(f"incremental replication is idle and is yielding to stop")
Expand Down

0 comments on commit ae3ece9

Please sign in to comment.