Skip to content

Commit

Permalink
chore: some more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Nov 1, 2024
1 parent a1fe0e0 commit 958a6b2
Show file tree
Hide file tree
Showing 38 changed files with 2,038 additions and 1,733 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ jobs:
go-version-file: 'go.mod'
- run: go version
- run: go mod download # Not required, used to segregate module download vs test times
- run: make test exclude="/rudder-server/(jobsdb|integration_test|processor|regulation-worker|router|services|suppression-backup-service|warehouse)"
- run: FORCE_RUN_INTEGRATION_TESTS=true make test exclude="/rudder-server/(jobsdb|integration_test|processor|regulation-worker|router|services|suppression-backup-service|warehouse)"
- name: Upload coverage report
uses: actions/upload-artifact@v4
with:
Expand Down
2,225 changes: 1,093 additions & 1,132 deletions integration_test/snowpipestreaming/snowpipestreaming_test.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.9"

services:
rudder-snowpipe-clients:
image: "hub.dev-rudder.rudderlabs.com/dockerhub-proxy/rudderstack/rudder-snowpipe-clients:develop"
image: "rudderstack/rudder-snowpipe-clients:develop"
ports:
- "9078"
healthcheck:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.9"

services:
transformer:
image: "hub.dev-rudder.rudderlabs.com/dockerhub-proxy/rudderstack/develop-rudder-transformer:latest"
image: "rudderstack/develop-rudder-transformer:fix.snowpipe-streaming-users"
ports:
- "9090:9090"
healthcheck:
Expand Down
3 changes: 3 additions & 0 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ func (trans *handle) destTransformURL(destType string) string {
return destinationEndPoint + "?" + whSchemaVersionQueryParam
}
}
if destType == warehouseutils.SnowpipeStreaming {
return destinationEndPoint + "?" + fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
}
return destinationEndPoint
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,111 @@ package snowpipestreaming

import (
"context"
"strconv"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model"
)

