Skip to content

Commit

Permalink
feat: snowpipe streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 23, 2024
1 parent cc74f3c commit 68163a6
Show file tree
Hide file tree
Showing 27 changed files with 2,396 additions and 25 deletions.
742 changes: 742 additions & 0 deletions integration_test/snowpipestreaming/snowpipestreaming_test.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: "3.9"

services:
rudder-snowpipe-clients:
image: "hub.dev-rudder.rudderlabs.com/dockerhub-proxy/rudderstack/rudder-snowpipe-clients:chore.snowpipe-poc"
ports:
- "9078"
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: "3.9"

services:
transformer:
image: "hub.dev-rudder.rudderlabs.com/dockerhub-proxy/rudderstack/develop-rudder-transformer:feat.snowpipe-streaming"
ports:
- "9090:9090"
healthcheck:
test: wget --no-verbose --tries=1 --spider http://0.0.0.0:9090/health || exit 1
interval: 1s
retries: 25
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/app"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand Down
2 changes: 1 addition & 1 deletion router/batchrouter/asyncdestinationmanager/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package common
import "slices"

var (
asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD", "LYTICS_BULK_UPLOAD"}
asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD", "LYTICS_BULK_UPLOAD", "SNOWPIPE_STREAMING"}
sftpDestinations = []string{"SFTP"}
)

Expand Down
3 changes: 3 additions & 0 deletions router/batchrouter/asyncdestinationmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
lyticsBulkUpload "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload"
marketobulkupload "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/sftp"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/yandexmetrica"
)

Expand All @@ -41,6 +42,8 @@ func newRegularManager(
return klaviyobulkupload.NewManager(logger, statsFactory, destination)
case "LYTICS_BULK_UPLOAD":
return lyticsBulkUpload.NewManager(logger, statsFactory, destination)
case "SNOWPIPE_STREAMING":
return snowpipestreaming.New(conf, logger, statsFactory, destination), nil
}
return nil, errors.New("invalid destination type")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package snowpipestreaming

import (
"bytes"
"context"
"fmt"
"io"
"net/http"

"github.com/rudderlabs/rudder-go-kit/httputil"
)

type accountConfig struct {
Account string `json:"account"`
User string `json:"user"`
Role string `json:"role"`
PrivateKey string `json:"privateKey"`
PrivateKeyPassphrase string `json:"privateKeyPassphrase"`
}

type tableConfig struct {
Database string `json:"database"`
Schema string `json:"schema"`
Table string `json:"table"`
}

type createChannelRequest struct {
RudderIdentifier string `json:"rudderIdentifier"`
Partition string `json:"partition"`
AccountConfig accountConfig `json:"account"`
TableConfig tableConfig `json:"table"`
}

type createChannelResponse struct {
ChannelId string `json:"channelId"`
ChannelName string `json:"channelName"`
ClientName string `json:"clientName"`
Valid bool `json:"valid"`
TableSchema map[string]map[string]any `json:"tableSchema"`
}

func (m *Manager) createChannel(ctx context.Context, channelReq *createChannelRequest) (*createChannelResponse, error) {
reqJSON, err := json.Marshal(channelReq)
if err != nil {
return nil, fmt.Errorf("marshalling create channel request: %w", err)
}

channelReqURL := m.config.clientURL + "/channels"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, channelReqURL, bytes.NewBuffer(reqJSON))
if err != nil {
return nil, fmt.Errorf("creating request: %w", err)
}
req.Header.Set("Content-Type", "application/json")

resp, reqErr := m.requestDoer.Do(req)
if reqErr != nil {
return nil, fmt.Errorf("sending request: %w", reqErr)
}
defer func() { httputil.CloseResponse(resp) }()

if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("invalid status code: %d, body: %s", resp.StatusCode, string(b))
}

var res createChannelResponse
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
return nil, fmt.Errorf("decoding response: %w", err)
}

return &res, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package snowpipestreaming

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
)

func TestCreateChannel(t *testing.T) {
var (
ccr = &createChannelRequest{
RudderIdentifier: "rudderIdentifier",
Partition: "partition",
AccountConfig: accountConfig{
Account: "account",
User: "user",
Role: "role",
PrivateKey: "privateKey",
PrivateKeyPassphrase: "privateKeyPassphrase",
},
TableConfig: tableConfig{
Database: "database",
Schema: "schema",
Table: "table",
},
}
)

snowpipeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodPost, r.Method)
require.Equal(t, "application/json", r.Header.Get("Content-Type"))

