Skip to content

Commit

Permalink
feat: isolate server ut communication (#5430)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Feb 19, 2025
1 parent 9f8d6ad commit 63505f6
Show file tree
Hide file tree
Showing 59 changed files with 4,809 additions and 1,266 deletions.
3 changes: 2 additions & 1 deletion enterprise/reporting/event_sampler/badger_event_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event_sampler

import (
"context"
"errors"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (es *BadgerEventSampler) Get(key string) (bool, error) {
return nil
})

if err == badger.ErrKeyNotFound {
if errors.Is(err, badger.ErrKeyNotFound) {
return false, nil
} else if err != nil {
return false, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
"github.com/rudderlabs/rudder-server/testhelper/health"
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestReportingDroppedEvents(t *testing.T) {
transformertest.ViolationErrorTransformerHandler(
http.StatusBadRequest,
"tracking plan validation failed",
[]transformer.ValidationError{{Type: "Datatype-Mismatch", Message: "must be number"}},
[]types.ValidationError{{Type: "Datatype-Mismatch", Message: "must be number"}},
),
).
Build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
"github.com/rudderlabs/rudder-server/testhelper/health"
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestReportingErrorIndex(t *testing.T) {
transformertest.ViolationErrorTransformerHandler(
http.StatusBadRequest,
"tracking plan validation failed",
[]transformer.ValidationError{{Type: "Datatype-Mismatch", Message: "must be number"}},
[]types.ValidationError{{Type: "Datatype-Mismatch", Message: "must be number"}},
),
).
Build()
Expand Down
9 changes: 5 additions & 4 deletions integration_test/tracing/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/processor/types"

_ "github.com/marcboeker/go-duckdb"
"github.com/ory/dockertest/v3"
"github.com/samber/lo"
Expand All @@ -33,7 +35,6 @@ import (
"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jsonrs"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
"github.com/rudderlabs/rudder-server/testhelper/health"
Expand Down Expand Up @@ -359,15 +360,15 @@ func TestTracing(t *testing.T) {
defer bcServer.Close()

trServer := transformertest.NewBuilder().
WithUserTransformHandler(func(request []transformer.TransformerEvent) (response []transformer.TransformerResponse) {
WithUserTransformHandler(func(request []types.TransformerEvent) (response []types.TransformerResponse) {
for i := range request {
req := request[i]
response = append(response, transformer.TransformerResponse{
response = append(response, types.TransformerResponse{
Metadata: req.Metadata,
Output: req.Message,
StatusCode: http.StatusOK,
})
response = append(response, transformer.TransformerResponse{
response = append(response, types.TransformerResponse{
Metadata: req.Metadata,
Output: req.Message,
StatusCode: http.StatusOK,
Expand Down
258 changes: 131 additions & 127 deletions integration_test/transformer_contract/transformer_contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,159 +28,162 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jsonrs"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/transformertest"
)

func TestTransformerContract(t *testing.T) {
t.Run("User Transformer", func(t *testing.T) {
config.Reset()
defer config.Reset()
transformationV2 := []bool{true, false}
for _, v := range transformationV2 {
t.Run("User Transformer", func(t *testing.T) {
config.Reset()
defer config.Reset()

workspaceConfig := backendconfig.ConfigT{
WorkspaceID: "workspace-1",
Sources: []backendconfig.SourceT{
{
ID: "source-1",
Name: "source-name-1",
SourceDefinition: backendconfig.SourceDefinitionT{
ID: "source-def-1",
Name: "source-def-name-1",
Category: "source-def-category-1",
Type: "source-def-type-1",
},
WriteKey: "writekey-1",
WorkspaceID: "workspace-1",
Enabled: true,
Destinations: []backendconfig.DestinationT{
{
ID: "destination-1",
Name: "destination-name-1",
DestinationDefinition: backendconfig.DestinationDefinitionT{
ID: "destination-def-1",
Name: "destination-def-name-1",
DisplayName: "destination-def-display-name-1",
},
Enabled: true,
WorkspaceID: "workspace-1",
Transformations: []backendconfig.TransformationT{
{
ID: "transformation-1",
VersionID: "version-1",
workspaceConfig := backendconfig.ConfigT{
WorkspaceID: "workspace-1",
Sources: []backendconfig.SourceT{
{
ID: "source-1",
Name: "source-name-1",
SourceDefinition: backendconfig.SourceDefinitionT{
ID: "source-def-1",
Name: "source-def-name-1",
Category: "source-def-category-1",
Type: "source-def-type-1",
},
WriteKey: "writekey-1",
WorkspaceID: "workspace-1",
Enabled: true,
Destinations: []backendconfig.DestinationT{
{
ID: "destination-1",
Name: "destination-name-1",
DestinationDefinition: backendconfig.DestinationDefinitionT{
ID: "destination-def-1",
Name: "destination-def-name-1",
DisplayName: "destination-def-display-name-1",
},
Enabled: true,
WorkspaceID: "workspace-1",
Transformations: []backendconfig.TransformationT{
{
ID: "transformation-1",
VersionID: "version-1",
},
},
IsProcessorEnabled: true,
RevisionID: "revision-1",
},
IsProcessorEnabled: true,
RevisionID: "revision-1",
},
},
DgSourceTrackingPlanConfig: backendconfig.DgSourceTrackingPlanConfigT{
SourceId: "source-1",
SourceConfigVersion: 1,
Deleted: false,
TrackingPlan: backendconfig.TrackingPlanT{
Id: "tracking-plan-1",
Version: 1,
DgSourceTrackingPlanConfig: backendconfig.DgSourceTrackingPlanConfigT{
SourceId: "source-1",
SourceConfigVersion: 1,
Deleted: false,
TrackingPlan: backendconfig.TrackingPlanT{
Id: "tracking-plan-1",
Version: 1,
},
},
},
},
},
Credentials: map[string]backendconfig.Credential{
"credential-1": {
Key: "key-1",
Value: "value-1",
IsSecret: false,
Credentials: map[string]backendconfig.Credential{
"credential-1": {
Key: "key-1",
Value: "value-1",
IsSecret: false,
},
},
},
}

bcServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/workspaceConfig":
response, _ := jsonrs.Marshal(workspaceConfig)
_, _ = w.Write(response)
default:
w.WriteHeader(http.StatusNotFound)
}
}))

trServer := transformertest.NewBuilder().
WithUserTransformHandler(
func(request []transformer.TransformerEvent) (response []transformer.TransformerResponse) {
for i := range request {
req := request[i]
bcServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/workspaceConfig":
response, _ := jsonrs.Marshal(workspaceConfig)
_, _ = w.Write(response)
default:
w.WriteHeader(http.StatusNotFound)
}
}))

require.Equal(t, req.Metadata.SourceID, "source-1")
require.Equal(t, req.Metadata.SourceName, "source-name-1")
require.Equal(t, req.Metadata.SourceType, "source-def-name-1")
require.Equal(t, req.Metadata.SourceCategory, "source-def-category-1")
require.Equal(t, req.Metadata.SourceDefinitionID, "source-def-1")
require.Equal(t, req.Metadata.WorkspaceID, "workspace-1")
require.Equal(t, req.Metadata.DestinationID, "destination-1")
require.Equal(t, req.Metadata.DestinationType, "destination-def-name-1")
require.Equal(t, req.Metadata.DestinationName, "destination-name-1")
require.Equal(t, req.Metadata.TransformationID, "transformation-1")
require.Equal(t, req.Metadata.TransformationVersionID, "version-1")
require.Equal(t, req.Metadata.EventType, "identify")
require.Equal(t, req.Credentials, []transformer.Credential{
{
ID: "credential-1",
Key: "key-1",
Value: "value-1",
IsSecret: false,
},
})
response = append(response, transformer.TransformerResponse{
Metadata: req.Metadata,
Output: req.Message,
StatusCode: http.StatusOK,
})
}
return
},
).
Build()
defer trServer.Close()
trServer := transformertest.NewBuilder().
WithUserTransformHandler(
func(request []types.TransformerEvent) (response []types.TransformerResponse) {
for i := range request {
req := request[i]

pool, err := dockertest.NewPool("")
require.NoError(t, err)
require.Equal(t, req.Metadata.SourceID, "source-1")
require.Equal(t, req.Metadata.SourceName, "source-name-1")
require.Equal(t, req.Metadata.SourceType, "source-def-name-1")
require.Equal(t, req.Metadata.SourceCategory, "source-def-category-1")
require.Equal(t, req.Metadata.SourceDefinitionID, "source-def-1")
require.Equal(t, req.Metadata.WorkspaceID, "workspace-1")
require.Equal(t, req.Metadata.DestinationID, "destination-1")
require.Equal(t, req.Metadata.DestinationType, "destination-def-name-1")
require.Equal(t, req.Metadata.DestinationName, "destination-name-1")
require.Equal(t, req.Metadata.TransformationID, "transformation-1")
require.Equal(t, req.Metadata.TransformationVersionID, "version-1")
require.Equal(t, req.Metadata.EventType, "identify")
require.Equal(t, req.Credentials, []types.Credential{
{
ID: "credential-1",
Key: "key-1",
Value: "value-1",
IsSecret: false,
},
})
response = append(response, types.TransformerResponse{
Metadata: req.Metadata,
Output: req.Message,
StatusCode: http.StatusOK,
})
}
return
},
).
Build()
defer trServer.Close()

postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)
pool, err := dockertest.NewPool("")
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)

gwPort, err := kithelper.GetFreePort()
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runRudderServer(t, ctx, gwPort, postgresContainer, bcServer.URL, trServer.URL, t.TempDir())
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
return err
})
gwPort, err := kithelper.GetFreePort()
require.NoError(t, err)

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runRudderServer(t, ctx, gwPort, postgresContainer, bcServer.URL, trServer.URL, t.TempDir(), v)
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
return err
})

url := fmt.Sprintf("http://localhost:%d", gwPort)
health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())
url := fmt.Sprintf("http://localhost:%d", gwPort)
health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())

eventsCount := 12
eventsCount := 12

err = sendEvents(eventsCount, "identify", "writekey-1", url)
require.NoError(t, err)
err = sendEvents(eventsCount, "identify", "writekey-1", url)
require.NoError(t, err)

requireJobsCount(t, ctx, postgresContainer.DB, "gw", jobsdb.Succeeded.State, eventsCount)
requireJobsCount(t, ctx, postgresContainer.DB, "rt", jobsdb.Succeeded.State, eventsCount)
requireJobsCount(t, ctx, postgresContainer.DB, "gw", jobsdb.Succeeded.State, eventsCount)
requireJobsCount(t, ctx, postgresContainer.DB, "rt", jobsdb.Succeeded.State, eventsCount)

cancel()
require.NoError(t, wg.Wait())
})
// TODO: Add tests for dest transformer and tracking plan validation
t.Run("Dest Transformer", func(t *testing.T) {})
t.Run("Tracking Plan Validation", func(t *testing.T) {})
cancel()
require.NoError(t, wg.Wait())
})
// TODO: Add tests for dest transformer and tracking plan validation
t.Run("Dest Transformer", func(t *testing.T) {})
t.Run("Tracking Plan Validation", func(t *testing.T) {})
}
}

func runRudderServer(
Expand All @@ -190,6 +193,7 @@ func runRudderServer(
postgresContainer *postgres.Resource,
cbURL, transformerURL,
tmpDir string,
transformationV2 bool,
) (err error) {
t.Setenv("CONFIG_BACKEND_URL", cbURL)
t.Setenv("WORKSPACE_TOKEN", "token")
Expand Down Expand Up @@ -217,7 +221,7 @@ func runRudderServer(
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Profiler.Enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.enableSuppressUserFeature"), "false")

t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Processor.enableTransformationV2"), strconv.FormatBool(transformationV2))
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panicked: %v", r)
Expand Down
Loading

0 comments on commit 63505f6

Please sign in to comment.