Skip to content

Commit

Permalink
chore: flipping SourceID & OriginalSourceID during transformation (#4887
Browse files Browse the repository at this point in the history
)
  • Loading branch information
BonapartePC authored Jul 22, 2024
1 parent a80f494 commit 009a071
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
3 changes: 3 additions & 0 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ func (proc *Handle) makeCommonMetadataFromSingularEvent(singularEvent types.Sing
commonMetadata := transformer.Metadata{}
commonMetadata.SourceID = source.ID
commonMetadata.SourceName = source.Name
commonMetadata.OriginalSourceID = source.OriginalID
commonMetadata.WorkspaceID = source.WorkspaceID
commonMetadata.Namespace = proc.namespace
commonMetadata.InstanceID = proc.instanceID
Expand Down Expand Up @@ -992,6 +993,7 @@ func enhanceWithMetadata(commonMetadata *transformer.Metadata, event *transforme
metadata.SourceType = commonMetadata.SourceType
metadata.SourceCategory = commonMetadata.SourceCategory
metadata.SourceID = commonMetadata.SourceID
metadata.OriginalSourceID = commonMetadata.OriginalSourceID
metadata.SourceName = commonMetadata.SourceName
metadata.WorkspaceID = commonMetadata.WorkspaceID
metadata.Namespace = commonMetadata.Namespace
Expand Down Expand Up @@ -2495,6 +2497,7 @@ func (proc *Handle) transformSrcDest(
commonMetaData := &transformer.Metadata{
SourceID: sourceID,
SourceName: sourceName,
OriginalSourceID: eventList[0].Metadata.OriginalSourceID,
SourceType: eventList[0].Metadata.SourceType,
SourceCategory: eventList[0].Metadata.SourceCategory,
WorkspaceID: workspaceID,
Expand Down
15 changes: 15 additions & 0 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Metadata struct {
SourceID string `json:"sourceId"`
SourceName string `json:"sourceName"`
OriginalSourceID string `json:"originalSourceId"`
WorkspaceID string `json:"workspaceId"`
Namespace string `json:"namespace"`
InstanceID string `json:"instanceId"`
Expand Down Expand Up @@ -248,6 +249,15 @@ func (trans *handle) transform(
if len(clientEvents) == 0 {
return Response{}
}
// flip sourceID and originalSourceID if it's a replay source for the purpose of any user transformation
// flip back afterwards
for _, clientEvent := range clientEvents {
if clientEvent.Metadata.OriginalSourceID != "" {
originalSourceID := clientEvent.Metadata.OriginalSourceID
clientEvent.Metadata.OriginalSourceID = clientEvent.Metadata.SourceID
clientEvent.Metadata.SourceID = originalSourceID
}
}
sTags := stats.Tags{
"dest_type": clientEvents[0].Destination.DestinationDefinition.Name,
"dest_id": clientEvents[0].Destination.ID,
Expand Down Expand Up @@ -306,6 +316,11 @@ func (trans *handle) transform(
// Transform is one to many mapping so returned
// response for each is an array. We flatten it out
for _, transformerResponse := range batch {
if transformerResponse.Metadata.OriginalSourceID != "" {
originalSourceID := transformerResponse.Metadata.SourceID
transformerResponse.Metadata.SourceID = transformerResponse.Metadata.OriginalSourceID
transformerResponse.Metadata.OriginalSourceID = originalSourceID
}
switch transformerResponse.StatusCode {
case http.StatusOK:
outClientEvents = append(outClientEvents, transformerResponse)
Expand Down

0 comments on commit 009a071

Please sign in to comment.