type apiAdapter struct {
stats struct {
createChannelCount stats.Counter
deleteChannelCount stats.Counter
insertCount stats.Counter
statusCount stats.Counter
createChannelResponseTime stats.Timer
deleteChannelResponseTime stats.Timer
insertResponseTime stats.Timer
statusResponseTime stats.Timer
func newApiAdapter(
logger logger.Logger,
statsFactory stats.Stats,
api api,
destination *backendconfig.DestinationT,
) api {
return &apiAdapter{
logger: logger,
statsFactory: statsFactory,
destination: destination,
api: api,
}

api
}

func newApiAdapter(api api, statsFactory stats.Stats, destination *backendconfig.DestinationT) *apiAdapter {
adapter := &apiAdapter{}
adapter.api = api

tags := stats.Tags{
func (a *apiAdapter) defaultTags() stats.Tags {
return stats.Tags{
"module": "batch_router",
"workspaceId": destination.WorkspaceID,
"destType": destination.DestinationDefinition.Name,
"destinationId": destination.ID,
"workspaceId": a.destination.WorkspaceID,
"destType": a.destination.DestinationDefinition.Name,
"destinationId": a.destination.ID,
}
adapter.stats.createChannelCount = statsFactory.NewTaggedStat("snowpipestreaming_create_channel_count", stats.CountType, tags)
adapter.stats.deleteChannelCount = statsFactory.NewTaggedStat("snowpipestreaming_delete_channel_count", stats.CountType, tags)
adapter.stats.insertCount = statsFactory.NewTaggedStat("snowpipestreaming_insert_count", stats.CountType, tags)
adapter.stats.statusCount = statsFactory.NewTaggedStat("snowpipestreaming_status_count", stats.CountType, tags)
adapter.stats.createChannelResponseTime = statsFactory.NewTaggedStat("snowpipestreaming_create_channel_response_time", stats.TimerType, tags)
adapter.stats.deleteChannelResponseTime = statsFactory.NewTaggedStat("snowpipestreaming_delete_channel_response_time", stats.TimerType, tags)
adapter.stats.insertResponseTime = statsFactory.NewTaggedStat("snowpipestreaming_insert_response_time", stats.TimerType, tags)
adapter.stats.statusResponseTime = statsFactory.NewTaggedStat("snowpipestreaming_status_response_time", stats.TimerType, tags)

return adapter
}

func (a *apiAdapter) CreateChannel(ctx context.Context, req *model.CreateChannelRequest) (*model.ChannelResponse, error) {
defer a.stats.createChannelCount.Increment()
defer a.stats.createChannelResponseTime.RecordDuration()()
return a.api.CreateChannel(ctx, req)
a.logger.Infon("Creating channel",
logger.NewStringField("rudderIdentifier", req.RudderIdentifier),
logger.NewStringField("partition", req.Partition),
logger.NewStringField("database", req.TableConfig.Database),
logger.NewStringField("namespace", req.TableConfig.Schema),
logger.NewStringField("table", req.TableConfig.Table),
)
tags := a.defaultTags()
tags["api"] = "create_channel"

responseTimeStat := a.statsFactory.NewTaggedStat("snowpipe_streaming_api_response_time", stats.TimerType, tags)
defer responseTimeStat.RecordDuration()()

resp, err := a.api.CreateChannel(ctx, req)
if err != nil {
tags["status"] = "false"
return nil, err
}
tags["status"] = strconv.FormatBool(resp.Success)
tags["code"] = resp.Code
return resp, nil
}

func (a *apiAdapter) DeleteChannel(ctx context.Context, channelID string, sync bool) error {
defer a.stats.deleteChannelCount.Increment()
defer a.stats.deleteChannelResponseTime.RecordDuration()()
return a.api.DeleteChannel(ctx, channelID, sync)
a.logger.Infon("Deleting channel",
logger.NewStringField("channelId", channelID),
logger.NewBoolField("sync", sync),
)
tags := a.defaultTags()
tags["api"] = "delete_channel"

responseTimeStat := a.statsFactory.NewTaggedStat("snowpipe_streaming_api_response_time", stats.TimerType, tags)
defer responseTimeStat.RecordDuration()()

err := a.api.DeleteChannel(ctx, channelID, sync)
if err != nil {
tags["status"] = "false"
return err
}
tags["status"] = "true"
return nil
}

func (a *apiAdapter) Insert(ctx context.Context, channelID string, insertRequest *model.InsertRequest) (*model.InsertResponse, error) {
defer a.stats.insertCount.Increment()
defer a.stats.insertResponseTime.RecordDuration()()
return a.api.Insert(ctx, channelID, insertRequest)
tags := a.defaultTags()
tags["api"] = "insert"

responseTimeStat := a.statsFactory.NewTaggedStat("snowpipe_streaming_api_response_time", stats.TimerType, tags)
defer responseTimeStat.RecordDuration()()

resp, err := a.api.Insert(ctx, channelID, insertRequest)
if err != nil {
tags["status"] = "false"
return nil, err
}
tags["status"] = strconv.FormatBool(resp.Success)
tags["code"] = resp.Code
return resp, nil
}

func (a *apiAdapter) Status(ctx context.Context, channelID string) (*model.StatusResponse, error) {
defer a.stats.statusCount.Increment()
defer a.stats.statusResponseTime.RecordDuration()()
return a.api.Status(ctx, channelID)
tags := a.defaultTags()
tags["api"] = "status"

responseTimeStat := a.statsFactory.NewTaggedStat("snowpipe_streaming_api_response_time", stats.TimerType, tags)
defer responseTimeStat.RecordDuration()()

resp, err := a.api.Status(ctx, channelID)
if err != nil {
tags["status"] = "false"
return nil, err
}
tags["status"] = strconv.FormatBool(resp.Success)
return resp, nil
}
Loading

0 comments on commit 958a6b2

Please sign in to comment.