diff --git a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py index abda4282..348c6dd5 100644 --- a/tap_shopify/streams/transactions.py +++ b/tap_shopify/streams/transactions.py @@ -4,6 +4,9 @@ from tap_shopify.context import Context from tap_shopify.streams.base import (Stream, shopify_error_handling) +import os +import sys +import json LOGGER = singer.get_logger() @@ -31,14 +34,14 @@ def canonicalize(transaction_dict, field_name): value_upper = receipt.get(field_name_upper) if value_lower and value_upper: if value_lower == value_upper: - LOGGER.info(( - "Transaction (id=%d) contains a receipt " - "that has `%s` and `%s` keys with the same " - "value. Removing the `%s` key."), - transaction_dict['id'], - field_name, - field_name_upper, - field_name_upper) + # LOGGER.info( + # "Transaction (id=%d) contains a receipt " + # "that has `%s` and `%s` keys with the same " + # "value. Removing the `%s` key.", + # transaction_dict['id'], + # field_name, + # field_name_upper, + # field_name_upper) transaction_dict['receipt'].pop(field_name_upper) else: raise ValueError(( @@ -52,6 +55,15 @@ def canonicalize(transaction_dict, field_name): # pylint: disable=line-too-long transaction_dict["receipt"][field_name] = transaction_dict['receipt'].pop(field_name_upper) +class HiddenPrints: + def __enter__(self): + self._original_stdout = sys.stdout + sys.stdout = open(os.devnull, 'w') + + def __exit__(self, exc_type, exc_val, exc_tb): + sys.stdout.close() + sys.stdout = self._original_stdout + class Transactions(Stream): name = 'transactions' @@ -61,14 +73,73 @@ class Transactions(Stream): # nothing to set the `replication_method` member to. # https://help.shopify.com/en/api/reference/orders/transaction#properties + gql_query = """ + query Orders($query: String, $cursor: String) { + orders(first: 250, query: $query, after: $cursor) { + nodes { + transactions(first: 100) { + authorizationCode + createdAt + errorCode + gateway + id + kind + paymentDetails { + ... on CardPaymentDetails { + avsResultCode + bin + company + cvvResultCode + expirationMonth + expirationYear + name + number + paymentMethodName + wallet + } + } + receiptJson + status + test + parentTransaction { + id + } + amountV2 { + amount + currencyCode + } + } + id + } + pageInfo { + endCursor + hasNextPage + hasPreviousPage + startCursor + } + } + } + """ + # TODO: add user.id back if they are Shopify Plus store + # user { + # id + # } + # add retailLocation.id back on newer API version Field 'retailLocation' doesn't exist on type 'Order' + # retailLocation { + # id + # } + @shopify_error_handling - def call_api_for_transactions(self, parent_object): - return self.replication_object.find( - limit=TRANSACTIONS_RESULTS_PER_PAGE, - order_id=parent_object.id, - ) + def call_api_for_transactions(self, gql_client, query, cursor=None): + with HiddenPrints(): + response = gql_client.execute(self.gql_query, dict(query=query, cursor=cursor)) + result = json.loads(response) + if result.get("errors"): + raise Exception(result['errors']) + return result + - def get_transactions(self, parent_object): + def get_transactions(self, query): # We do not need to support paging on this substream. If that # were to become untrue, reference Metafields. # @@ -78,12 +149,16 @@ def get_transactions(self, parent_object): # # https://github.com/Shopify/shopify_python_api/blob/e8c475ccc84b1516912b37f691d00ecd24921e9b/shopify/resources/order.py#L17-L18 - page = self.call_api_for_transactions(parent_object) - yield from page + gql_client = shopify.GraphQL() + page = self.call_api_for_transactions(gql_client, query) + yield page - while page.has_next_page(): - page = self.get_next_page(page) - yield from page + # paginate + page_info = page['data']['orders']['pageInfo'] + while page_info['hasNextPage']: + page = self.call_api_for_transactions(gql_client, query, cursor=page_info['endCursor']) + page_info = page['data']['orders']['pageInfo'] + yield page def get_objects(self): # Right now, it's ok for the user to select 'transactions' but not @@ -94,20 +169,56 @@ def get_objects(self): # future. # Get transactions, bookmarking at `transaction_orders` + # get the bookmark from the orders stream selected_parent = Context.stream_objects['orders']() selected_parent.name = "transaction_orders" + updated_at = selected_parent.get_bookmark().isoformat() + query = f"updated_at:>'{updated_at}'" + transactions = 0 + + for page in self.get_transactions(query): + for order in page['data']['orders']['nodes']: + order_id = int(order['id'].split("/")[-1]) + location_id = order.get("retailLocation", {}).get("id") + for raw_tran in order['transactions']: + transaction = { + "order_id": order_id, + "location_id": location_id, + "error_code": raw_tran['errorCode'], + "user_id": (raw_tran.get("user") or {}).get("id"), # NOTE need to add the missing part of query above for this to work + "parent_id": int(raw_tran["parentTransaction"]["id"].split("/")[-1]) if raw_tran.get("parentTransaction") else None, + "test": raw_tran['test'], + "kind": raw_tran['kind'], + "amount": raw_tran['amountV2']['amount'], + "currency": raw_tran['amountV2']['currencyCode'], + "authorization": raw_tran['authorizationCode'], + "gateway": raw_tran['gateway'], + "id": raw_tran['id'].split("/")[-1], + "created_at": raw_tran['createdAt'], + "status": raw_tran['status'], + "admin_graphql_api_id": raw_tran['id'], + "receipt": json.loads(raw_tran['receiptJson']) if raw_tran.get("receiptJson") else None + } + + # add payment details + if raw_tran.get("paymentDetails"): + payment_details = raw_tran['paymentDetails'] + transaction["payment_details"] = { + "cvv_result_code": payment_details.get("cvvResultCode"), + "credit_card_bin": payment_details.get("bin"), + "credit_card_company": payment_details.get("name"), + "credit_card_number": payment_details.get("number"), + "avs_result_code": payment_details.get("avsResultCode"), + } + + transactions += 1 + yield transaction - # Page through all `orders`, bookmarking at `transaction_orders` - for parent_object in selected_parent.get_objects(): - transactions = self.get_transactions(parent_object) - for transaction in transactions: - yield transaction def sync(self): bookmark = self.get_bookmark() max_bookmark = bookmark - for transaction in self.get_objects(): - transaction_dict = transaction.to_dict() + for transaction_dict in self.get_objects(): replication_value = strptime_to_utc(transaction_dict[self.replication_key]) if replication_value >= bookmark: for field_name in ['token', 'version', 'ack', 'timestamp', 'build']: