Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HG-3697: Use GraphQL API for Products Stream #28

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tap_shopify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def initialize_shopify_client():
shopify.ShopifyResource.activate_session(session)
graphql_version = Context.config.get('graphql_api_version', '2024-04')
graphql_session = shopify.Session(shop, graphql_version, api_key)

# Shop.current() makes a call for shop details with provided shop and api_key
return shopify.Shop.current().attributes, session, graphql_session

Expand Down Expand Up @@ -177,7 +178,6 @@ def sync():
else:
shopify.ShopifyResource.activate_session(rest_session)


if not Context.state.get('bookmarks'):
Context.state['bookmarks'] = {}
Context.state['bookmarks']['currently_sync_stream'] = stream_id
Expand Down
145 changes: 145 additions & 0 deletions tap_shopify/streams/compatibility/product_compatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import json
import os
import shopify.resources

class ProductCompatibility():
def __init__(self, graphql_product):
"""Initialize with a GraphQL product object."""
self.graphql_product = graphql_product
self.admin_graphql_api_id = graphql_product["id"]
self.product_id = self._extract_int_id(graphql_product["id"])

current_dir = os.path.dirname(os.path.abspath(__file__))
value_map_path = os.path.join(current_dir, "value_maps", "product.json")
with open(value_map_path, 'r') as file:
self.value_map = json.load(file)

def metafields(self, _options=None, **kwargs):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added to support the metafields stream, which calls this method here.

Unfortunately, it seems that the request shopify.resources.Metafield.find issues looks like this:

https://hotglue-testing-1.myshopify.com/admin/api/2024-01/products/7903137038556/metafields.json?limit=175&since_id=1

Which calls the /products endpoint. Looking into how to resolve this.

if _options is None:
_options = kwargs
return shopify.resources.Metafield.find(resource="products", resource_id=self.product_id, **_options)

def _extract_int_id(self, string_id):
return int(string_id.split("/")[-1])

def _convert_options(self):
return [
{
"id": self._extract_int_id(option["id"]),
"product_id": self.product_id,
"name": option["name"],
"position": option["position"],
"values": option["values"]
}
for option in self.graphql_product["options"]
]

def _convert_images(self):
return [
{
"id": self._extract_int_id(image["id"]),
"admin_graphql_api_id": image["id"],
"position": idx + 1,
"alt": image["altText"],
"created_at": None, # No longer supported by GraphQL API
"updated_at": None, # No longer supported by GraphQL API
"width": image["width"],
"height": image["height"],
"src": image["src"],
"variant_ids": None # No longer supported by GraphQL API
}
for idx, image in enumerate(self.graphql_product.get("images", {}).get("nodes", []))
]

def _extract_variant_options(self, variant):
option_dict = {
"option1": None,
"option2": None,
"option3": None
} # The maximum number of selectedOptions returned from a ProductVariant is 3
selected_options = variant["selectedOptions"]
for idx, option in enumerate(selected_options):
option_dict[f"option{idx + 1}"] = option["value"]
return option_dict

def _cast_variant_values(self, variant):
"""Cast variant values based on the value_map."""
for key, value in variant.items():
if key in self.value_map["variants"]:
key_map = self.value_map["variants"][key]
if value in key_map:
variant[key] = key_map[value]
return variant

def _convert_variants(self):
return [
{
"admin_graphql_api_id": variant["id"],
"barcode": variant["barcode"],
"compare_at_price": variant["compareAtPrice"],
"created_at": variant["createdAt"],
"fulfillment_service": variant["fulfillmentService"]["handle"],
"grams": None, # No longer supported by GraphQL API
"id": self._extract_int_id(variant["id"]),
"image_id": self._extract_int_id(variant["image"]["id"]) if variant.get("image") else None,
"inventory_item_id": self._extract_int_id(variant["inventoryItem"]["id"]),
"inventory_management": None, # No longer supported by GraphQL API
"inventory_policy": variant["inventoryPolicy"],
"inventory_quantity": variant["inventoryQuantity"],
"old_inventory_quantity": None, # No longer supported by GraphQL API
"position": variant["position"],
"price": variant["price"],
"requires_shipping": variant["inventoryItem"]["requiresShipping"],
"sku": variant["sku"],
"tax_code": variant["taxCode"],
"taxable": variant["taxable"],
"title": variant["title"],
"updated_at": variant["updatedAt"],
"weight": variant["weight"],
"weight_unit": variant["weightUnit"],
} | self._extract_variant_options(variant)
for variant in self.graphql_product.get("variants", {}).get("nodes", [])
]

def _cast_values(self, data, mappings):
"""
Recursively traverse and cast values in a dictionary or list based on mappings.
:param data: The data to process (dictionary, list, or scalar).
:param mappings: The mapping dictionary to use for casting.
:return: The processed data with values cast according to the mappings.
"""
if isinstance(data, dict):
return {
key: self._cast_values(value, mappings.get(key, {}))
for key, value in data.items()
}
elif isinstance(data, list):
return [self._cast_values(item, mappings) for item in data]
elif data in mappings:
return mappings[data]
return data

