diff --git a/source-asana/source_asana/streams.py b/source-asana/source_asana/streams.py index 625c432324..104bca6ce6 100644 --- a/source-asana/source_asana/streams.py +++ b/source-asana/source_asana/streams.py @@ -235,62 +235,59 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: class Events(AsanaStream): primary_key = "created_at" - sync_token = None + sync_token: Optional[str] = None + has_more: bool = False + raise_on_http_errors = False def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: return "events" - def read_records(self, *args, **kwargs): - # Check if sync token is available - if self.sync_token is not None: - # Pass the sync token as a request parameter - kwargs["next_page_token"] = {"sync": self.sync_token} - - yield from super().read_records(*args, **kwargs) - - # After reading records, update the sync token - self.sync_token = self.get_latest_sync_token() - - def get_latest_sync_token(self) -> str: - latest_sync_token = self.state.get("last_sync_token") # Get the previous sync token + def parse_response( + self, response: requests.Response, **kwargs + ) -> Iterable[Mapping]: + payload: dict = response.json() + data = payload.get("data", []) - if latest_sync_token is None: - return None + if ( # Check if response is a 412 error + response.status_code == HTTPStatus.PRECONDITION_FAILED + and not self.sync_token + ): + self.logger.warning( + "Sync token expired. Fetch the full dataset for this query now." + ) - return latest_sync_token["sync"] # Extract the sync token value + self.sync_token = payload.get("sync") - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - if response.status_code == 412: # Check if response is a 412 error - response_json = response.json() - if "sync" in response_json: # Check if new sync token is available - self.sync_token = response_json["sync"] - else: - self.sync_token = None - self.logger.warning("Sync token expired. Fetch the full dataset for this query now.") - else: - response_json = response.json() + return data - # Check if response has new sync token - if "sync" in response_json: - self.sync_token = response_json["sync"] + def next_page_token( + self, response: requests.Response + ) -> Optional[Mapping[str, Any]]: + payload = response.json() - yield from response_json.get("data", []) + has_more = bool(payload.get("has_more")) + # self.sync_token = payload.get("sync") - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - decoded_response = response.json() - last_sync = decoded_response.get("sync") - if last_sync: - return {"sync": last_sync} + if not has_more: + self.logger.info("Nothing to read.") + return None + return {"sync": self.sync_token} - def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: + def request_params( + self, stream_slice: Mapping[str, Any] = None, **kwargs + ) -> MutableMapping[str, Any]: params = super().request_params(**kwargs) params["resource"] = stream_slice["resource_gid"] + params["sync"] = self.sync_token return params def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - yield from self.read_slices_from_records(stream_class=Projects, slice_field="resource_gid") - yield from self.read_slices_from_records(stream_class=Tasks, slice_field="resource_gid") - + yield from self.read_slices_from_records( + stream_class=Projects, slice_field="resource_gid" + ) + yield from self.read_slices_from_records( + stream_class=Tasks, slice_field="resource_gid" + ) class OrganizationExports(AsanaStream):