import json
import os


class ProductCompatibility():
    """Present a GraphQL product payload in the legacy REST product layout.

    The instance wraps the raw GraphQL product dictionary and exposes
    ``to_dict()`` which renames/reshapes the fields into REST-style keys and
    then translates enum values (e.g. ``"ACTIVE"`` -> ``"active"``) using the
    value map stored in ``value_maps/product.json`` next to this module.
    """

    # Parsed value map, loaded from disk on first use and then shared by every
    # instance. It is only ever read, never mutated, so sharing is safe and
    # avoids re-reading the JSON file once per product.
    _value_map_cache = None

    def __init__(self, graphql_product):
        """Initialize with a GraphQL product object.

        :param graphql_product: Product dictionary as returned by the GraphQL
            query; must contain at least the ``id`` key (a GID string).
        """
        self.graphql_product = graphql_product
        self.admin_graphql_api_id = graphql_product["id"]
        self.product_id = self._extract_int_id(graphql_product["id"])
        self.value_map = self._load_value_map()

    @classmethod
    def _load_value_map(cls):
        """Return the value map, reading ``value_maps/product.json`` only once."""
        if cls._value_map_cache is None:
            current_dir = os.path.dirname(os.path.abspath(__file__))
            value_map_path = os.path.join(current_dir, "value_maps", "product.json")
            with open(value_map_path, 'r', encoding='utf-8') as file:
                cls._value_map_cache = json.load(file)
        return cls._value_map_cache

    def metafields(self, _options=None, **kwargs):
        """Fetch this product's metafields through the REST ``Metafield`` resource.

        :param _options: Optional dict of query options. When given, ``kwargs``
            are ignored; when ``None``, ``kwargs`` are used as the options.
        :return: Whatever ``shopify.resources.Metafield.find`` returns.
        """
        # Imported here rather than at module level so the pure data-conversion
        # path (``to_dict``) does not require the REST client to be importable.
        import shopify.resources  # pylint: disable=import-outside-toplevel

        if _options is None:
            _options = kwargs
        return shopify.resources.Metafield.find(
            resource="products", resource_id=self.product_id, **_options)

    def _extract_int_id(self, string_id):
        """Return the trailing numeric id of a GID such as ``gid://shopify/Product/123``.

        A ``?key=value`` suffix after the numeric part, if present, is ignored.
        """
        last_segment = string_id.rsplit("/", 1)[-1]
        return int(last_segment.split("?", 1)[0])

    def _convert_options(self):
        """Return the product options as a list of REST-style option dicts."""
        return [
            {
                "id": self._extract_int_id(option["id"]),
                "product_id": self.product_id,
                "name": option["name"],
                "position": option["position"],
                "values": option["values"]
            }
            for option in self.graphql_product["options"]
        ]

    def _convert_images(self):
        """Return the product images as REST-style dicts.

        ``position`` is derived from the order of the nodes (1-based). A
        missing or null ``images`` connection yields an empty list.
        """
        image_nodes = (self.graphql_product.get("images") or {}).get("nodes") or []
        return [
            {
                "id": self._extract_int_id(image["id"]),
                "admin_graphql_api_id": image["id"],
                "position": idx + 1,
                "alt": image["altText"],
                "created_at": None, # No longer supported by GraphQL API
                "updated_at": None, # No longer supported by GraphQL API
                "width": image["width"],
                "height": image["height"],
                "src": image["src"],
                "variant_ids": None # No longer supported by GraphQL API
            }
            for idx, image in enumerate(image_nodes)
        ]

    def _extract_variant_options(self, variant):
        """Map the variant's ``selectedOptions`` onto ``option1``..``option3`` keys.

        Keys without a corresponding selected option stay ``None``.
        """
        option_dict = {
            "option1": None,
            "option2": None,
            "option3": None
        } # The maximum number of selectedOptions returned from a ProductVariant is 3
        for idx, option in enumerate(variant["selectedOptions"]):
            option_dict[f"option{idx + 1}"] = option["value"]
        return option_dict

    def _cast_variant_values(self, variant):
        """Cast variant values in place based on the value_map and return the variant."""
        variant_maps = self.value_map["variants"]
        for key, value in variant.items():
            key_map = variant_maps.get(key)
            # Only existing keys are reassigned, so the dict size never changes
            # while it is being iterated.
            if key_map is not None and value in key_map:
                variant[key] = key_map[value]
        return variant

    def _convert_variant(self, variant):
        """Return a single GraphQL variant node as a REST-style variant dict."""
        # ``fulfillmentService`` and ``image`` may be null in the payload.
        fulfillment_service = variant.get("fulfillmentService") or {}
        image = variant.get("image")
        inventory_item = variant["inventoryItem"]

        converted = {
            "admin_graphql_api_id": variant["id"],
            "barcode": variant["barcode"],
            "compare_at_price": variant["compareAtPrice"],
            "created_at": variant["createdAt"],
            "fulfillment_service": fulfillment_service.get("handle"),
            "grams": None, # No longer supported by GraphQL API
            "id": self._extract_int_id(variant["id"]),
            "image_id": self._extract_int_id(image["id"]) if image else None,
            "inventory_item_id": self._extract_int_id(inventory_item["id"]),
            "inventory_management": None, # No longer supported by GraphQL API
            "inventory_policy": variant["inventoryPolicy"],
            "inventory_quantity": variant["inventoryQuantity"],
            "old_inventory_quantity": None, # No longer supported by GraphQL API
            "position": variant["position"],
            "price": variant["price"],
            "requires_shipping": inventory_item["requiresShipping"],
            "sku": variant["sku"],
            "tax_code": variant["taxCode"],
            "taxable": variant["taxable"],
            "title": variant["title"],
            "updated_at": variant["updatedAt"],
            "weight": variant["weight"],
            "weight_unit": variant["weightUnit"],
        }
        converted.update(self._extract_variant_options(variant))
        return converted

    def _convert_variants(self):
        """Return the product variants as REST-style dicts.

        A missing or null ``variants`` connection yields an empty list.
        """
        variant_nodes = (self.graphql_product.get("variants") or {}).get("nodes") or []
        return [self._convert_variant(variant) for variant in variant_nodes]

    def _cast_values(self, data, mappings):
        """
        Recursively traverse and cast values in a dictionary or list based on mappings.
        :param data: The data to process (dictionary, list, or scalar).
        :param mappings: The mapping dictionary to use for casting.
        :return: The processed data with values cast according to the mappings.
        """
        # A non-dict mapping cannot translate anything; treat it as "no mapping"
        # instead of failing on ``.get`` / ``in``.
        if not isinstance(mappings, dict):
            mappings = {}

        if isinstance(data, dict):
            return {
                key: self._cast_values(value, mappings.get(key, {}))
                for key, value in data.items()
            }
        if isinstance(data, list):
            # List items share the mapping of the key that holds the list.
            return [self._cast_values(item, mappings) for item in data]
        if data in mappings:
            return mappings[data]
        return data

    def to_dict(self):
        """Return the REST API-compatible product as a dictionary."""
        product = self.graphql_product
        product_dict = {
            "admin_graphql_api_id": product["id"],
            # REST clients expect a string, so a null description becomes "".
            "body_html": product["descriptionHtml"] or "",
            "created_at": product["createdAt"],
            "handle": product["handle"],
            "id": self.product_id,
            "image": None, # No longer supported by GraphQL API
            "product_type": product["productType"],
            "published_at": product["publishedAt"],
            "published_scope": None, # No longer supported by GraphQL API
            "status": product["status"],
            # GraphQL returns tags as a list; the REST shape is one comma-separated string.
            "tags": ", ".join(product["tags"]),
            "template_suffix": product["templateSuffix"],
            "title": product["title"],
            "updated_at": product["updatedAt"],
            "vendor": product["vendor"],
            "options": self._convert_options(),
            "images": self._convert_images(),
            "variants": self._convert_variants()
        }

        return self._cast_values(product_dict, self.value_map)
