Skip to content

Commit

Permalink
Merge branch 'master' into feat.isolateServerUTCommunication
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Feb 18, 2025
2 parents d2ccfba + 15faf6b commit 9d97dc6
Show file tree
Hide file tree
Showing 14 changed files with 296 additions and 175 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
github.com/golang-migrate/migrate/v4 v4.18.2
github.com/golang/mock v1.6.0
github.com/gomodule/redigo v1.9.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0
github.com/hashicorp/go-retryablehttp v0.7.7
Expand Down Expand Up @@ -231,7 +232,6 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.12.23+incompatible // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
github.com/google/s2a-go v0.1.8 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/cenkalti/backoff"
"github.com/samber/lo"

Expand Down Expand Up @@ -81,11 +83,17 @@ func (d *Client) Transform(ctx context.Context, clientEvents []types.Transformer
return types.Response{}
}

sTags := stats.Tags{
"dest_type": clientEvents[0].Destination.DestinationDefinition.Name,
"dest_id": clientEvents[0].Destination.ID,
"src_id": clientEvents[0].Metadata.SourceID,
"stage": "dest_transformer",
destinationType := clientEvents[0].Destination.DestinationDefinition.Name
destURL := d.destTransformURL(destinationType)

labels := types.TransformerMetricLabels{
Endpoint: transformerutils.GetEndpointFromURL(destURL),
Stage: "dest_transformer",
DestinationType: destinationType,
DestinationID: clientEvents[0].Destination.ID,
SourceID: clientEvents[0].Metadata.SourceID,
WorkspaceID: clientEvents[0].Metadata.WorkspaceID,
SourceType: clientEvents[0].Metadata.SourceType,
}

var trackWg sync.WaitGroup
Expand All @@ -95,11 +103,8 @@ func (d *Client) Transform(ctx context.Context, clientEvents []types.Transformer

trackWg.Add(1)
go func() {
var loggerCtx []interface{}
for k, v := range sTags {
loggerCtx = append(loggerCtx, k, v)
}
transformerutils.TrackLongRunningTransformation(ctx, "dest_transformer", d.config.timeoutDuration, d.log.With(loggerCtx...))
l := d.log.Withn(labels.ToLoggerFields()...)
transformerutils.TrackLongRunningTransformation(ctx, labels.Stage, d.config.timeoutDuration, l)
trackWg.Done()
}()

Expand All @@ -108,7 +113,7 @@ func (d *Client) Transform(ctx context.Context, clientEvents []types.Transformer
d.stat.NewTaggedStat(
"processor.transformer_request_batch_count",
stats.HistogramType,
sTags,
labels.ToStatsTag(),
).Observe(float64(len(batches)))

transformResponse := make([][]types.TransformerResponse, len(batches))
Expand All @@ -122,7 +127,7 @@ func (d *Client) Transform(ctx context.Context, clientEvents []types.Transformer
func(batch []types.TransformerEvent, i int) {
d.guardConcurrency <- struct{}{}
go func() {
transformResponse[i], err = d.sendBatch(ctx, d.destTransformURL(batch[0].Destination.DestinationDefinition.Name), "dest_transformer", batch)
transformResponse[i], err = d.sendBatch(ctx, destURL, labels, batch)
if err != nil {
foundError = true
}
Expand Down Expand Up @@ -161,7 +166,8 @@ func (d *Client) Transform(ctx context.Context, clientEvents []types.Transformer
}
}

func (d *Client) sendBatch(ctx context.Context, url, stage string, data []types.TransformerEvent) ([]types.TransformerResponse, error) {
func (d *Client) sendBatch(ctx context.Context, url string, labels types.TransformerMetricLabels, data []types.TransformerEvent) ([]types.TransformerResponse, error) {
d.stat.NewTaggedStat("transformer_client_request_total_events", stats.CountType, labels.ToStatsTag()).Count(len(data))
// Call remote transformation
var (
rawJSON []byte
Expand All @@ -182,12 +188,7 @@ func (d *Client) sendBatch(ctx context.Context, url, stage string, data []types.
statusCode int
)

respData, statusCode, err = d.doPost(ctx, rawJSON, url, stats.Tags{
"destinationType": data[0].Destination.DestinationDefinition.Name,
"destinationId": data[0].Destination.ID,
"sourceId": data[0].Metadata.SourceID,
"stage": stage,
})
respData, statusCode, err = d.doPost(ctx, rawJSON, url, labels)
if err != nil {
return nil, err
}
Expand All @@ -198,7 +199,7 @@ func (d *Client) sendBatch(ctx context.Context, url, stage string, data []types.
http.StatusNotFound,
http.StatusRequestEntityTooLarge:
default:
d.log.Errorf("Transformer returned status code: %v", statusCode)
d.log.Errorn("Transformer returned status code", logger.NewStringField("statusCode", strconv.Itoa(statusCode)))
}

var transformerResponses []types.TransformerResponse
Expand All @@ -212,17 +213,19 @@ func (d *Client) sendBatch(ctx context.Context, url, stage string, data []types.
if err != nil {
return nil, err
}

Check warning on line 215 in processor/internal/transformer/destination_transformer/destination_transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/internal/transformer/destination_transformer/destination_transformer.go#L214-L215

Added lines #L214 - L215 were not covered by tests
d.stat.NewTaggedStat("transformer_client_response_total_events", stats.CountType, labels.ToStatsTag()).Count(len(transformerResponses))
default:
for i := range data {
transformEvent := &data[i]
resp := types.TransformerResponse{StatusCode: statusCode, Error: string(respData), Metadata: transformEvent.Metadata}
transformerResponses = append(transformerResponses, resp)
}
d.stat.NewTaggedStat("transformer_client_response_total_events", stats.CountType, labels.ToStatsTag()).Count(len(data))
}
return transformerResponses, nil
}

func (d *Client) doPost(ctx context.Context, rawJSON []byte, url string, tags stats.Tags) ([]byte, int, error) {
func (d *Client) doPost(ctx context.Context, rawJSON []byte, url string, labels types.TransformerMetricLabels) ([]byte, int, error) {
var (
retryCount int
resp *http.Response
Expand Down Expand Up @@ -250,7 +253,14 @@ func (d *Client) doPost(ctx context.Context, rawJSON []byte, url string, tags st

resp, reqErr = d.client.Do(req)
defer func() { httputil.CloseResponse(resp) }()
d.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(time.Since(requestStartTime))
// Record metrics with labels
tags := labels.ToStatsTag()
duration := time.Since(requestStartTime)
d.stat.NewTaggedStat("transformer_client_request_total_bytes", stats.CountType, tags).Count(len(rawJSON))

d.stat.NewTaggedStat("transformer_client_total_durations_seconds", stats.CountType, tags).Count(int(duration.Seconds()))
d.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, labels.ToStatsTag()).SendTiming(duration)

if reqErr != nil {
return reqErr
}
Expand All @@ -260,6 +270,10 @@ func (d *Client) doPost(ctx context.Context, rawJSON []byte, url string, tags st
}

respData, reqErr = io.ReadAll(resp.Body)
if reqErr == nil {
d.stat.NewTaggedStat("transformer_client_response_total_bytes", stats.CountType, tags).Count(len(respData))
// We'll count response events after unmarshaling in the request method
}
return reqErr
},
backoff.WithMaxRetries(retryStrategy, uint64(d.config.maxRetry.Load())),
Expand All @@ -285,7 +299,7 @@ func (d *Client) doPost(ctx context.Context, rawJSON []byte, url string, tags st
transformerAPIVersion, _ := strconv.Atoi(resp.Header.Get("apiVersion"))
if reportingtypes.SupportedTransformerApiVersion != transformerAPIVersion {
unexpectedVersionError := fmt.Errorf("incompatible transformer version: Expected: %d Received: %s, URL: %v", reportingtypes.SupportedTransformerApiVersion, resp.Header.Get("apiVersion"), url)
d.log.Error(unexpectedVersionError)
d.log.Errorn("Unexpected version", obskit.Error(unexpectedVersionError))
return nil, 0, unexpectedVersionError
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ func TestDestinationTransformer(t *testing.T) {
}
}

labels := types.TransformerMetricLabels{
Endpoint: transformerutils.GetEndpointFromURL(srv.URL),
Stage: "dest_transformer",
SourceID: Metadata.SourceID,
SourceType: Metadata.SourceType,
DestinationType: destinationConfig.DestinationDefinition.Name,
DestinationID: destinationConfig.ID,
WorkspaceID: Metadata.WorkspaceID,
}
rsp := tr.Transform(context.TODO(), events)
require.Equal(t, expectedResponse, rsp)

Expand All @@ -254,11 +263,36 @@ func TestDestinationTransformer(t *testing.T) {
require.NotEmpty(t, metrics)
for _, m := range metrics {
require.Equal(t, stats.Tags{
"stage": "dest_transformer",
"sourceId": Metadata.SourceID,
"destinationType": destinationConfig.DestinationDefinition.Name,
"destinationId": destinationConfig.ID,
"endpoint": transformerutils.GetEndpointFromURL(srv.URL),
"stage": "dest_transformer",
"sourceId": Metadata.SourceID,
"sourceType": Metadata.SourceType,
"destinationType": destinationConfig.DestinationDefinition.Name,
"destinationId": destinationConfig.ID,
"workspaceId": Metadata.WorkspaceID,
"language": "",
"transformationId": "",

// Legacy tags: to be removed
"dest_type": destinationConfig.DestinationDefinition.Name,
"dest_id": destinationConfig.ID,
"src_id": Metadata.SourceID,
}, m.Tags)

metricsToCheck := []string{
"transformer_client_request_total_bytes",
"transformer_client_response_total_bytes",
"transformer_client_request_total_events",
"transformer_client_response_total_events",
"transformer_client_total_durations_seconds",
}

expectedTags := labels.ToStatsTag()
for _, metricName := range metricsToCheck {
measurements := statsStore.GetByName(metricName)
require.NotEmpty(t, measurements, "metric %s should not be empty", metricName)
require.Equal(t, expectedTags, measurements[0].Tags, "metric %s tags mismatch", metricName)
}
}
}
}
Expand Down
Loading

0 comments on commit 9d97dc6

Please sign in to comment.