Skip to content

Commit

Permalink
feat: snowpipe streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Oct 29, 2024
1 parent caab755 commit a1fe0e0
Show file tree
Hide file tree
Showing 47 changed files with 4,218 additions and 31 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ jobs:
- integration_test/tracing
- integration_test/backendconfigunavailability
- integration_test/trackedusersreporting
- integration_test/snowpipestreaming
- processor
- regulation-worker
- router
Expand Down Expand Up @@ -186,7 +187,8 @@ jobs:
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING }}
TEST_S3_DATALAKE_CREDENTIALS: ${{ secrets.TEST_S3_DATALAKE_CREDENTIALS }}
BIGQUERY_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDENTIALS }}
run: make test exclude="${{ matrix.exclude }}" package=${{ matrix.package }}
SNOWPIPE_STREAMING_KEYPAIR_UNENCRYPTED_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWPIPE_STREAMING_KEYPAIR_UNENCRYPTED_INTEGRATION_TEST_CREDENTIALS }}
run: FORCE_RUN_INTEGRATION_TESTS=true make test exclude="${{ matrix.exclude }}" package=${{ matrix.package }}
- name: Sanitize name for Artifact
run: |
name=$(echo -n "${{ matrix.package }}" | sed -e 's/[ \t:\/\\"<>|*?]/-/g' -e 's/--*/-/g')
Expand Down
3 changes: 2 additions & 1 deletion gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/timeutil"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -488,7 +489,7 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
}
receivedAt, ok := userEvent.events[0]["receivedAt"].(string)
if !ok || !arctx.ReplaySource {
receivedAt = time.Now().Format(misc.RFC3339Milli)
receivedAt = timeutil.Now().Format(misc.RFC3339Milli)
}
singularEventBatch := SingularEventBatch{
Batch: userEvent.events,
Expand Down
1,605 changes: 1,605 additions & 0 deletions integration_test/snowpipestreaming/snowpipestreaming_test.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Compose file for the snowpipe-streaming integration tests: runs the
# rudder-snowpipe-clients sidecar that the tests talk to.
version: "3.9"

services:
  rudder-snowpipe-clients:
    image: "hub.dev-rudder.rudderlabs.com/dockerhub-proxy/rudderstack/rudder-snowpipe-clients:develop"
    ports:
      # Container port only; the host port is assigned dynamically by Docker.
      - "9078"
    healthcheck:
      # Consider the service healthy once its /health endpoint answers.
      test: wget --no-verbose --tries=1 --spider http://localhost:9078/health || exit 1
      interval: 1s
      retries: 25
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.9"

services:
transformer:
image: "rudderstack/rudder-transformer:latest"
image: "hub.dev-rudder.rudderlabs.com/dockerhub-proxy/rudderstack/develop-rudder-transformer:latest"
ports:
- "9090:9090"
healthcheck:
Expand Down
2 changes: 1 addition & 1 deletion router/batchrouter/asyncdestinationmanager/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package common
import "slices"

var (
asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD", "LYTICS_BULK_UPLOAD"}
asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD", "LYTICS_BULK_UPLOAD", "SNOWPIPE_STREAMING"}
sftpDestinations = []string{"SFTP"}
)

Expand Down
3 changes: 3 additions & 0 deletions router/batchrouter/asyncdestinationmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
lyticsBulkUpload "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload"
marketobulkupload "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/sftp"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/yandexmetrica"
)

Expand All @@ -41,6 +42,8 @@ func newRegularManager(
return klaviyobulkupload.NewManager(logger, statsFactory, destination)
case "LYTICS_BULK_UPLOAD":
return lyticsBulkUpload.NewManager(logger, statsFactory, destination)
case "SNOWPIPE_STREAMING":
return snowpipestreaming.New(conf, logger, statsFactory, destination), nil
}
return nil, errors.New("invalid destination type")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package snowpipestreaming

import (
"context"

"github.com/rudderlabs/rudder-go-kit/stats"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model"
)

