Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-twilio: capture messages updates based on lookback_window #2225

Merged
merged 2 commits into from
Dec 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 56 additions & 8 deletions source-twilio/source_twilio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,6 @@ def state(self) -> Mapping[str, Any]:

@state.setter
def state(self, value: MutableMapping[str, Any]):
    """Restore the stream cursor from a saved state mapping.

    The value stored under ``cursor_field`` is kept verbatim (``None`` when
    the key is absent). The incoming mapping is not modified and no lookback
    arithmetic is applied here.

    Args:
        value: State mapping that may contain ``cursor_field``.
    """
    # Rewinding the cursor inside the setter would rewrite the caller's
    # mapping and shift the saved cursor on every assignment, so the cursor
    # is stored exactly as received.
    self._cursor_value = value.get(self.cursor_field)

def generate_date_ranges(self) -> Iterable[Optional[MutableMapping[str, Any]]]:
Expand Down Expand Up @@ -252,6 +246,34 @@ def read_records(
raise err


class IncrementalTwilioStreamWithLookbackWindow(IncrementalTwilioStream):
    """Incremental stream whose date-range slices begin ``_lookback_window`` minutes before the cursor.

    Only ``generate_date_ranges`` is overridden. Per the original author's note, the sole intended
    difference from ``IncrementalTwilioStream.generate_date_ranges`` is that the start datetime is
    pushed back by the configured lookback window.
    """

    def generate_date_ranges(self) -> Iterable[Optional[MutableMapping[str, Any]]]:
        """Yield filter slices covering the pushed-back start up to the current UTC time.

        Yields:
            Mappings keyed by ``lower_boundary_filter_field`` and ``upper_boundary_filter_field``
            whose values are datetimes formatted with ``time_filter_template``.
        """

        def align_to_dt_format(dt: DateTime) -> DateTime:
            # Round-trip through the filter template so comparisons happen at the template's precision.
            return pendulum.parse(dt.format(self.time_filter_template))

        end_datetime = pendulum.now("utc")
        # A falsy lookback (None or 0) means "no push-back" rather than an error inside pendulum.duration.
        lookback = pendulum.duration(minutes=self._lookback_window or 0)
        cursor_datetime = pendulum.parse(self.state.get(self.cursor_field, self._start_date))
        # NOTE(review): with no saved cursor this starts `lookback` minutes before `_start_date` -
        # confirm that reaching before the configured start date is intended.
        start_datetime = min(end_datetime, cursor_datetime - lookback)

        # The aligned upper bound never changes while slicing, so compute it once.
        aligned_end = align_to_dt_format(end_datetime)

        current_start = start_datetime
        current_end = start_datetime
        # Aligning to a datetime format is done to avoid the following scenario:
        # start_dt = 2021-11-14T00:00:00, end_dt (now) = 2022-11-14T12:03:01, time_filter_template = "YYYY-MM-DD"
        # First slice: (2021-11-14, 2022-11-14)
        # (!) Second slice: (2022-11-15, 2022-11-14) - because 2022-11-14T00:00:00 (prev end) < 2022-11-14T12:03:01,
        # so we have to compare dates, not date-times to avoid yielding that last slice
        while align_to_dt_format(current_end) < aligned_end:
            current_end = min(end_datetime, current_start + self.slice_step)
            yield {
                self.lower_boundary_filter_field: current_start.format(self.time_filter_template),
                self.upper_boundary_filter_field: current_end.format(self.time_filter_template),
            }
            # The next slice starts one granularity step after the previous slice's end.
            current_start = current_end + self.slice_granularity



class TwilioNestedStream(TwilioStream):
"""
Basic class for the streams that are dependant on the results of another stream output (parent-child relations).
Expand Down Expand Up @@ -544,7 +566,7 @@ class Queues(TwilioNestedStream):
parent_stream = Accounts


class Messages(IncrementalTwilioStream, TwilioNestedStream):
class Messages(IncrementalTwilioStreamWithLookbackWindow, TwilioNestedStream):
"""https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources"""

parent_stream = Accounts
Expand All @@ -553,6 +575,32 @@ class Messages(IncrementalTwilioStream, TwilioNestedStream):
upper_boundary_filter_field = "DateSent<"
cursor_field = "date_sent"

def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any]]:
    """Yield new messages plus updates to already-seen messages for one slice.

    Records are read from the implementation that follows ``IncrementalTwilioStream`` in the
    MRO, their ``cursor_field`` and ``date_updated`` values are rewritten as ISO 8601 strings,
    and they are emitted in ascending ``cursor_field`` order. A record is emitted when either
    timestamp is at or after the cursor captured at the start of the call; only records whose
    ``cursor_field`` qualifies advance ``_cursor_value``.

    Timestamps are compared as parsed datetimes rather than as strings, so a cursor stored in
    a different textual form (e.g. ``Z`` vs ``+00:00``) still compares correctly.

    Args:
        sync_mode: Forwarded unchanged to the parent implementation.
        cursor_field: Forwarded unchanged to the parent implementation.
        stream_slice: Forwarded unchanged to the parent implementation.
        stream_state: Forwarded unchanged to the parent implementation.

    Yields:
        The (mutated) record mappings that pass the cursor / update filter.
    """
    cursor_key = self.cursor_field
    initial_cursor = pendulum.parse(self.state.get(cursor_key, self._start_date), strict=False)

    # Skip the IncrementalTwilioStream's read_records method since it filters only based on date_sent instead of
    # both date_sent and date_updated.
    parent_records = super(IncrementalTwilioStream, self).read_records(sync_mode, cursor_field, stream_slice, stream_state)

    timed_records = []
    for record in parent_records:
        sent_at = pendulum.parse(record[cursor_key], strict=False)
        updated_at = pendulum.parse(record["date_updated"], strict=False)
        record[cursor_key] = sent_at.to_iso8601_string()
        record["date_updated"] = updated_at.to_iso8601_string()
        timed_records.append((sent_at, updated_at, record))

    # Sort on the parsed send time only; the sort is stable, so ties keep arrival order.
    timed_records.sort(key=lambda item: item[0])

    for sent_at, updated_at, record in timed_records:
        if sent_at >= initial_cursor:
            # New record: emit it and advance the cursor.
            self._cursor_value = record[cursor_key]
            yield record
        elif updated_at >= initial_cursor:
            # Previously seen record that changed since the cursor: emit without moving the cursor.
            yield record


class MessageMedia(IncrementalTwilioStream, TwilioNestedStream):
"""https://www.twilio.com/docs/sms/api/media-resource#read-multiple-media-resources"""
Expand All @@ -573,7 +621,7 @@ class MessageMedia(IncrementalTwilioStream, TwilioNestedStream):
@cached_property
def parent_stream_instance(self):
    """Build the parent stream instance, starting from this stream's latest cursor.

    The parent is created with this stream's authenticator and with ``start_date`` set to the
    saved ``cursor_field`` value, falling back to ``_start_date`` when no cursor is saved.
    No ``lookback_window`` is forwarded to the parent. The result is memoised by
    ``cached_property``.
    """
    most_recent_cursor = self.state.get(self.cursor_field, self._start_date)
    return self.parent_stream(authenticator=self.authenticator, start_date=most_recent_cursor)


class UsageNestedStream(TwilioNestedStream):
Expand Down
Loading