Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: isolate server ut communication #5430

Merged
merged 33 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8bf15a2
feat: isolate server transformer communication
cisse21 Jan 13, 2025
f16e30e
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Jan 13, 2025
6f610cb
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Jan 14, 2025
7ed43e3
chore: introduce http client
cisse21 Jan 14, 2025
abd8358
chore: refactor types
cisse21 Jan 14, 2025
180ab01
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Jan 15, 2025
e587e51
chore: merge master
cisse21 Jan 15, 2025
eea1385
chore: populate ut module
cisse21 Jan 15, 2025
aba2651
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Jan 20, 2025
fdebb34
chore: add module dest transform
cisse21 Jan 20, 2025
da37413
chore: add module tp validator
cisse21 Jan 20, 2025
4bf68a2
chore: remove swap original src id and src id
cisse21 Jan 20, 2025
24d2907
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Jan 22, 2025
f01fe43
chore: review comments
cisse21 Jan 22, 2025
fc88564
chore: fix conflicts
cisse21 Feb 11, 2025
c3e5dc6
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Feb 11, 2025
397f5cc
chore: fix tests
cisse21 Feb 11, 2025
83288fd
chore: run tests with v2 enabled (#5498)
cisse21 Feb 11, 2025
1ff1da4
chore: use global http client
cisse21 Feb 12, 2025
526c759
chore: add mocks client
cisse21 Feb 12, 2025
d7ea277
chore: add ut tests
cisse21 Feb 12, 2025
06cbe73
chore: add dt tests
cisse21 Feb 12, 2025
ee6277f
chore: add tp validator tests
cisse21 Feb 12, 2025
e6e2d28
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Feb 13, 2025
f2a3af9
chore: review comments
cisse21 Feb 13, 2025
d2ccfba
chore: fix conflicts
cisse21 Feb 18, 2025
9d97dc6
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Feb 18, 2025
8a2bde0
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Feb 18, 2025
447dea3
chore: conflicts lint fixes
cisse21 Feb 18, 2025
08116a9
chore: review comments
cisse21 Feb 18, 2025
e502cf6
chore: add to integration test
cisse21 Feb 18, 2025
f839376
chore: review comments
cisse21 Feb 19, 2025
3d92dad
Merge branch 'master' into feat.isolateServerUTCommunication
cisse21 Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion enterprise/reporting/event_sampler/badger_event_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event_sampler

import (
"context"
"errors"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (es *BadgerEventSampler) Get(key string) (bool, error) {
return nil
})

if err == badger.ErrKeyNotFound {
if errors.Is(err, badger.ErrKeyNotFound) {
return false, nil
} else if err != nil {
return false, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
"github.com/rudderlabs/rudder-server/testhelper/health"
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestReportingDroppedEvents(t *testing.T) {
transformertest.ViolationErrorTransformerHandler(
http.StatusBadRequest,
"tracking plan validation failed",
[]transformer.ValidationError{{Type: "Datatype-Mismatch", Message: "must be number"}},
[]types.ValidationError{{Type: "Datatype-Mismatch", Message: "must be number"}},
),
).
Build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
"github.com/rudderlabs/rudder-server/testhelper/health"
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestReportingErrorIndex(t *testing.T) {
transformertest.ViolationErrorTransformerHandler(
http.StatusBadRequest,
"tracking plan validation failed",
[]transformer.ValidationError{{Type: "Datatype-Mismatch", Message: "must be number"}},
[]types.ValidationError{{Type: "Datatype-Mismatch", Message: "must be number"}},
),
).
Build()
Expand Down
9 changes: 5 additions & 4 deletions integration_test/tracing/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/processor/types"

_ "github.com/marcboeker/go-duckdb"
"github.com/ory/dockertest/v3"
"github.com/samber/lo"
Expand All @@ -33,7 +35,6 @@ import (
"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jsonrs"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
"github.com/rudderlabs/rudder-server/testhelper/health"
Expand Down Expand Up @@ -359,15 +360,15 @@ func TestTracing(t *testing.T) {
defer bcServer.Close()

trServer := transformertest.NewBuilder().
WithUserTransformHandler(func(request []transformer.TransformerEvent) (response []transformer.TransformerResponse) {
WithUserTransformHandler(func(request []types.TransformerEvent) (response []types.TransformerResponse) {
for i := range request {
req := request[i]
response = append(response, transformer.TransformerResponse{
response = append(response, types.TransformerResponse{
Metadata: req.Metadata,
Output: req.Message,
StatusCode: http.StatusOK,
})
response = append(response, transformer.TransformerResponse{
response = append(response, types.TransformerResponse{
Metadata: req.Metadata,
Output: req.Message,
StatusCode: http.StatusOK,
Expand Down
258 changes: 131 additions & 127 deletions integration_test/transformer_contract/transformer_contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,159 +28,162 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jsonrs"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/transformertest"
)

func TestTransformerContract(t *testing.T) {
t.Run("User Transformer", func(t *testing.T) {
config.Reset()
defer config.Reset()
transformationV2 := []bool{true, false}
for _, v := range transformationV2 {
t.Run("User Transformer", func(t *testing.T) {
config.Reset()
defer config.Reset()

workspaceConfig := backendconfig.ConfigT{
WorkspaceID: "workspace-1",
Sources: []backendconfig.SourceT{
{
ID: "source-1",
Name: "source-name-1",
SourceDefinition: backendconfig.SourceDefinitionT{
ID: "source-def-1",
Name: "source-def-name-1",
Category: "source-def-category-1",
Type: "source-def-type-1",
},
WriteKey: "writekey-1",
WorkspaceID: "workspace-1",
Enabled: true,
Destinations: []backendconfig.DestinationT{
{
ID: "destination-1",
Name: "destination-name-1",
DestinationDefinition: backendconfig.DestinationDefinitionT{
ID: "destination-def-1",
Name: "destination-def-name-1",
DisplayName: "destination-def-display-name-1",
},
Enabled: true,
WorkspaceID: "workspace-1",
Transformations: []backendconfig.TransformationT{
{
ID: "transformation-1",
VersionID: "version-1",
workspaceConfig := backendconfig.ConfigT{
WorkspaceID: "workspace-1",
Sources: []backendconfig.SourceT{
{
ID: "source-1",
Name: "source-name-1",
SourceDefinition: backendconfig.SourceDefinitionT{
ID: "source-def-1",
Name: "source-def-name-1",
Category: "source-def-category-1",
Type: "source-def-type-1",
},
WriteKey: "writekey-1",
WorkspaceID: "workspace-1",
Enabled: true,
Destinations: []backendconfig.DestinationT{
{
ID: "destination-1",
Name: "destination-name-1",
DestinationDefinition: backendconfig.DestinationDefinitionT{
ID: "destination-def-1",
Name: "destination-def-name-1",
DisplayName: "destination-def-display-name-1",
},
Enabled: true,
WorkspaceID: "workspace-1",
Transformations: []backendconfig.TransformationT{
{
ID: "transformation-1",
VersionID: "version-1",
},
},
IsProcessorEnabled: true,
RevisionID: "revision-1",
},
IsProcessorEnabled: true,
RevisionID: "revision-1",
},
},
DgSourceTrackingPlanConfig: backendconfig.DgSourceTrackingPlanConfigT{
SourceId: "source-1",
SourceConfigVersion: 1,
Deleted: false,
TrackingPlan: backendconfig.TrackingPlanT{
Id: "tracking-plan-1",
Version: 1,
DgSourceTrackingPlanConfig: backendconfig.DgSourceTrackingPlanConfigT{
SourceId: "source-1",
SourceConfigVersion: 1,
Deleted: false,
TrackingPlan: backendconfig.TrackingPlanT{
Id: "tracking-plan-1",
Version: 1,
},
},
},
},
},
Credentials: map[string]backendconfig.Credential{
"credential-1": {
Key: "key-1",
Value: "value-1",
IsSecret: false,
Credentials: map[string]backendconfig.Credential{
"credential-1": {
Key: "key-1",
Value: "value-1",
IsSecret: false,
},
},
},
}

bcServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/workspaceConfig":
response, _ := jsonrs.Marshal(workspaceConfig)
_, _ = w.Write(response)
default:
w.WriteHeader(http.StatusNotFound)
}
}))

trServer := transformertest.NewBuilder().
WithUserTransformHandler(
func(request []transformer.TransformerEvent) (response []transformer.TransformerResponse) {
for i := range request {
req := request[i]
bcServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/workspaceConfig":
response, _ := jsonrs.Marshal(workspaceConfig)
_, _ = w.Write(response)
default:
w.WriteHeader(http.StatusNotFound)
}
}))

require.Equal(t, req.Metadata.SourceID, "source-1")
require.Equal(t, req.Metadata.SourceName, "source-name-1")
require.Equal(t, req.Metadata.SourceType, "source-def-name-1")
require.Equal(t, req.Metadata.SourceCategory, "source-def-category-1")
require.Equal(t, req.Metadata.SourceDefinitionID, "source-def-1")
require.Equal(t, req.Metadata.WorkspaceID, "workspace-1")
require.Equal(t, req.Metadata.DestinationID, "destination-1")
require.Equal(t, req.Metadata.DestinationType, "destination-def-name-1")
require.Equal(t, req.Metadata.DestinationName, "destination-name-1")
require.Equal(t, req.Metadata.TransformationID, "transformation-1")
require.Equal(t, req.Metadata.TransformationVersionID, "version-1")
require.Equal(t, req.Metadata.EventType, "identify")
require.Equal(t, req.Credentials, []transformer.Credential{
{
ID: "credential-1",
Key: "key-1",
Value: "value-1",
IsSecret: false,
},
})
response = append(response, transformer.TransformerResponse{
Metadata: req.Metadata,
Output: req.Message,
StatusCode: http.StatusOK,
})
}
return
},
).
Build()
defer trServer.Close()
trServer := transformertest.NewBuilder().
WithUserTransformHandler(
func(request []types.TransformerEvent) (response []types.TransformerResponse) {
for i := range request {
req := request[i]

pool, err := dockertest.NewPool("")
require.NoError(t, err)
require.Equal(t, req.Metadata.SourceID, "source-1")
require.Equal(t, req.Metadata.SourceName, "source-name-1")
require.Equal(t, req.Metadata.SourceType, "source-def-name-1")
require.Equal(t, req.Metadata.SourceCategory, "source-def-category-1")
require.Equal(t, req.Metadata.SourceDefinitionID, "source-def-1")
require.Equal(t, req.Metadata.WorkspaceID, "workspace-1")
require.Equal(t, req.Metadata.DestinationID, "destination-1")
require.Equal(t, req.Metadata.DestinationType, "destination-def-name-1")
require.Equal(t, req.Metadata.DestinationName, "destination-name-1")
require.Equal(t, req.Metadata.TransformationID, "transformation-1")
require.Equal(t, req.Metadata.TransformationVersionID, "version-1")
require.Equal(t, req.Metadata.EventType, "identify")
require.Equal(t, req.Credentials, []types.Credential{
{
ID: "credential-1",
Key: "key-1",
Value: "value-1",
IsSecret: false,
},
})
response = append(response, types.TransformerResponse{
Metadata: req.Metadata,
Output: req.Message,
StatusCode: http.StatusOK,
})
}
return
},
).
Build()
defer trServer.Close()

postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)
pool, err := dockertest.NewPool("")
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)

gwPort, err := kithelper.GetFreePort()
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runRudderServer(t, ctx, gwPort, postgresContainer, bcServer.URL, trServer.URL, t.TempDir())
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
return err
})
gwPort, err := kithelper.GetFreePort()
require.NoError(t, err)

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runRudderServer(t, ctx, gwPort, postgresContainer, bcServer.URL, trServer.URL, t.TempDir(), v)
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
return err
})

url := fmt.Sprintf("http://localhost:%d", gwPort)
health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())
url := fmt.Sprintf("http://localhost:%d", gwPort)
health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())

eventsCount := 12
eventsCount := 12

err = sendEvents(eventsCount, "identify", "writekey-1", url)
require.NoError(t, err)
err = sendEvents(eventsCount, "identify", "writekey-1", url)
require.NoError(t, err)

requireJobsCount(t, ctx, postgresContainer.DB, "gw", jobsdb.Succeeded.State, eventsCount)
requireJobsCount(t, ctx, postgresContainer.DB, "rt", jobsdb.Succeeded.State, eventsCount)
requireJobsCount(t, ctx, postgresContainer.DB, "gw", jobsdb.Succeeded.State, eventsCount)
requireJobsCount(t, ctx, postgresContainer.DB, "rt", jobsdb.Succeeded.State, eventsCount)

cancel()
require.NoError(t, wg.Wait())
})
// TODO: Add tests for dest transformer and tracking plan validation
t.Run("Dest Transformer", func(t *testing.T) {})
t.Run("Tracking Plan Validation", func(t *testing.T) {})
cancel()
require.NoError(t, wg.Wait())
})
// TODO: Add tests for dest transformer and tracking plan validation
t.Run("Dest Transformer", func(t *testing.T) {})
t.Run("Tracking Plan Validation", func(t *testing.T) {})
}
}

func runRudderServer(
Expand All @@ -190,6 +193,7 @@ func runRudderServer(
postgresContainer *postgres.Resource,
cbURL, transformerURL,
tmpDir string,
transformationV2 bool,
) (err error) {
t.Setenv("CONFIG_BACKEND_URL", cbURL)
t.Setenv("WORKSPACE_TOKEN", "token")
Expand Down Expand Up @@ -217,7 +221,7 @@ func runRudderServer(
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Profiler.Enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.enableSuppressUserFeature"), "false")

t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Processor.enableTransformationV2"), strconv.FormatBool(transformationV2))
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panicked: %v", r)
Expand Down
Loading
Loading