def to_dict(self):
"""Return the REST API-compatible product as a dictionary."""
product_dict = {
"admin_graphql_api_id": self.graphql_product["id"],
"body_html": self.graphql_product["descriptionHtml"] or "",
"created_at": self.graphql_product["createdAt"],
"handle": self.graphql_product["handle"],
"id": self.product_id,
"image": None, # No longer supported by GraphQL API
"product_type": self.graphql_product["productType"],
"published_at": self.graphql_product["publishedAt"],
"published_scope": None, # No longer supported by GraphQL API
"status": self.graphql_product["status"],
"tags": ", ".join(self.graphql_product["tags"]),
"template_suffix": self.graphql_product["templateSuffix"],
"title": self.graphql_product["title"],
"updated_at": self.graphql_product["updatedAt"],
"vendor": self.graphql_product["vendor"],
"options": self._convert_options(),
"images": self._convert_images(),
"variants": self._convert_variants()
}

return self._cast_values(product_dict, self.value_map)
19 changes: 19 additions & 0 deletions tap_shopify/streams/compatibility/value_maps/product.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"status": {
"ACTIVE": "active",
"ARCHIVED": "archived",
"DRAFT": "draft"
},
"variants": {
"inventory_policy": {
"DENY": "deny",
"CONTINUE": "continue"
},
"weight_unit": {
"GRAMS": "g",
"KILOGRAMS": "kg",
"OUNCES": "oz",
"POUNDS": "lb"
}
}
}
6 changes: 3 additions & 3 deletions tap_shopify/streams/inventory_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ def get_objects(self):

# Page through all `products`, bookmarking at `product_variants`
for parent_object in selected_parent.get_objects():

product_variants = parent_object.variants
product_dict = parent_object.to_dict()
product_variants = product_dict["variants"]
inventory_items_ids = ",".join(
[str(product_variant.inventory_item_id) for product_variant in product_variants])
[str(product_variant["inventory_item_id"]) for product_variant in product_variants])

# Max limit of IDs is 100 and Max limit of product_variants in one product is also 100
# hence we can directly pass all inventory_items_ids
Expand Down
2 changes: 1 addition & 1 deletion tap_shopify/streams/product_category.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def sync(self):
for incoming_item in self.get_objects():
replication_value = strptime_to_utc(incoming_item[self.replication_key])
if replication_value >= bookmark:

yield incoming_item
if replication_value > self.max_bookmark:
self.max_bookmark = replication_value
Expand Down
132 changes: 129 additions & 3 deletions tap_shopify/streams/products.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,138 @@
import shopify

from tap_shopify.streams.base import Stream
from tap_shopify.streams.base import (Stream, shopify_error_handling)
from tap_shopify.context import Context
import json
from datetime import timedelta
import singer
from singer.utils import strftime
from tap_shopify.streams.compatibility.product_compatibility import ProductCompatibility

LOGGER = singer.get_logger()

class Products(Stream):
name = 'products'
replication_object = shopify.Product
status_key = "published_status"

gql_query = """
query GetProducts($query: String, $cursor: String) {
products(first: 250, after: $cursor, query: $query) {
nodes {
status
publishedAt
createdAt
vendor
updatedAt
descriptionHtml
productType
tags
handle
templateSuffix
title
id
options {
id
name
position
values
}
images(first: 250) {
nodes {
id
altText
src
height
width
}
}
variants(first: 100) {
nodes {
id
title
sku
position
price
compareAtPrice
weight
weightUnit
inventoryPolicy
inventoryQuantity
taxable
taxCode
updatedAt
image {
id
}
inventoryItem {
id
requiresShipping
}
createdAt
barcode
fulfillmentService {
handle
}
selectedOptions {
name
value
}
}
}
}
pageInfo {
hasNextPage
endCursor
}
}
}
"""

@shopify_error_handling
def call_api_for_products(self, gql_client, query, cursor=None):
variables = {
"query": query,
"cursor": cursor
}
response = gql_client.execute(self.gql_query, variables)
result = json.loads(response)
if result.get("errors"):
raise Exception(result['errors'])
return result

def get_products(self, updated_at_min, updated_at_max, cursor=None):
gql_client = shopify.GraphQL()
query = f"updated_at:>'{updated_at_min.isoformat()}' AND updated_at:<'{updated_at_max.isoformat()}'"
page = self.call_api_for_products(gql_client, query, cursor)
return page

def get_objects(self):
updated_at_min = self.get_bookmark()
stop_time = singer.utils.now().replace(microsecond=0)
date_window_size = float(Context.config.get("date_window_size", 1))

while updated_at_min < stop_time:
updated_at_max = updated_at_min + timedelta(days=date_window_size)
if updated_at_max > stop_time:
updated_at_max = stop_time

LOGGER.info(f"Fetching products updated between {updated_at_min} and {updated_at_max}")
cursor = None

while True:
page = self.get_products(updated_at_min, updated_at_max, cursor)
products = page['data']['products']['nodes']
page_info = page['data']['products']['pageInfo']

for product in products:
yield ProductCompatibility(product)

# Update the cursor and check if there's another page
if page_info['hasNextPage']:
cursor = page_info['endCursor']
else:
break

# Update the bookmark for the next batch
updated_at_min = updated_at_max
self.update_bookmark(strftime(updated_at_min))

Context.stream_objects['products'] = Products