Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asana | Fix events stream #1304

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 37 additions & 40 deletions source-asana/source_asana/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,62 +235,59 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:

class Events(AsanaStream):
primary_key = "created_at"
sync_token = None
sync_token: Optional[str] = None
has_more: bool = False
raise_on_http_errors = False

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return "events"

def read_records(self, *args, **kwargs):
# Check if sync token is available
if self.sync_token is not None:
# Pass the sync token as a request parameter
kwargs["next_page_token"] = {"sync": self.sync_token}

yield from super().read_records(*args, **kwargs)

# After reading records, update the sync token
self.sync_token = self.get_latest_sync_token()

def get_latest_sync_token(self) -> str:
latest_sync_token = self.state.get("last_sync_token") # Get the previous sync token
def parse_response(
self, response: requests.Response, **kwargs
) -> Iterable[Mapping]:
payload: dict = response.json()
data = payload.get("data", [])

if latest_sync_token is None:
return None
if ( # Check if response is a 412 error
response.status_code == HTTPStatus.PRECONDITION_FAILED
and not self.sync_token
):
self.logger.warning(
"Sync token expired. Fetch the full dataset for this query now."
)

return latest_sync_token["sync"] # Extract the sync token value
self.sync_token = payload.get("sync")

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if response.status_code == 412: # Check if response is a 412 error
response_json = response.json()
if "sync" in response_json: # Check if new sync token is available
self.sync_token = response_json["sync"]
else:
self.sync_token = None
self.logger.warning("Sync token expired. Fetch the full dataset for this query now.")
else:
response_json = response.json()
return data

# Check if response has new sync token
if "sync" in response_json:
self.sync_token = response_json["sync"]
def next_page_token(
self, response: requests.Response
) -> Optional[Mapping[str, Any]]:
payload = response.json()

yield from response_json.get("data", [])
has_more = bool(payload.get("has_more"))
# self.sync_token = payload.get("sync")

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
decoded_response = response.json()
last_sync = decoded_response.get("sync")
if last_sync:
return {"sync": last_sync}
if not has_more:
self.logger.info("Nothing to read.")
return None
return {"sync": self.sync_token}

def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
def request_params(
self, stream_slice: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(**kwargs)
params["resource"] = stream_slice["resource_gid"]
params["sync"] = self.sync_token
return params

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
yield from self.read_slices_from_records(stream_class=Projects, slice_field="resource_gid")
yield from self.read_slices_from_records(stream_class=Tasks, slice_field="resource_gid")

yield from self.read_slices_from_records(
stream_class=Projects, slice_field="resource_gid"
)
yield from self.read_slices_from_records(
stream_class=Tasks, slice_field="resource_gid"
)


class OrganizationExports(AsanaStream):
Expand Down
Loading