Skip to content

Commit

Permalink
Enhance error handling in Salesforce sinks by initializing response v…
Browse files Browse the repository at this point in the history
…ariable before API calls. Updated error logging to ensure response details are included when exceptions occur in ContactsSink, CampaignSink, and FallbackSink. This improves the clarity of error messages and aids in debugging.
  • Loading branch information
butkeraites-hotglue committed Dec 12, 2024
1 parent 016de51 commit 1b32da6
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ def upsert_record(self, record, context):
except Exception as e:
self.logger.exception(f"Could not PATCH to {url}: {e}")
if record:
response = None
try:
response = self.request_api("POST", request_data=record)
id = response.json().get("id")
Expand All @@ -254,7 +255,8 @@ def upsert_record(self, record, context):
self.assign_to_topic(id,self.topics)
return id, True, state_updates
except Exception as e:
self.log_error_message(self.name, response, e)
if response:
self.log_error_message(self.name, response, e)
raise e

def validate_response(self, response):
Expand Down Expand Up @@ -308,6 +310,7 @@ def assign_to_topic(self,contact_id,topics:list) -> None:
# Add the contact to the topic
self.logger.info(f"INFO: Adding Contact/Lead Id:[{contact_id}] to Topic Id:[{topic_id}].")

response = None
try:
response = self.request_api("POST",endpoint="sobjects/TopicAssignment",request_data={
"EntityId": contact_id,
Expand All @@ -320,7 +323,8 @@ def assign_to_topic(self,contact_id,topics:list) -> None:
# Means it's already in the topic
if "DUPLICATE_VALUE" in str(e):
return
self.log_error_message(self.name, response, e)
if response:
self.log_error_message(self.name, response, e)
raise e

def assign_to_campaign(self,contact_id,campaigns:list) -> None:
Expand Down Expand Up @@ -365,6 +369,7 @@ def assign_to_campaign(self,contact_id,campaigns:list) -> None:
# Create the CampaignMember
self.logger.info(f"INFO: Adding Contact/Lead Id:[{contact_id}] as a CampaignMember of Campaign Id:[{mapping.get('CampaignId')}].")

response = None
try:
response = self.request_api("POST",endpoint="sobjects/CampaignMember",request_data=mapping)
data = response.json()
Expand All @@ -375,7 +380,8 @@ def assign_to_campaign(self,contact_id,campaigns:list) -> None:
id = data.get("id")
self.logger.info(f"CampaignMember created with id: {id}")
except Exception as e:
self.log_error_message(self.name, response, e)
if response:
self.log_error_message(self.name, response, e)
raise e


Expand Down Expand Up @@ -715,14 +721,17 @@ def upsert_record(self, record, context):
if not record.get("Name") and not record.get("WhatId"):
raise FatalAPIError("ERROR: Campaigns in Salesforce are required to have a 'Name' field")


response = None
try:
response = self.request_api("POST", request_data=record)
id = response.json().get("id")
self.logger.info(f"{self.name} created with id: {id}")
return id, True, state_updates
except Exception as e:
self.log_error_message(self.name, response, e)
if response:
self.log_error_message(self.name, response, e)
else:
self.logger.exception(f"Could not POST to {self.endpoint}: {e}")
raise e


Expand Down Expand Up @@ -1000,6 +1009,7 @@ def upsert_record(self, record, context):
if record.get("Id") or record.get("id"):
object_id = record.pop("Id") or record.pop("id")
url = "/".join([endpoint, object_id])
response = None
try:
response = self.request_api("PATCH", endpoint=url, request_data=record)
if response.status_code == 204:
Expand All @@ -1011,7 +1021,10 @@ def upsert_record(self, record, context):
self.logger.info(f"{object_type} updated with id: {id}")
return id, True, state_updates
except Exception as e:
self.log_error_message(self.name, response, e)
if response:
self.log_error_message(self.name, response, e)
else:
self.logger.exception(f"Could not PATCH to {url}: {e}")

if len(possible_update_fields) > 0:
for id_field in possible_update_fields:
Expand All @@ -1025,6 +1038,7 @@ def upsert_record(self, record, context):
except Exception as e:
self.logger.exception(f"Could not PATCH to {url}: {e}")

response = None
try:
if len(possible_update_fields) > 0:
self.logger.info("Failed to find updatable entity, trying to create it.")
Expand All @@ -1036,7 +1050,10 @@ def upsert_record(self, record, context):
return id, True, state_updates
except Exception as e:
if "INVALID_FIELD_FOR_INSERT_UPDATE" not in str(e):
self.log_error_message(self.name, response, e)
if response:
self.log_error_message(self.name, response, e)
else:
self.logger.exception(f"Could not POST to {endpoint}: {e}")
raise e
try:
fields = json.loads(str(e))[0]['fields']
Expand All @@ -1052,13 +1069,17 @@ def upsert_record(self, record, context):
for f in fields:
record.pop(f, None)
# retry
response = None
try:
response = self.request_api("POST", endpoint=endpoint, request_data=record)
id = response.json().get("id")
self.logger.info(f"{object_type} created with id: {id}")
return id, True, state_updates
except Exception as e:
self.log_error_message(self.name, response, e)
if response:
self.log_error_message(self.name, response, e)
else:
self.logger.exception(f"Could not POST to {endpoint}: {e}")
raise e


Expand Down

0 comments on commit 1b32da6

Please sign in to comment.