body, err := io.ReadAll(r.Body)
require.NoError(t, err)
require.NoError(t, r.Body.Close())
require.JSONEq(t, `{"rudderIdentifier":"rudderIdentifier","partition":"partition","account":{"account":"account","user":"user","role":"role","privateKey":"privateKey","privateKeyPassphrase":"privateKeyPassphrase"},"table":{"database":"database","schema":"schema","table":"table"}}`, string(body))

switch r.URL.String() {
case "/channels":
_, err := w.Write([]byte(`{"channelId":"channelId","channelName":"channelName","clientName":"clientName","valid":true}`))
require.NoError(t, err)
default:
require.FailNowf(t, "SnowpipeClients", "Unexpected %s to SnowpipeClients, not found: %+v", r.Method, r.URL)
w.WriteHeader(http.StatusNotFound)
}
}))
defer snowpipeServer.Close()

t.Run("Success", func(t *testing.T) {
ctx := context.Background()
destination := backendconfigtest.
NewDestinationBuilder("SNOWPIPE_STREAMING").
Build()

c := config.New()
c.Set("Snowpipe.Client.URL", snowpipeServer.URL)

manager := New(c, logger.NOP, stats.NOP, &destination, WithRequestDoer(snowpipeServer.Client()))
res, err := manager.createChannel(ctx, ccr)
require.NoError(t, err)
require.Equal(t, "channelId", res.ChannelId)
require.Equal(t, "channelName", res.ChannelName)
require.Equal(t, "clientName", res.ClientName)
require.True(t, res.Valid)
})
t.Run("Request failure", func(t *testing.T) {
ctx := context.Background()
destination := backendconfigtest.
NewDestinationBuilder("SNOWPIPE_STREAMING").
Build()

c := config.New()
c.Set("Snowpipe.Client.URL", snowpipeServer.URL)

reqDoer := &mockRequestDoer{
err: errors.New("bad client"),
}

manager := New(c, logger.NOP, stats.NOP, &destination, WithRequestDoer(reqDoer))
res, err := manager.createChannel(ctx, ccr)
require.Error(t, err)
require.Nil(t, res)
})
t.Run("Request failure (non 200's status code)", func(t *testing.T) {
ctx := context.Background()
destination := backendconfigtest.
NewDestinationBuilder("SNOWPIPE_STREAMING").
Build()

c := config.New()
c.Set("Snowpipe.Client.URL", snowpipeServer.URL)

reqDoer := &mockRequestDoer{
response: &http.Response{
StatusCode: http.StatusBadRequest,
Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{}`))},
},
}

manager := New(c, logger.NOP, stats.NOP, &destination, WithRequestDoer(reqDoer))
res, err := manager.createChannel(ctx, ccr)
require.Error(t, err)
require.Nil(t, res)
})
t.Run("Request failure (invalid response)", func(t *testing.T) {
ctx := context.Background()
destination := backendconfigtest.
NewDestinationBuilder("SNOWPIPE_STREAMING").
Build()

c := config.New()
c.Set("Snowpipe.Client.URL", snowpipeServer.URL)

reqDoer := &mockRequestDoer{
response: &http.Response{
StatusCode: http.StatusOK,
Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{abd}`))},
},
}

manager := New(c, logger.NOP, stats.NOP, &destination, WithRequestDoer(reqDoer))
res, err := manager.createChannel(ctx, ccr)
require.Error(t, err)
require.Nil(t, res)
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package snowpipestreaming

import (
"bytes"
"context"
"fmt"
"io"
"net/http"

"github.com/rudderlabs/rudder-go-kit/httputil"
)

func (m *Manager) deleteChannel(ctx context.Context, channelReq *createChannelRequest) error {
reqJSON, err := json.Marshal(channelReq)
if err != nil {
return fmt.Errorf("marshalling create channel request: %w", err)
}

channelReqURL := m.config.clientURL + "/channels"
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, channelReqURL, bytes.NewBuffer(reqJSON))
if err != nil {
return fmt.Errorf("creating request: %w", err)
}
req.Header.Set("Content-Type", "application/json")

resp, reqErr := m.requestDoer.Do(req)
if reqErr != nil {
return fmt.Errorf("sending request: %w", reqErr)
}
defer func() { httputil.CloseResponse(resp) }()

if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return fmt.Errorf("invalid status code: %d, body: %s", resp.StatusCode, string(b))
}

return nil
}
Loading

0 comments on commit 68163a6

Please sign in to comment.