From 009a071d05b9ed8bae8182468aec9e20a4516464 Mon Sep 17 00:00:00 2001 From: Pavan Chaithanya <74360974+BonapartePC@users.noreply.github.com> Date: Mon, 22 Jul 2024 11:57:15 +0530 Subject: [PATCH] chore: flipping SourceID & OriginalSourceID during transformation (#4887) --- processor/processor.go | 3 +++ processor/transformer/transformer.go | 15 +++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/processor/processor.go b/processor/processor.go index 6b65d73395..f25f66bf8e 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -961,6 +961,7 @@ func (proc *Handle) makeCommonMetadataFromSingularEvent(singularEvent types.Sing commonMetadata := transformer.Metadata{} commonMetadata.SourceID = source.ID commonMetadata.SourceName = source.Name + commonMetadata.OriginalSourceID = source.OriginalID commonMetadata.WorkspaceID = source.WorkspaceID commonMetadata.Namespace = proc.namespace commonMetadata.InstanceID = proc.instanceID @@ -992,6 +993,7 @@ func enhanceWithMetadata(commonMetadata *transformer.Metadata, event *transforme metadata.SourceType = commonMetadata.SourceType metadata.SourceCategory = commonMetadata.SourceCategory metadata.SourceID = commonMetadata.SourceID + metadata.OriginalSourceID = commonMetadata.OriginalSourceID metadata.SourceName = commonMetadata.SourceName metadata.WorkspaceID = commonMetadata.WorkspaceID metadata.Namespace = commonMetadata.Namespace @@ -2495,6 +2497,7 @@ func (proc *Handle) transformSrcDest( commonMetaData := &transformer.Metadata{ SourceID: sourceID, SourceName: sourceName, + OriginalSourceID: eventList[0].Metadata.OriginalSourceID, SourceType: eventList[0].Metadata.SourceType, SourceCategory: eventList[0].Metadata.SourceCategory, WorkspaceID: workspaceID, diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index adbed020a2..d46fdc8611 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -47,6 +47,7 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary type Metadata struct { SourceID string `json:"sourceId"` SourceName string `json:"sourceName"` + OriginalSourceID string `json:"originalSourceId"` WorkspaceID string `json:"workspaceId"` Namespace string `json:"namespace"` InstanceID string `json:"instanceId"` @@ -248,6 +249,15 @@ func (trans *handle) transform( if len(clientEvents) == 0 { return Response{} } + // flip sourceID and originalSourceID if it's a replay source for the purpose of any user transformation + // flip back afterwards + for _, clientEvent := range clientEvents { + if clientEvent.Metadata.OriginalSourceID != "" { + originalSourceID := clientEvent.Metadata.OriginalSourceID + clientEvent.Metadata.OriginalSourceID = clientEvent.Metadata.SourceID + clientEvent.Metadata.SourceID = originalSourceID + } + } sTags := stats.Tags{ "dest_type": clientEvents[0].Destination.DestinationDefinition.Name, "dest_id": clientEvents[0].Destination.ID, @@ -306,6 +316,11 @@ func (trans *handle) transform( // Transform is one to many mapping so returned // response for each is an array. We flatten it out for _, transformerResponse := range batch { + if transformerResponse.Metadata.OriginalSourceID != "" { + originalSourceID := transformerResponse.Metadata.SourceID + transformerResponse.Metadata.SourceID = transformerResponse.Metadata.OriginalSourceID + transformerResponse.Metadata.OriginalSourceID = originalSourceID + } switch transformerResponse.StatusCode { case http.StatusOK: outClientEvents = append(outClientEvents, transformerResponse)