Skip to content

Commit

Permalink
chore: remove swap original src id and src id
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jan 20, 2025
1 parent da37413 commit 4bf68a2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,6 @@ func (d *DestTransformer) transform(
if len(clientEvents) == 0 {
return types.Response{}
}
// flip sourceID and originalSourceID if it's a replay source for the purpose of any user transformation
// flip back afterward
for i := range clientEvents {
if clientEvents[i].Metadata.OriginalSourceID != "" {
clientEvents[i].Metadata.OriginalSourceID, clientEvents[i].Metadata.SourceID = clientEvents[i].Metadata.SourceID, clientEvents[i].Metadata.OriginalSourceID
}
}
sTags := stats.Tags{
"dest_type": clientEvents[0].Destination.DestinationDefinition.Name,
"dest_id": clientEvents[0].Destination.ID,
Expand Down Expand Up @@ -139,9 +132,6 @@ func (d *DestTransformer) transform(
// Transform is one to many mapping so returned
// response for each is an array. We flatten it out
for _, transformerResponse := range batch {
if transformerResponse.Metadata.OriginalSourceID != "" {
transformerResponse.Metadata.SourceID, transformerResponse.Metadata.OriginalSourceID = transformerResponse.Metadata.OriginalSourceID, transformerResponse.Metadata.SourceID
}
switch transformerResponse.StatusCode {
case http.StatusOK:
outClientEvents = append(outClientEvents, transformerResponse)
Expand Down Expand Up @@ -295,11 +285,6 @@ func (d *DestTransformer) doPost(ctx context.Context, rawJSON []byte, url string
if reqErr != nil {
return reqErr
}

if !transformer_utils.IsJobTerminated(resp.StatusCode) && resp.StatusCode != transformer_utils.StatusCPDown {
return fmt.Errorf("transformer returned status code: %v", resp.StatusCode)
}

respData, reqErr = io.ReadAll(resp.Body)
return reqErr

Check warning on line 289 in processor/internal/destination_transformer/destination_transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/internal/destination_transformer/destination_transformer.go#L284-L289

Added lines #L284 - L289 were not covered by tests
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@ import (
"github.com/cenkalti/backoff"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/processor/internal/transformer_utils"
"github.com/rudderlabs/rudder-server/utils/httputil"
reportingTypes "github.com/rudderlabs/rudder-server/utils/types"

"github.com/rudderlabs/rudder-server/processor/types"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/processor/internal/http_client"
"github.com/rudderlabs/rudder-server/processor/internal/transformer_utils"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/utils/httputil"
reportingTypes "github.com/rudderlabs/rudder-server/utils/types"
)

type TPValidator struct {
Expand Down Expand Up @@ -74,18 +72,11 @@ func (t *TPValidator) transform(
if len(clientEvents) == 0 {
return types.Response{}
}
// flip sourceID and originalSourceID if it's a replay source for the purpose of any user transformation
// flip back afterward
for i := range clientEvents {
if clientEvents[i].Metadata.OriginalSourceID != "" {
clientEvents[i].Metadata.OriginalSourceID, clientEvents[i].Metadata.SourceID = clientEvents[i].Metadata.SourceID, clientEvents[i].Metadata.OriginalSourceID
}
}
sTags := stats.Tags{
"dest_type": clientEvents[0].Destination.DestinationDefinition.Name,
"dest_id": clientEvents[0].Destination.ID,
"src_id": clientEvents[0].Metadata.SourceID,
"stage": "dest_transformer",
"stage": "trackingPlan_validation",
}

var trackWg sync.WaitGroup
Expand All @@ -99,7 +90,7 @@ func (t *TPValidator) transform(
for k, v := range sTags {
loggerCtx = append(loggerCtx, k, v)
}
transformer_utils.TrackLongRunningTransformation(ctx, "dest_transformer", t.config.timeoutDuration, t.log.With(loggerCtx...))
transformer_utils.TrackLongRunningTransformation(ctx, "trackingPlan_validation", t.config.timeoutDuration, t.log.With(loggerCtx...))
trackWg.Done()

Check warning on line 94 in processor/internal/trackingplan_validation/trackingplan_validation.go

View check run for this annotation

Codecov / codecov/patch

processor/internal/trackingplan_validation/trackingplan_validation.go#L71-L94

Added lines #L71 - L94 were not covered by tests
}()

Expand All @@ -123,7 +114,7 @@ func (t *TPValidator) transform(
t.guardConcurrency <- struct{}{}
go func() {
trace.WithRegion(ctx, "request", func() {
transformResponse[i] = t.request(ctx, url, "dest_transformer", batch)
transformResponse[i] = t.request(ctx, url, "trackingPlan_validation", batch)
})
<-t.guardConcurrency
wg.Done()

Check warning on line 120 in processor/internal/trackingplan_validation/trackingplan_validation.go

View check run for this annotation

Codecov / codecov/patch

processor/internal/trackingplan_validation/trackingplan_validation.go#L97-L120

Added lines #L97 - L120 were not covered by tests
Expand All @@ -139,9 +130,6 @@ func (t *TPValidator) transform(
// Transform is one to many mapping so returned
// response for each is an array. We flatten it out
for _, transformerResponse := range batch {
if transformerResponse.Metadata.OriginalSourceID != "" {
transformerResponse.Metadata.SourceID, transformerResponse.Metadata.OriginalSourceID = transformerResponse.Metadata.OriginalSourceID, transformerResponse.Metadata.SourceID
}
switch transformerResponse.StatusCode {
case http.StatusOK:
outClientEvents = append(outClientEvents, transformerResponse)
Expand Down Expand Up @@ -209,6 +197,11 @@ func (t *TPValidator) request(ctx context.Context, url, stage string, data []typ
"dest_id": data[0].Destination.ID,
"src_id": data[0].Metadata.SourceID,
})
if statusCode == transformer_utils.StatusCPDown {
t.stat.NewStat("processor.control_plane_down", stats.GaugeType).Gauge(1)
return fmt.Errorf("control plane not reachable")
}
t.stat.NewStat("processor.control_plane_down", stats.GaugeType).Gauge(0)
return nil

Check warning on line 205 in processor/internal/trackingplan_validation/trackingplan_validation.go

View check run for this annotation

Codecov / codecov/patch

processor/internal/trackingplan_validation/trackingplan_validation.go#L188-L205

Added lines #L188 - L205 were not covered by tests
},
endlessBackoff,
Expand Down

0 comments on commit 4bf68a2

Please sign in to comment.