Skip to content

Commit

Permalink
source-twilio: check for updated messages based on the lookback window
Browse files Browse the repository at this point in the history
Twilio's API only allows us to query messages based on their creation
date, not their updated date. Previously, the connector only captured
newly created messages, with the option to use the lookback window to
push back the cursor value and re-emit all messages created in the past
X minutes. The `messages` stream now uses the lookback window to query
Twilio for messages created up to X minutes before the current cursor
value, but it only emits messages that have been created or updated
after the current cursor value.
  • Loading branch information
Alex-Bair committed Dec 20, 2024
1 parent 2beedcf commit 91b89ab
Showing 1 changed file with 56 additions and 2 deletions.
58 changes: 56 additions & 2 deletions source-twilio/source_twilio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,34 @@ def read_records(
raise err


class IncrementalTwilioStreamWithLookbackWindow(IncrementalTwilioStream):
    """Incremental Twilio stream whose date-range slices start earlier than the cursor.

    Overrides ``generate_date_ranges`` so that the first slice begins at the
    stored cursor value (or the configured start date when no cursor is stored)
    minus the configured lookback window, expressed in minutes.
    """

    def generate_date_ranges(self) -> Iterable[Optional[MutableMapping[str, Any]]]:
        """Yield consecutive date-range slices from the pushed-back start until now (UTC).

        Each slice is a mapping with two entries, keyed by
        ``lower_boundary_filter_field`` and ``upper_boundary_filter_field``, whose
        values are datetimes formatted with ``time_filter_template``.
        """
        template = self.time_filter_template

        def truncate(moment: DateTime) -> DateTime:
            # Round-trip through the filter format so that anything the format
            # cannot express is dropped before comparing.
            return pendulum.parse(moment.format(template))

        now = pendulum.now("utc")
        cursor = pendulum.parse(self.state.get(self.cursor_field, self._start_date))
        lookback = pendulum.duration(minutes=self._lookback_window)
        # Never start in the future, even if the stored cursor is ahead of "now".
        window_start = min(now, cursor - lookback)

        # The loop compares truncated values to avoid the following scenario:
        #   start = 2021-11-14T00:00:00, now = 2022-11-14T12:03:01, template = "YYYY-MM-DD"
        #   First slice:      (2021-11-14, 2022-11-14)
        #   (!) Second slice: (2022-11-15, 2022-11-14), because the previous end
        #   2022-11-14T00:00:00 is still earlier than 2022-11-14T12:03:01.
        # Comparing at the template's precision prevents that last, inverted slice.
        truncated_now = truncate(now)

        slice_start = window_start
        slice_end = window_start
        while truncate(slice_end) < truncated_now:
            slice_end = min(now, slice_start + self.slice_step)
            yield {
                self.lower_boundary_filter_field: slice_start.format(template),
                self.upper_boundary_filter_field: slice_end.format(template),
            }
            # The next slice begins one granularity step after this one ended.
            slice_start = slice_end + self.slice_granularity



class TwilioNestedStream(TwilioStream):
"""
Basic class for the streams that are dependant on the results of another stream output (parent-child relations).
Expand Down Expand Up @@ -538,7 +566,7 @@ class Queues(TwilioNestedStream):
parent_stream = Accounts


class Messages(IncrementalTwilioStream, TwilioNestedStream):
class Messages(IncrementalTwilioStreamWithLookbackWindow, TwilioNestedStream):
"""https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources"""

parent_stream = Accounts
Expand All @@ -547,6 +575,32 @@ class Messages(IncrementalTwilioStream, TwilioNestedStream):
upper_boundary_filter_field = "DateSent<"
cursor_field = "date_sent"

def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
    """Yield messages that were created or updated at or after the current cursor.

    All records of the slice are fetched first, their cursor field and
    ``date_updated`` values are rewritten as ISO 8601 strings, and the records
    are then emitted in ascending cursor-field order. Only records whose cursor
    field is at or after the cursor advance ``self._cursor_value``; records
    emitted solely because of ``date_updated`` leave it unchanged.
    """
    cursor_key = self.cursor_field
    # Captured once, before any record is emitted, so cursor updates made below
    # do not change which records of this slice qualify.
    threshold = self.state.get(cursor_key, self._start_date)

    # Deliberately bypass IncrementalTwilioStream.read_records: it filters on
    # date_sent alone, whereas this stream must consider date_updated as well.
    upstream = super(IncrementalTwilioStream, self).read_records(sync_mode, cursor_field, stream_slice, stream_state)

    buffered = []
    for item in upstream:
        item[cursor_key] = pendulum.parse(item[cursor_key], strict=False).to_iso8601_string()
        item["date_updated"] = pendulum.parse(item["date_updated"], strict=False).to_iso8601_string()
        buffered.append(item)
    buffered.sort(key=lambda entry: entry[cursor_key])

    for item in buffered:
        if item[cursor_key] >= threshold:
            # New record: move the cursor forward before emitting it.
            self._cursor_value = item[cursor_key]
        elif item["date_updated"] < threshold:
            # Neither new nor updated since the cursor, so there is nothing to emit.
            continue
        yield item


class MessageMedia(IncrementalTwilioStream, TwilioNestedStream):
"""https://www.twilio.com/docs/sms/api/media-resource#read-multiple-media-resources"""
Expand All @@ -567,7 +621,7 @@ class MessageMedia(IncrementalTwilioStream, TwilioNestedStream):
@cached_property
def parent_stream_instance(self):
most_recent_cursor = self.state.get(self.cursor_field, self._start_date)
return self.parent_stream(authenticator=self.authenticator, start_date=most_recent_cursor, lookback_window=self._lookback_window)
return self.parent_stream(authenticator=self.authenticator, start_date=most_recent_cursor)


class UsageNestedStream(TwilioNestedStream):
Expand Down

0 comments on commit 91b89ab

Please sign in to comment.