Skip to content

Commit

Permalink
HGI-7208 - Remove address streams and build address into transactions (
Browse files Browse the repository at this point in the history
  • Loading branch information
LiorB-D authored and hsyyid committed Feb 17, 2025
1 parent d13ee04 commit 2531918
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,4 @@ dmypy.json

# Pyre type checker
.pyre/
.vscode
18 changes: 18 additions & 0 deletions tap_netsuite_rest/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import requests
import pendulum
import copy
import re

from pathlib import Path
from datetime import datetime, timedelta
Expand Down Expand Up @@ -759,3 +760,20 @@ def prepare_request_payload(
)
self.logger.info(f"Making query ({timeframe})")
return payload


# Remove double spaces that might result from empty address fields
def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]:
# Collapse duplicate spaces in address fields
if row.get("shippingaddress"):
row["shippingaddress"] = re.sub(r'(, )+', ', ', row["shippingaddress"]).strip(', ')
if row["shippingaddress"] == "":
row.pop("shippingaddress")
if row.get("billingaddress"):
row["billingaddress"] = re.sub(r'(, )+', ', ', row["billingaddress"]).strip(', ')
if row["billingaddress"] == "":
row.pop("billingaddress")


return row

69 changes: 57 additions & 12 deletions tap_netsuite_rest/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ class SalesTransactionsStream(TransactionRootStream):
replication_key = "lastmodifieddate"
custom_filter = "transaction.recordtype = 'salesorder'"


join = """
LEFT JOIN TransactionShippingAddress tsa ON transaction.shippingaddress = tsa.nkey
LEFT JOIN TransactionBillingAddress tba ON transaction.billingaddress = tba.nkey
"""

def get_selected_properties(self):
transaction_properties = super().get_selected_properties()
transaction_properties.extend([
"COALESCE(tsa.addr1, '') || ', ' || COALESCE(tsa.addr2, '') || ', ' || COALESCE(tsa.addr3, '') || ', ' || COALESCE(tsa.city, '') || ', ' || COALESCE(tsa.state, '') || ', ' || COALESCE(tsa.zip, '') || ', ' || COALESCE(tsa.country, '') as shippingaddress",
"COALESCE(tba.addr1, '') || ', ' || COALESCE(tba.addr2, '') || ', ' || COALESCE(tba.addr3, '') || ', ' || COALESCE(tba.city, '') || ', ' || COALESCE(tba.state, '') || ', ' || COALESCE(tba.zip, '') || ', ' || COALESCE(tba.country, '') as billingaddress"
])
return transaction_properties

schema = th.PropertiesList(
th.Property("abbrevtype", th.StringType),
th.Property("actualshipdate", th.DateTimeType),
Expand Down Expand Up @@ -110,18 +124,26 @@ class SalesTransactionsStream(TransactionRootStream):
).to_dict()


class VendorBillsStream(NetSuiteStream):
class VendorBillsStream(TransactionRootStream):
name = "vendor_bill_transactions"
primary_keys = ["id"]
table = "transaction"
replication_key = "lastmodifieddate"
custom_filter = "recordtype = 'vendorbill'"


join = """
LEFT JOIN TransactionShippingAddress tsa ON transaction.shippingaddress = tsa.nkey
LEFT JOIN TransactionBillingAddress tba ON transaction.billingaddress = tba.nkey
"""

schema = th.PropertiesList(
th.Property("abbrevtype", th.StringType),
th.Property("approvalstatus", th.StringType),
th.Property("balsegstatus", th.StringType),
th.Property("billingstatus", th.StringType),
th.Property("billingaddress", th.StringType),
th.Property("shippingaddress", th.StringType),
th.Property("closedate", th.DateTimeType),
th.Property("createdby", th.StringType),
th.Property("createddate", th.DateTimeType),
Expand Down Expand Up @@ -160,6 +182,14 @@ class VendorBillsStream(NetSuiteStream):
th.Property("voided", th.StringType),
).to_dict()

