Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HGI-6654: improve transaction load time #22

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 137 additions & 26 deletions tap_shopify/streams/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from tap_shopify.context import Context
from tap_shopify.streams.base import (Stream,
shopify_error_handling)
import os
import sys
import json

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -31,14 +34,14 @@ def canonicalize(transaction_dict, field_name):
value_upper = receipt.get(field_name_upper)
if value_lower and value_upper:
if value_lower == value_upper:
LOGGER.info((
"Transaction (id=%d) contains a receipt "
"that has `%s` and `%s` keys with the same "
"value. Removing the `%s` key."),
transaction_dict['id'],
field_name,
field_name_upper,
field_name_upper)
# LOGGER.info(
# "Transaction (id=%d) contains a receipt "
# "that has `%s` and `%s` keys with the same "
# "value. Removing the `%s` key.",
# transaction_dict['id'],
# field_name,
# field_name_upper,
# field_name_upper)
transaction_dict['receipt'].pop(field_name_upper)
else:
raise ValueError((
Expand All @@ -52,6 +55,15 @@ def canonicalize(transaction_dict, field_name):
# pylint: disable=line-too-long
transaction_dict["receipt"][field_name] = transaction_dict['receipt'].pop(field_name_upper)

class HiddenPrints:
def __enter__(self):
self._original_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')

def __exit__(self, exc_type, exc_val, exc_tb):
sys.stdout.close()
sys.stdout = self._original_stdout


class Transactions(Stream):
name = 'transactions'
Expand All @@ -61,14 +73,73 @@ class Transactions(Stream):
# nothing to set the `replication_method` member to.
# https://help.shopify.com/en/api/reference/orders/transaction#properties

gql_query = """
query Orders($query: String, $cursor: String) {
orders(first: 250, query: $query, after: $cursor) {
nodes {
transactions(first: 100) {
authorizationCode
createdAt
errorCode
gateway
id
kind
paymentDetails {
... on CardPaymentDetails {
avsResultCode
bin
company
cvvResultCode
expirationMonth
expirationYear
name
number
paymentMethodName
wallet
}
}
receiptJson
status
test
parentTransaction {
id
}
amountV2 {
amount
currencyCode
}
}
id
}
pageInfo {
endCursor
hasNextPage
hasPreviousPage
startCursor
}
}
}
"""
# TODO: add user.id back if they are Shopify Plus store
# user {
# id
# }
# add retailLocation.id back on newer API version Field 'retailLocation' doesn't exist on type 'Order'
# retailLocation {
# id
# }

@shopify_error_handling
def call_api_for_transactions(self, parent_object):
return self.replication_object.find(
limit=TRANSACTIONS_RESULTS_PER_PAGE,
order_id=parent_object.id,
)
def call_api_for_transactions(self, gql_client, query, cursor=None):
with HiddenPrints():
response = gql_client.execute(self.gql_query, dict(query=query, cursor=cursor))
result = json.loads(response)
if result.get("errors"):
raise Exception(result['errors'])
return result


def get_transactions(self, parent_object):
def get_transactions(self, query):
# We do not need to support paging on this substream. If that
# were to become untrue, reference Metafields.
#
Expand All @@ -78,12 +149,16 @@ def get_transactions(self, parent_object):
#
# https://github.com/Shopify/shopify_python_api/blob/e8c475ccc84b1516912b37f691d00ecd24921e9b/shopify/resources/order.py#L17-L18

page = self.call_api_for_transactions(parent_object)
yield from page
gql_client = shopify.GraphQL()
page = self.call_api_for_transactions(gql_client, query)
yield page

while page.has_next_page():
page = self.get_next_page(page)
yield from page
# paginate
page_info = page['data']['orders']['pageInfo']
while page_info['hasNextPage']:
page = self.call_api_for_transactions(gql_client, query, cursor=page_info['endCursor'])
page_info = page['data']['orders']['pageInfo']
yield page

def get_objects(self):
# Right now, it's ok for the user to select 'transactions' but not
Expand All @@ -94,20 +169,56 @@ def get_objects(self):
# future.

# Get transactions, bookmarking at `transaction_orders`
# get the bookmark from the orders stream
selected_parent = Context.stream_objects['orders']()
selected_parent.name = "transaction_orders"
updated_at = selected_parent.get_bookmark().isoformat()
query = f"updated_at:>'{updated_at}'"
transactions = 0

for page in self.get_transactions(query):
for order in page['data']['orders']['nodes']:
order_id = int(order['id'].split("/")[-1])
location_id = order.get("retailLocation", {}).get("id")
for raw_tran in order['transactions']:
transaction = {
"order_id": order_id,
"location_id": location_id,
"error_code": raw_tran['errorCode'],
"user_id": (raw_tran.get("user") or {}).get("id"), # NOTE need to add the missing part of query above for this to work
"parent_id": int(raw_tran["parentTransaction"]["id"].split("/")[-1]) if raw_tran.get("parentTransaction") else None,
"test": raw_tran['test'],
"kind": raw_tran['kind'],
"amount": raw_tran['amountV2']['amount'],
"currency": raw_tran['amountV2']['currencyCode'],
"authorization": raw_tran['authorizationCode'],
"gateway": raw_tran['gateway'],
"id": raw_tran['id'].split("/")[-1],
"created_at": raw_tran['createdAt'],
"status": raw_tran['status'],
"admin_graphql_api_id": raw_tran['id'],
"receipt": json.loads(raw_tran['receiptJson']) if raw_tran.get("receiptJson") else None
}

# add payment details
if raw_tran.get("paymentDetails"):
payment_details = raw_tran['paymentDetails']
transaction["payment_details"] = {
"cvv_result_code": payment_details.get("cvvResultCode"),
"credit_card_bin": payment_details.get("bin"),
"credit_card_company": payment_details.get("name"),
"credit_card_number": payment_details.get("number"),
"avs_result_code": payment_details.get("avsResultCode"),
}

transactions += 1
yield transaction

# Page through all `orders`, bookmarking at `transaction_orders`
for parent_object in selected_parent.get_objects():
transactions = self.get_transactions(parent_object)
for transaction in transactions:
yield transaction

def sync(self):
bookmark = self.get_bookmark()
max_bookmark = bookmark
for transaction in self.get_objects():
transaction_dict = transaction.to_dict()
for transaction_dict in self.get_objects():
replication_value = strptime_to_utc(transaction_dict[self.replication_key])
if replication_value >= bookmark:
for field_name in ['token', 'version', 'ack', 'timestamp', 'build']:
Expand Down