// apiAdapter decorates an api implementation with observability: every call
// increments a per-operation counter and records the call duration before
// delegating to the embedded api.
type apiAdapter struct {
	// stats holds one counter and one timer per api operation; all are
	// created in newApiAdapter with the same destination-scoped tags.
	stats struct {
		createChannelCount        stats.Counter
		deleteChannelCount        stats.Counter
		insertCount               stats.Counter
		statusCount               stats.Counter
		createChannelResponseTime stats.Timer
		deleteChannelResponseTime stats.Timer
		insertResponseTime        stats.Timer
		statusResponseTime        stats.Timer
	}

	// The embedded api performs the actual snowpipe HTTP calls.
	api
}

// newApiAdapter wraps api with destination-scoped metrics so that every
// channel/insert/status call is counted and timed under the batch_router module.
func newApiAdapter(api api, statsFactory stats.Stats, destination *backendconfig.DestinationT) *apiAdapter {
	// All metrics share the same tag set identifying this destination.
	tags := stats.Tags{
		"module":        "batch_router",
		"workspaceId":   destination.WorkspaceID,
		"destType":      destination.DestinationDefinition.Name,
		"destinationId": destination.ID,
	}

	a := &apiAdapter{api: api}
	a.stats.createChannelCount = statsFactory.NewTaggedStat("snowpipestreaming_create_channel_count", stats.CountType, tags)
	a.stats.deleteChannelCount = statsFactory.NewTaggedStat("snowpipestreaming_delete_channel_count", stats.CountType, tags)
	a.stats.insertCount = statsFactory.NewTaggedStat("snowpipestreaming_insert_count", stats.CountType, tags)
	a.stats.statusCount = statsFactory.NewTaggedStat("snowpipestreaming_status_count", stats.CountType, tags)
	a.stats.createChannelResponseTime = statsFactory.NewTaggedStat("snowpipestreaming_create_channel_response_time", stats.TimerType, tags)
	a.stats.deleteChannelResponseTime = statsFactory.NewTaggedStat("snowpipestreaming_delete_channel_response_time", stats.TimerType, tags)
	a.stats.insertResponseTime = statsFactory.NewTaggedStat("snowpipestreaming_insert_response_time", stats.TimerType, tags)
	a.stats.statusResponseTime = statsFactory.NewTaggedStat("snowpipestreaming_status_response_time", stats.TimerType, tags)
	return a
}

// CreateChannel delegates to the underlying api, recording call count and duration.
func (a *apiAdapter) CreateChannel(ctx context.Context, req *model.CreateChannelRequest) (*model.ChannelResponse, error) {
	stop := a.stats.createChannelResponseTime.RecordDuration()
	defer stop()
	defer a.stats.createChannelCount.Increment()
	return a.api.CreateChannel(ctx, req)
}

// DeleteChannel delegates to the underlying api, recording call count and duration.
func (a *apiAdapter) DeleteChannel(ctx context.Context, channelID string, sync bool) error {
	stop := a.stats.deleteChannelResponseTime.RecordDuration()
	defer stop()
	defer a.stats.deleteChannelCount.Increment()
	return a.api.DeleteChannel(ctx, channelID, sync)
}

// Insert delegates to the underlying api, recording call count and duration.
func (a *apiAdapter) Insert(ctx context.Context, channelID string, insertRequest *model.InsertRequest) (*model.InsertResponse, error) {
	stop := a.stats.insertResponseTime.RecordDuration()
	defer stop()
	defer a.stats.insertCount.Increment()
	return a.api.Insert(ctx, channelID, insertRequest)
}

