Skip to content

Commit

Permalink
source-twilio: check for updated messages based on the lookback window
Browse files Browse the repository at this point in the history
Twilio's API only allows us to query messages based on their creation
date, not their updated date. Previously, the connector only captured
newly created messages, with the option to use the lookback window to push back the
cursor value and re-emit all messages created in the past X minutes. The
`messages` stream now uses the lookback window to query Twilio for
messages created X minutes in the past, but it only emits messages that
have been created or updated after the current cursor value.
  • Loading branch information
Alex-Bair committed Dec 20, 2024
1 parent 2beedcf commit ba1925a
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion source-twilio/source_twilio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,56 @@ class Messages(IncrementalTwilioStream, TwilioNestedStream):
# Key under which each generated date-range slice stores its upper bound.
# NOTE(review): presumably sent to Twilio as the "DateSent<" query parameter — confirm against the request-building code.
upper_boundary_filter_field = "DateSent<"
# Record field used as the incremental cursor for this stream.
cursor_field = "date_sent"

# This overrides IncrementalTwilioStream's method. The only difference is that
# this method pushes back the start datetime based on the configured lookback window.
def generate_date_ranges(self) -> Iterable[Optional[MutableMapping[str, Any]]]:
    """Yield date-range slices from the lookback-adjusted cursor up to the current UTC time.

    The starting point is the stored cursor value (or the configured start date
    when there is no state) moved back by ``_lookback_window`` minutes, and never
    later than "now". Each yielded mapping holds the lower and upper boundary
    filter fields rendered with ``time_filter_template``. A slice spans at most
    ``slice_step``, and the following slice begins ``slice_granularity`` after the
    previous upper bound.
    """

    def truncate(moment: DateTime) -> DateTime:
        # Round-trip through the filter template so comparisons happen at the
        # template's precision (e.g. whole days for "YYYY-MM-DD").
        return pendulum.parse(moment.format(self.time_filter_template))

    now = pendulum.now("utc")
    cursor = pendulum.parse(self.state.get(self.cursor_field, self._start_date))
    lookback = pendulum.duration(minutes=self._lookback_window)
    window_start = min(now, cursor - lookback)

    # Compare at template precision rather than as full date-times. Otherwise a
    # previous upper bound of 2022-11-14T00:00:00 would still be "before" a now of
    # 2022-11-14T12:03:01 and produce a trailing, inverted slice
    # (2022-11-15, 2022-11-14).
    truncated_now = truncate(now)

    lower = upper = window_start
    while truncate(upper) < truncated_now:
        upper = min(now, lower + self.slice_step)
        yield {
            self.lower_boundary_filter_field: lower.format(self.time_filter_template),
            self.upper_boundary_filter_field: upper.format(self.time_filter_template),
        }
        lower = upper + self.slice_granularity

def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
    """Read one slice of messages and emit those created or updated since the cursor.

    IncrementalTwilioStream.read_records is bypassed on purpose (it filters on
    date_sent rather than date_updated); the next implementation in the MRO is
    called instead. The cursor field and ``date_updated`` of every record are
    rewritten in place as ISO 8601 strings. Records are buffered, ordered by the
    cursor field, and yielded when either timestamp is at or after the cursor
    value captured on entry. Only records whose cursor field passes that check
    advance ``_cursor_value``.
    """
    cursor_key = self.cursor_field
    threshold = self.state.get(cursor_key, self._start_date)

    def to_iso(raw):
        # Non-strict parsing accepts the timestamp formats the API returns.
        return pendulum.parse(raw, strict=False).to_iso8601_string()

    # Deliberately start the lookup *after* IncrementalTwilioStream in the MRO.
    upstream = super(IncrementalTwilioStream, self).read_records(sync_mode, cursor_field, stream_slice, stream_state)

    buffered = []
    for item in upstream:
        item[cursor_key] = to_iso(item[cursor_key])
        item["date_updated"] = to_iso(item["date_updated"])
        buffered.append(item)
    buffered.sort(key=lambda item: item[cursor_key])

    for item in buffered:
        created_since_cursor = item[cursor_key] >= threshold
        if created_since_cursor:
            # New record: move the cursor forward before emitting it.
            self._cursor_value = item[cursor_key]
        # Emit new records, and also previously seen records that were updated
        # at or after the cursor.
        if created_since_cursor or item["date_updated"] >= threshold:
            yield item


class MessageMedia(IncrementalTwilioStream, TwilioNestedStream):
"""https://www.twilio.com/docs/sms/api/media-resource#read-multiple-media-resources"""
Expand All @@ -567,7 +617,7 @@ class MessageMedia(IncrementalTwilioStream, TwilioNestedStream):
@cached_property
def parent_stream_instance(self):
    """Build the parent stream instance, cached on first access.

    The parent is created with this stream's authenticator and starts from this
    stream's most recent cursor value, falling back to the configured start date
    when no state is stored. No lookback window is forwarded to the parent.
    """
    most_recent_cursor = self.state.get(self.cursor_field, self._start_date)
    return self.parent_stream(authenticator=self.authenticator, start_date=most_recent_cursor)


class UsageNestedStream(TwilioNestedStream):
Expand Down

0 comments on commit ba1925a

Please sign in to comment.