def get_selected_properties(self):
transaction_properties = super().get_selected_properties()
transaction_properties.extend([
"COALESCE(tsa.addr1, '') || ', ' || COALESCE(tsa.addr2, '') || ', ' || COALESCE(tsa.addr3, '') || ', ' || COALESCE(tsa.city, '') || ', ' || COALESCE(tsa.state, '') || ', ' || COALESCE(tsa.zip, '') || ', ' || COALESCE(tsa.country, '') as shippingaddress",
"COALESCE(tba.addr1, '') || ', ' || COALESCE(tba.addr2, '') || ', ' || COALESCE(tba.addr3, '') || ', ' || COALESCE(tba.city, '') || ', ' || COALESCE(tba.state, '') || ', ' || COALESCE(tba.zip, '') || ', ' || COALESCE(tba.country, '') as billingaddress"
])
return transaction_properties


class SalesTransactionLinesStream(TransactionRootStream):
name = "sales_transactions_lines"
Expand Down Expand Up @@ -284,18 +314,20 @@ class VendorStream(NetsuiteDynamicStream):
replication_key = "lastmodifieddate"


class ShippingAddressStream(NetsuiteDynamicStream):
name = "shipping_address"
primary_keys = ["nkey"]
table = "TransactionShippingAddress"
replication_key = "lastmodifieddate"
# The following streams were removed because they are not documented by Netsuite nor well behaved with keys:
# Instead, shipping + billing address is joined on transaction streams
# class ShippingAddressStream(NetsuiteDynamicStream):
# name = "shipping_address"
# primary_keys = ["nkey"]
# table = "TransactionShippingAddress"
# replication_key = "lastmodifieddate"


class BillingAddressStream(NetsuiteDynamicStream):
name = "billing_address"
primary_keys = ["nkey"]
table = "TransactionBillingAddress"
replication_key = "lastmodifieddate"
# class BillingAddressStream(NetsuiteDynamicStream):
# name = "billing_address"
# primary_keys = ["nkey"]
# table = "TransactionBillingAddress"
# replication_key = "lastmodifieddate"


class TermStream(NetsuiteDynamicStream):
Expand Down Expand Up @@ -640,10 +672,19 @@ class TransactionsStream(TransactionRootStream):
primary_keys = ["id", "lastmodifieddate"]
table = "transaction"
replication_key = "lastmodifieddate"


join = """
LEFT JOIN TransactionShippingAddress tsa ON transaction.shippingaddress = tsa.nkey
LEFT JOIN TransactionBillingAddress tba ON transaction.billingaddress = tba.nkey
"""

default_fields = [
th.Property("id", th.StringType),
th.Property("type", th.StringType),
th.Property("entity", th.StringType),
th.Property("shippingaddress", th.StringType),
th.Property("billingaddress", th.StringType),
th.Property("otherrefnum", th.StringType),
th.Property("closedate", th.DateType),
th.Property("duedate", th.DateType),
Expand Down Expand Up @@ -675,7 +716,11 @@ def get_selected_properties(self):

selected_properties.append('BUILTIN.DF( Transaction.Status ) AS status_description')
selected_properties.append('BUILTIN.DF( Transaction.ApprovalStatus ) AS approvalstatus_description')


# Build Formatted Addresses
selected_properties.append("COALESCE(tsa.addr1, '') || ', ' || COALESCE(tsa.addr2, '') || ', ' || COALESCE(tsa.addr3, '') || ', ' || COALESCE(tsa.city, '') || ', ' || COALESCE(tsa.state, '') || ', ' || COALESCE(tsa.zip, '') || ', ' || COALESCE(tsa.country, '') as shippingaddress")
selected_properties.append("COALESCE(tba.addr1, '') || ', ' || COALESCE(tba.addr2, '') || ', ' || COALESCE(tba.addr3, '') || ', ' || COALESCE(tba.city, '') || ', ' || COALESCE(tba.state, '') || ', ' || COALESCE(tba.zip, '') || ', ' || COALESCE(tba.country, '') as billingaddress")

return selected_properties


Expand Down

0 comments on commit 2531918

Please sign in to comment.