// Status delegates to the underlying api, recording call count and duration.
func (a *apiAdapter) Status(ctx context.Context, channelID string) (*model.StatusResponse, error) {
	stop := a.stats.statusResponseTime.RecordDuration()
	defer stop()
	defer a.stats.statusCount.Increment()
	return a.api.Status(ctx, channelID)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package snowpipestreaming

import (
"context"
"fmt"

"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
internalapi "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// createChannel returns a snowpipe channel for tableName, creating one via the
// snowpipe API if it is not already cached. On "schema does not exist" or
// "table does not exist" responses it provisions the missing object in
// Snowflake and retries the channel creation once. Successful responses are
// cached by table name so subsequent uploads reuse the channel.
//
// Returns an error if the API call fails, if provisioning fails, or if the
// API reports a failure code this function does not know how to recover from.
func (m *Manager) createChannel(
	ctx context.Context,
	asyncDest *common.AsyncDestinationStruct,
	destConf destConfig,
	tableName string,
	eventSchema whutils.ModelTableSchema,
) (*model.ChannelResponse, error) {
	if response, ok := m.channelCache.Load(tableName); ok {
		return response.(*model.ChannelResponse), nil
	}

	req := &model.CreateChannelRequest{
		RudderIdentifier: asyncDest.Destination.ID,
		Partition:        m.config.instanceID,
		AccountConfig: model.AccountConfig{
			Account:              destConf.Account,
			User:                 destConf.User,
			Role:                 destConf.Role,
			PrivateKey:           whutils.FormatPemContent(destConf.PrivateKey),
			PrivateKeyPassphrase: destConf.PrivateKeyPassphrase,
		},
		TableConfig: model.TableConfig{
			Database: destConf.Database,
			Schema:   destConf.Namespace,
			Table:    tableName,
		},
	}

	resp, err := m.api.CreateChannel(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("creating channel: %v", err)
	}
	if resp.Success {
		m.channelCache.Store(tableName, resp)
		return resp, nil
	}

	// The API rejected the request; for missing-schema/missing-table codes we
	// provision the object and retry once via the corresponding handler.
	switch resp.Code {
	case internalapi.ErrSchemaDoesNotExistOrNotAuthorized:
		resp, err = m.handleSchemaError(ctx, req, eventSchema)
		if err != nil {
			return nil, fmt.Errorf("handling schema error: %v", err)
		}
		if !resp.Success {
			return nil, fmt.Errorf("creating channel for schema error: %s", resp.Error)
		}
	case internalapi.ErrTableDoesNotExistOrNotAuthorized:
		resp, err = m.handleTableError(ctx, req, eventSchema)
		if err != nil {
			return nil, fmt.Errorf("handling table error: %v", err)
		}
		if !resp.Success {
			return nil, fmt.Errorf("creating channel for table error: %s", resp.Error)
		}
	default:
		// Bug fix: previously this branch formatted err, which is always nil
		// here (the err != nil case returned above), hiding the real failure.
		// Surface the API's failure code and message instead.
		return nil, fmt.Errorf("creating channel: code: %v, error: %v", resp.Code, resp.Error)
	}

	m.channelCache.Store(tableName, resp)
	return resp, nil
}

// handleSchemaError recovers from a "schema does not exist" channel-creation
// failure: it creates the schema and the target table in Snowflake, then
// retries the channel creation with the same request.
func (m *Manager) handleSchemaError(
	ctx context.Context,
	channelReq *model.CreateChannelRequest,
	eventSchema whutils.ModelTableSchema,
) (*model.ChannelResponse, error) {
	m.stats.channelSchemaCreationErrorCount.Increment()

	sfManager, err := m.createSnowflakeManager(ctx, channelReq.TableConfig.Schema)
	if err != nil {
		return nil, fmt.Errorf("creating snowflake manager: %v", err)
	}
	defer func() { sfManager.Cleanup(ctx) }()

	if err := sfManager.CreateSchema(ctx); err != nil {
		return nil, fmt.Errorf("creating schema: %v", err)
	}
	if err := sfManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
		return nil, fmt.Errorf("creating table: %v", err)
	}
	return m.api.CreateChannel(ctx, channelReq)
}

// handleTableError recovers from a "table does not exist" channel-creation
// failure: it creates the target table in Snowflake (the schema already
// exists), then retries the channel creation with the same request.
func (m *Manager) handleTableError(
	ctx context.Context,
	channelReq *model.CreateChannelRequest,
	eventSchema whutils.ModelTableSchema,
) (*model.ChannelResponse, error) {
	m.stats.channelTableCreationErrorCount.Increment()

	sfManager, err := m.createSnowflakeManager(ctx, channelReq.TableConfig.Schema)
	if err != nil {
		return nil, fmt.Errorf("creating snowflake manager: %v", err)
	}
	defer func() { sfManager.Cleanup(ctx) }()

	if err := sfManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
		return nil, fmt.Errorf("creating table: %v", err)
	}
	return m.api.CreateChannel(ctx, channelReq)
}

