From 618d151af4510ed3da36eab1eb0d3d920516b3cb Mon Sep 17 00:00:00 2001 From: Adil Ahmed Date: Wed, 15 May 2024 02:18:12 +0500 Subject: [PATCH] Added payroll summary stream. --- tap_restaurant365/client.py | 4 +- tap_restaurant365/streams.py | 80 ++++++++++++++++++++++++++++++++++++ tap_restaurant365/tap.py | 1 + 3 files changed, 84 insertions(+), 1 deletion(-) diff --git a/tap_restaurant365/client.py b/tap_restaurant365/client.py index 0a3a1af..be51312 100644 --- a/tap_restaurant365/client.py +++ b/tap_restaurant365/client.py @@ -65,9 +65,11 @@ def get_new_paginator(self) -> BaseAPIPaginator: def get_starting_time(self, context): start_date = self.config.get("start_date") + rep_key = None if start_date: start_date = parser.parse(self.config.get("start_date")) - rep_key = self.get_starting_timestamp(context) + timedelta(seconds=1) + if context: + rep_key = self.get_starting_timestamp(context) + timedelta(seconds=1) return rep_key or start_date def get_url_params( diff --git a/tap_restaurant365/streams.py b/tap_restaurant365/streams.py index 2f1b359..f05364d 100644 --- a/tap_restaurant365/streams.py +++ b/tap_restaurant365/streams.py @@ -106,6 +106,8 @@ def get_url_params( params["$filter"] += f" and type eq 'Stock Count'" if self.name == "bank_expenses": params["$filter"] += f" and type eq 'Bank Expense'" + if self.name == "payroll_summary": + params["$filter"] = f"payrollStart ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and payrollEnd le {end_date.strftime('%Y-%m-%dT23:59:59Z')}" # if skip > 0: params["$skip"] = skip @@ -650,3 +652,81 @@ def get_url_params( if skip > 0: params["$skip"] = skip return params + +class PayrollSummaryStream(LimitedTimeframeStream): + """Define custom stream.""" + + name = "payroll_summary" + path = "/PayrollSummary" + primary_keys = None + replication_key = None + pagination_date = None + schema = th.PropertiesList( + th.Property("employeeID", th.StringType), + th.Property("location", th.StringType), + th.Property("locationNumber", th.StringType), + th.Property("jobCode", th.StringType), + th.Property("payRate", th.NumberType), + th.Property("regularHours", th.NumberType), + th.Property("overtimeHours", th.NumberType), + th.Property("doubleOvertime", th.NumberType), + th.Property("breakPenalty", th.NumberType), + th.Property("grossReceipts", th.NumberType), + th.Property("splitShiftPenalty", th.NumberType), + th.Property("chargeTips", th.NumberType), + th.Property("declaredTips", th.NumberType), + th.Property("percentageOfSales", th.NumberType), + th.Property("percent", th.IntegerType), + th.Property("payrollStart", th.DateTimeType), + th.Property("payrollEnd", th.DateTimeType), + ).to_dict() + def get_next_page_token( + self, response: requests.Response, previous_token: t.Optional[t.Any] + ) -> t.Optional[t.Any]: + """ + Return a token for identifying next page or None if no more pages. + + The token is a datetime object that is used to filter the next page + of results. The token is calculated by adding the days_delta + parameter to the previous token. If the new token is in the future, + the pagination is disabled. + + Args: + response: The response object from the latest request. + previous_token: The token from the previous page, or None if + this is the first page. + + Returns: + A token for the next page, or None if no more pages are available. + """ + today = datetime.today() # noqa: DTZ002 + if self.pagination_date: + previous_token = self.pagination_date + if isinstance(previous_token,str): + previous_token = parser.parse(previous_token) + # Add a day to previous token + next_token = previous_token + timedelta(days=1) + next_token = next_token.replace(tzinfo=None) + + # Disable pagination if the next token's date is in the future + if (today - next_token).days < 0: + return None + # Return the next token and the current skip value + return next_token + + + def get_url_params( + self, + context: dict | None, # noqa: ARG002 + next_page_token: Any | None, # noqa: ANN401 + ) -> dict[str, Any]: + + params: dict = {} + token_date = None + if next_page_token: + token_date = next_page_token + start_date = token_date or self.get_starting_time(context) + end_date = start_date + timedelta(days=self.days_delta) + self.pagination_date = end_date + params["$filter"] = f"payrollStart ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and payrollEnd le {end_date.strftime('%Y-%m-%dT23:59:59Z')}" + return params diff --git a/tap_restaurant365/tap.py b/tap_restaurant365/tap.py index cbdcc80..590eb0e 100644 --- a/tap_restaurant365/tap.py +++ b/tap_restaurant365/tap.py @@ -67,6 +67,7 @@ def discover_streams(self) -> list[streams.Restaurant365Stream]: streams.StockCountStream(self), streams.TransactionsStream(self), streams.TransactionDetailsStream(self), + streams.PayrollSummaryStream(self), ]