From 7cb6ff112be869b608fd97c3321b3b763a8741c9 Mon Sep 17 00:00:00 2001 From: Keyna Rafael <95432445+keyn4@users.noreply.github.com> Date: Thu, 5 Dec 2024 09:09:28 -0500 Subject: [PATCH] add support to send attachments (#28) --- target_salesforce_v3/sinks.py | 44 ++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/target_salesforce_v3/sinks.py b/target_salesforce_v3/sinks.py index 8d014f1..3432cf0 100644 --- a/target_salesforce_v3/sinks.py +++ b/target_salesforce_v3/sinks.py @@ -848,6 +848,8 @@ def lookup_fields_dict(self): @property def name(self): return self.stream_name + + not_searchable_by_mail = ["ContentVersion"] def get_fields_for_object(self, object_type): """Check if Salesforce has an object type and fetches its fields.""" @@ -883,6 +885,10 @@ def preprocess_record(self, record, context): self.logger.info("Skipping record, because it was not found on Salesforce.") return {} + # add field to link attachments + if self.name == "ContentVersion": + fields.update({"LinkedEntityId": {"createable": True}}) + # keep only available fields and that are creatable or updatable # NOTE: we need to keep relations (__r, xId) record = {k:v for k,v in record.items() if k.endswith("__r") or fields.get(k+"Id") or (fields.get(k) and (fields[k]["createable"] or fields[k]["updateable"] or k.lower() in ["id", "externalid"]))} @@ -907,7 +913,7 @@ def preprocess_record(self, record, context): req = self.request_api("GET", "queryAll", params={"q": query}) req = req.json().get("records") # lookup for record with email fields - elif self.config.get("lookup_by_email", True): + elif self.config.get("lookup_by_email", True) and self.name not in self.not_searchable_by_mail: # Try to find object instance using email email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"] email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)] @@ -954,7 +960,11 @@ def upsert_record(self, record, context): if record == {}: self.logger.info(f"Processing record for type {self.stream_name} failed. Check logs.") return + + # for files pop object id to link the file + linked_object_id = record.pop("LinkedEntityId", None) if self.name == "ContentVersion" else None + # get object fields fields_desc = self.sf_fields_description(object_type=object_type) possible_update_fields = [] @@ -999,6 +1009,7 @@ def upsert_record(self, record, context): return object_id, True, state_updates id = response.json().get("id") + self.link_attachment_to_object(id, linked_object_id) self.logger.info(f"{object_type} updated with id: {id}") return id, True, state_updates except Exception as e: @@ -1011,6 +1022,7 @@ def upsert_record(self, record, context): response = self.request_api("PATCH", endpoint=url, request_data={k: record[k] for k in set(list(record.keys())) - set([id_field])}) id = response.json().get("id") self.logger.info(f"{object_type} updated with id: {id}") + self.link_attachment_to_object(id, linked_object_id) return id, True, state_updates except Exception as e: self.logger.exception(f"Could not PATCH to {url}: {e}") @@ -1022,6 +1034,7 @@ def upsert_record(self, record, context): response = self.request_api("POST", endpoint=endpoint, request_data=record) id = response.json().get("id") self.logger.info(f"{object_type} created with id: {id}") + self.link_attachment_to_object(id, linked_object_id) return id, True, state_updates except Exception as e: if "INVALID_FIELD_FOR_INSERT_UPDATE" in str(e): @@ -1047,3 +1060,32 @@ def upsert_record(self, record, context): self.logger.exception(f"Error encountered while creating {object_type}") raise e + + def link_attachment_to_object(self, file_id, linked_object_id): + if self.name != "ContentVersion": + return + if not linked_object_id: + self.logger.info(f"Object id not found to link file with id {file_id}") + return + try: + # get contentdocumentid + content_endpoint = "query" + params = {"q": f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{file_id}'"} + content_document_id = self.request_api("GET", endpoint=content_endpoint, params=params) + content_document_id = content_document_id.json() + if content_document_id.get("records"): + content_document_id = content_document_id["records"][0]["ContentDocumentId"] + else: + raise Exception(f"Failed while trying to link file {file_id} and object {linked_object_id} because ContentDocumentId was not found") + + endpoint = "sobjects/ContentDocumentLink" + record = { + "ContentDocumentId": content_document_id, + "LinkedEntityId": linked_object_id, + "ShareType": "V" + } + response = self.request_api("POST", endpoint=endpoint, request_data=record) + self.logger.info(f"File with id {file_id} succesfully linked to object with id {linked_object_id}. Link id {response.json()['id']}") + except Exception as e: + self.logger.info(f"Failed while trying to link file {file_id} and object {linked_object_id}") + raise e \ No newline at end of file