import json
from datetime import timedelta

import shopify
import singer
from singer.utils import strftime

from tap_shopify.streams.base import (Stream, shopify_error_handling)
from tap_shopify.context import Context
from tap_shopify.streams.compatibility.product_compatibility import ProductCompatibility

LOGGER = singer.get_logger()


class Products(Stream):
    """Products stream that pages through the GraphQL ``products`` connection.

    Products are fetched in consecutive ``updated_at`` date windows and each
    node is wrapped in ``ProductCompatibility`` before being yielded.
    """

    name = 'products'
    replication_object = shopify.Product

    gql_query = """
    query GetProducts($query: String, $cursor: String) {
        products(first: 250, after: $cursor, query: $query) {
            nodes {
                status
                publishedAt
                createdAt
                vendor
                updatedAt
                descriptionHtml
                productType
                tags
                handle
                templateSuffix
                title
                id
                options {
                    id
                    name
                    position
                    values
                }
                images(first: 250) {
                    nodes {
                        id
                        altText
                        src
                        height
                        width
                    }
                }
                variants(first: 100) {
                    nodes {
                        id
                        title
                        sku
                        position
                        price
                        compareAtPrice
                        weight
                        weightUnit
                        inventoryPolicy
                        inventoryQuantity
                        taxable
                        taxCode
                        updatedAt
                        image {
                            id
                        }
                        inventoryItem {
                            id
                            requiresShipping
                        }
                        createdAt
                        barcode
                        fulfillmentService {
                            handle
                        }
                        selectedOptions {
                            name
                            value
                        }
                    }
                }
            }
            pageInfo {
                hasNextPage
                endCursor
            }
        }
    }
    """

    # NOTE(review): retry/backoff behaviour comes from `shopify_error_handling`,
    # which is defined in tap_shopify.streams.base and not visible here.
    @shopify_error_handling
    def call_api_for_products(self, gql_client, query, cursor=None):
        """Execute ``gql_query`` once and return the decoded JSON response.

        :param gql_client: Object exposing ``execute(query, variables)``.
        :param query: Search-syntax filter passed as the ``$query`` variable.
        :param cursor: Pagination cursor for ``$cursor``; ``None`` for the first page.
        :return: The parsed response dictionary.
        :raises Exception: When the response carries a non-empty ``errors`` entry.
        """
        variables = {
            "query": query,
            "cursor": cursor
        }
        response = gql_client.execute(self.gql_query, variables)
        result = json.loads(response)
        if result.get("errors"):
            raise Exception(result['errors'])
        return result

    def get_products(self, updated_at_min, updated_at_max, cursor=None):
        """Fetch one page of products updated inside ``[updated_at_min, updated_at_max)``.

        :param updated_at_min: Inclusive lower bound (datetime).
        :param updated_at_max: Exclusive upper bound (datetime).
        :param cursor: Pagination cursor; ``None`` for the first page.
        :return: The parsed response dictionary for that page.
        """
        gql_client = shopify.GraphQL()
        # The lower bound is inclusive and the upper bound exclusive so that
        # consecutive windows tile the timeline: a product whose updated_at
        # equals a window boundary is picked up by exactly one window instead
        # of being excluded from both.
        query = (
            f"updated_at:>='{updated_at_min.isoformat()}' "
            f"AND updated_at:<'{updated_at_max.isoformat()}'"
        )
        return self.call_api_for_products(gql_client, query, cursor)

    def get_objects(self):
        """Yield a ``ProductCompatibility`` for every product updated since the bookmark.

        The range from the bookmark up to "now" is walked in windows of
        ``date_window_size`` days (config, default 1). Every page of a window
        is consumed before the bookmark is advanced to the window's end.

        :raises ValueError: If ``date_window_size`` is not greater than zero.
        """
        updated_at_min = self.get_bookmark()
        stop_time = singer.utils.now().replace(microsecond=0)
        date_window_size = float(Context.config.get("date_window_size", 1))
        if date_window_size <= 0:
            # A zero or negative window never moves forward and would loop forever.
            raise ValueError(
                f"date_window_size must be greater than 0, got {date_window_size}")
        window = timedelta(days=date_window_size)

        while updated_at_min < stop_time:
            # Never look past the moment the sync started.
            updated_at_max = min(updated_at_min + window, stop_time)

            LOGGER.info("Fetching products updated between %s and %s",
                        updated_at_min, updated_at_max)
            cursor = None

            while True:
                page = self.get_products(updated_at_min, updated_at_max, cursor)
                connection = page['data']['products']
                page_info = connection['pageInfo']

                for product in connection['nodes']:
                    yield ProductCompatibility(product)

                # Update the cursor and check if there's another page
                if not page_info['hasNextPage']:
                    break
                cursor = page_info['endCursor']

            # Update the bookmark for the next batch
            updated_at_min = updated_at_max
            self.update_bookmark(strftime(updated_at_min))


Context.stream_objects['products'] = Products