// recreateChannel drops the existing channel for tableName and creates a
// fresh one, returning the new channel response.
func (m *Manager) recreateChannel(
	ctx context.Context,
	asyncDest *common.AsyncDestinationStruct,
	destConf destConfig,
	tableName string,
	eventSchema whutils.ModelTableSchema,
	existingChannelResponse *model.ChannelResponse,
) (*model.ChannelResponse, error) {
	// Tear down the old channel first; createChannel would otherwise hit the cache.
	if err := m.deleteChannel(ctx, tableName, existingChannelResponse.ChannelID); err != nil {
		return nil, fmt.Errorf("deleting channel: %v", err)
	}

	resp, err := m.createChannel(ctx, asyncDest, destConf, tableName, eventSchema)
	if err != nil {
		return nil, fmt.Errorf("recreating channel: %v", err)
	}
	return resp, nil
}

// deleteChannel evicts tableName from the channel cache and synchronously
// deletes the channel via the snowpipe API.
func (m *Manager) deleteChannel(ctx context.Context, tableName string, channelID string) error {
	// Evict first so no caller can pick up the soon-to-be-deleted channel.
	m.channelCache.Delete(tableName)

	err := m.api.DeleteChannel(ctx, channelID, true)
	if err != nil {
		return fmt.Errorf("deleting channel: %v", err)
	}
	return nil
}

// createSnowflakeManager builds a warehouse manager for this destination,
// scoped to the given namespace, and sets it up with a no-op uploader. The
// caller owns the returned manager and must call Cleanup on it.
func (m *Manager) createSnowflakeManager(ctx context.Context, namespace string) (manager.Manager, error) {
	modelWarehouse := whutils.ModelWarehouse{
		WorkspaceID: m.destination.WorkspaceID,
		Destination: *m.destination,
		Namespace:   namespace,
		Type:        m.destination.DestinationDefinition.Name,
		Identifier:  m.destination.WorkspaceID + ":" + m.destination.ID,
	}
	// Bug fix: the struct copy above still aliases the destination's Config
	// map, so writing through it would mutate the shared backend config.
	// Copy the map before forcing key-pair auth (the only auth currently
	// supported here).
	configCopy := make(map[string]interface{}, len(m.destination.Config)+1)
	for k, v := range m.destination.Config {
		configCopy[k] = v
	}
	configCopy["useKeyPairAuth"] = true
	modelWarehouse.Destination.Config = configCopy

	sf, err := manager.New(whutils.SNOWFLAKE, m.conf, m.logger, m.statsFactory)
	if err != nil {
		return nil, fmt.Errorf("creating snowflake manager: %v", err)
	}
	if err := sf.Setup(ctx, modelWarehouse, &whutils.NopUploader{}); err != nil {
		return nil, fmt.Errorf("setting up snowflake manager: %v", err)
	}
	return sf, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package snowpipestreaming

import (
"context"
"fmt"

whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// addColumns adds the given columns to namespace.tableName in Snowflake using
// a temporary warehouse manager, which is cleaned up before returning.
func (m *Manager) addColumns(ctx context.Context, namespace, tableName string, columns []whutils.ColumnInfo) error {
	sfManager, err := m.createSnowflakeManager(ctx, namespace)
	if err != nil {
		return fmt.Errorf("creating snowflake manager: %v", err)
	}
	defer func() { sfManager.Cleanup(ctx) }()

	if err := sfManager.AddColumns(ctx, tableName, columns); err != nil {
		return fmt.Errorf("adding columns: %v", err)
	}
	return nil
}

// findNewColumns returns the columns present in eventSchema but absent from
// snowPipeSchema, i.e. the columns that must be added to the snowpipe table.
// Map iteration order makes the result order unspecified.
func findNewColumns(eventSchema, snowPipeSchema whutils.ModelTableSchema) []whutils.ColumnInfo {
	var missing []whutils.ColumnInfo
	for name, dataType := range eventSchema {
		if _, known := snowPipeSchema[name]; known {
			continue
		}
		missing = append(missing, whutils.ColumnInfo{Name: name, Type: dataType})
	}
	return missing
}
Loading

0 comments on commit a1fe0e0

Please sign in to comment.