From 9be2c5021bd074ed581c57d87b2d1feeb7887eb6 Mon Sep 17 00:00:00 2001 From: Key Date: Tue, 10 Sep 2024 16:06:25 -0500 Subject: [PATCH] add read only fields to memory, clean record --- target_salesforce_v3/sinks.py | 10 ++++++++++ target_salesforce_v3/target.py | 2 ++ 2 files changed, 12 insertions(+) diff --git a/target_salesforce_v3/sinks.py b/target_salesforce_v3/sinks.py index 177f2c7..6e65b03 100644 --- a/target_salesforce_v3/sinks.py +++ b/target_salesforce_v3/sinks.py @@ -928,6 +928,10 @@ def preprocess_record(self, record, context): for key in record: if isinstance(record.get(key), datetime): record[key] = record[key].isoformat() + + # clean any read only fields + for field in self._target.read_only_fields.get(self.stream_name, []): + record.pop(field, None) return record def upsert_record(self, record, context): @@ -1010,6 +1014,12 @@ def upsert_record(self, record, context): if "INVALID_FIELD_FOR_INSERT_UPDATE" in str(e): fields = json.loads(str(e))[0]['fields'] self.logger.warning(f"Attempted to write read-only fields: {fields}. Removing them and retrying.") + # append read-only field to a list + if not self._target.read_only_fields.get(self.stream_name): + self._target.read_only_fields[self.stream_name] = fields + else: + self._target.read_only_fields[self.stream_name].extend(fields) + # remove read-only fields from record for f in fields: record.pop(f, None) # retry diff --git a/target_salesforce_v3/target.py b/target_salesforce_v3/target.py index 288a281..51e85a6 100644 --- a/target_salesforce_v3/target.py +++ b/target_salesforce_v3/target.py @@ -34,6 +34,8 @@ class TargetSalesforceV3(TargetHotglue): name = "target-salesforce-v3" MAX_PARALLELISM = 10 SINK_TYPES = SINK_TYPES + read_only_fields = {} + def get_sink_class(self, stream_name: str): """Get sink for a stream.""" for sink_class in SINK_TYPES: