diff --git a/.gitignore b/.gitignore index c3b0a98..cba99f1 100644 --- a/.gitignore +++ b/.gitignore @@ -136,3 +136,4 @@ dmypy.json # Pyre type checker .pyre/ +.vscode diff --git a/tap_netsuite_rest/client.py b/tap_netsuite_rest/client.py index d25a109..8b55fd3 100644 --- a/tap_netsuite_rest/client.py +++ b/tap_netsuite_rest/client.py @@ -5,6 +5,7 @@ import requests import pendulum import copy +import re from pathlib import Path from datetime import datetime, timedelta @@ -759,3 +760,20 @@ def prepare_request_payload( ) self.logger.info(f"Making query ({timeframe})") return payload + + + # Remove double spaces that might result from empty address fields + def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]: + # Collapse duplicate spaces in address fields + if row.get("shippingaddress"): + row["shippingaddress"] = re.sub(r'(, )+', ', ', row["shippingaddress"]).strip(', ') + if row["shippingaddress"] == "": + row.pop("shippingaddress") + if row.get("billingaddress"): + row["billingaddress"] = re.sub(r'(, )+', ', ', row["billingaddress"]).strip(', ') + if row["billingaddress"] == "": + row.pop("billingaddress") + + + return row + \ No newline at end of file diff --git a/tap_netsuite_rest/streams.py b/tap_netsuite_rest/streams.py index caeb047..87989aa 100644 --- a/tap_netsuite_rest/streams.py +++ b/tap_netsuite_rest/streams.py @@ -61,6 +61,20 @@ class SalesTransactionsStream(TransactionRootStream): replication_key = "lastmodifieddate" custom_filter = "transaction.recordtype = 'salesorder'" + + join = """ + LEFT JOIN TransactionShippingAddress tsa ON transaction.shippingaddress = tsa.nkey + LEFT JOIN TransactionBillingAddress tba ON transaction.billingaddress = tba.nkey + """ + + def get_selected_properties(self): + transaction_properties = super().get_selected_properties() + transaction_properties.extend([ + "COALESCE(tsa.addr1, '') || ', ' || COALESCE(tsa.addr2, '') || ', ' || COALESCE(tsa.addr3, '') || ', ' || COALESCE(tsa.city, '') || ', ' || COALESCE(tsa.state, '') || ', ' || COALESCE(tsa.zip, '') || ', ' || COALESCE(tsa.country, '') as shippingaddress", + "COALESCE(tba.addr1, '') || ', ' || COALESCE(tba.addr2, '') || ', ' || COALESCE(tba.addr3, '') || ', ' || COALESCE(tba.city, '') || ', ' || COALESCE(tba.state, '') || ', ' || COALESCE(tba.zip, '') || ', ' || COALESCE(tba.country, '') as billingaddress" + ]) + return transaction_properties + schema = th.PropertiesList( th.Property("abbrevtype", th.StringType), th.Property("actualshipdate", th.DateTimeType), @@ -110,18 +124,26 @@ class SalesTransactionsStream(TransactionRootStream): ).to_dict() -class VendorBillsStream(NetSuiteStream): +class VendorBillsStream(TransactionRootStream): name = "vendor_bill_transactions" primary_keys = ["id"] table = "transaction" replication_key = "lastmodifieddate" custom_filter = "recordtype = 'vendorbill'" + + join = """ + LEFT JOIN TransactionShippingAddress tsa ON transaction.shippingaddress = tsa.nkey + LEFT JOIN TransactionBillingAddress tba ON transaction.billingaddress = tba.nkey + """ + schema = th.PropertiesList( th.Property("abbrevtype", th.StringType), th.Property("approvalstatus", th.StringType), th.Property("balsegstatus", th.StringType), th.Property("billingstatus", th.StringType), + th.Property("billingaddress", th.StringType), + th.Property("shippingaddress", th.StringType), th.Property("closedate", th.DateTimeType), th.Property("createdby", th.StringType), th.Property("createddate", th.DateTimeType), @@ -160,6 +182,14 @@ class VendorBillsStream(NetSuiteStream): th.Property("voided", th.StringType), ).to_dict() + def get_selected_properties(self): + transaction_properties = super().get_selected_properties() + transaction_properties.extend([ + "COALESCE(tsa.addr1, '') || ', ' || COALESCE(tsa.addr2, '') || ', ' || COALESCE(tsa.addr3, '') || ', ' || COALESCE(tsa.city, '') || ', ' || COALESCE(tsa.state, '') || ', ' || COALESCE(tsa.zip, '') || ', ' || COALESCE(tsa.country, '') as shippingaddress", + "COALESCE(tba.addr1, '') || ', ' || COALESCE(tba.addr2, '') || ', ' || COALESCE(tba.addr3, '') || ', ' || COALESCE(tba.city, '') || ', ' || COALESCE(tba.state, '') || ', ' || COALESCE(tba.zip, '') || ', ' || COALESCE(tba.country, '') as billingaddress" + ]) + return transaction_properties + class SalesTransactionLinesStream(TransactionRootStream): name = "sales_transactions_lines" @@ -284,18 +314,20 @@ class VendorStream(NetsuiteDynamicStream): replication_key = "lastmodifieddate" -class ShippingAddressStream(NetsuiteDynamicStream): - name = "shipping_address" - primary_keys = ["nkey"] - table = "TransactionShippingAddress" - replication_key = "lastmodifieddate" +# The following streams were removed because they are not documented by Netsuite nor well behaved with keys: +# Instead, shipping + billing address is joined on transaction streams +# class ShippingAddressStream(NetsuiteDynamicStream): +# name = "shipping_address" +# primary_keys = ["nkey"] +# table = "TransactionShippingAddress" +# replication_key = "lastmodifieddate" -class BillingAddressStream(NetsuiteDynamicStream): - name = "billing_address" - primary_keys = ["nkey"] - table = "TransactionBillingAddress" - replication_key = "lastmodifieddate" +# class BillingAddressStream(NetsuiteDynamicStream): +# name = "billing_address" +# primary_keys = ["nkey"] +# table = "TransactionBillingAddress" +# replication_key = "lastmodifieddate" class TermStream(NetsuiteDynamicStream): @@ -640,10 +672,19 @@ class TransactionsStream(TransactionRootStream): primary_keys = ["id", "lastmodifieddate"] table = "transaction" replication_key = "lastmodifieddate" + + + join = """ + LEFT JOIN TransactionShippingAddress tsa ON transaction.shippingaddress = tsa.nkey + LEFT JOIN TransactionBillingAddress tba ON transaction.billingaddress = tba.nkey + """ + default_fields = [ th.Property("id", th.StringType), th.Property("type", th.StringType), th.Property("entity", th.StringType), + th.Property("shippingaddress", th.StringType), + th.Property("billingaddress", th.StringType), th.Property("otherrefnum", th.StringType), th.Property("closedate", th.DateType), th.Property("duedate", th.DateType), @@ -675,7 +716,11 @@ def get_selected_properties(self): selected_properties.append('BUILTIN.DF( Transaction.Status ) AS status_description') selected_properties.append('BUILTIN.DF( Transaction.ApprovalStatus ) AS approvalstatus_description') - + + # Build Formatted Addresses + selected_properties.append("COALESCE(tsa.addr1, '') || ', ' || COALESCE(tsa.addr2, '') || ', ' || COALESCE(tsa.addr3, '') || ', ' || COALESCE(tsa.city, '') || ', ' || COALESCE(tsa.state, '') || ', ' || COALESCE(tsa.zip, '') || ', ' || COALESCE(tsa.country, '') as shippingaddress") + selected_properties.append("COALESCE(tba.addr1, '') || ', ' || COALESCE(tba.addr2, '') || ', ' || COALESCE(tba.addr3, '') || ', ' || COALESCE(tba.city, '') || ', ' || COALESCE(tba.state, '') || ', ' || COALESCE(tba.zip, '') || ', ' || COALESCE(tba.country, '') as billingaddress") + return selected_properties