Skip to content

Commit

Permalink
feat: implement background syncing for warehouse schema
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhar-rudder committed Jan 3, 2025
1 parent 18f4bdf commit f539685
Show file tree
Hide file tree
Showing 8 changed files with 483 additions and 92 deletions.
9 changes: 9 additions & 0 deletions warehouse/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type Router struct {
waitForWorkerSleep time.Duration
uploadAllocatorSleep time.Duration
uploadStatusTrackFrequency time.Duration
enableSyncSchema bool
syncSchemaFrequency time.Duration
shouldPopulateHistoricIdentities bool
uploadFreqInS config.ValueLoader[int64]
noOfWorkers config.ValueLoader[int]
Expand Down Expand Up @@ -202,6 +204,11 @@ func (r *Router) Start(ctx context.Context) error {
g.Go(crash.NotifyWarehouse(func() error {
return r.CronTracker(gCtx)
}))
if r.config.enableSyncSchema {
g.Go(crash.NotifyWarehouse(func() error {
return r.sync(gCtx)
}))

Check warning on line 210 in warehouse/router/router.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/router.go#L208-L210

Added lines #L208 - L210 were not covered by tests
}
return g.Wait()
}

Expand Down Expand Up @@ -711,6 +718,8 @@ func (r *Router) loadReloadableConfig(whName string) {
r.config.enableJitterForSyncs = r.conf.GetReloadableBoolVar(false, "Warehouse.enableJitterForSyncs")
r.config.warehouseSyncFreqIgnore = r.conf.GetReloadableBoolVar(false, "Warehouse.warehouseSyncFreqIgnore")
r.config.cronTrackerRetries = r.conf.GetReloadableInt64Var(5, 1, "Warehouse.cronTrackerRetries")
r.config.syncSchemaFrequency = r.conf.GetDurationVar(12, time.Hour, "Warehouse.syncSchemaFrequency")
r.config.enableSyncSchema = r.conf.GetBoolVar(false, "Warehouse.enableSyncSchema")
}

func (r *Router) loadStats() {
Expand Down
44 changes: 44 additions & 0 deletions warehouse/router/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package router

import (
"context"
"fmt"
"time"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/schema"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

func (r *Router) sync(ctx context.Context) error {
for {
r.configSubscriberLock.RLock()
warehouses := append([]model.Warehouse{}, r.warehouses...)
r.configSubscriberLock.RUnlock()
execTime := time.Now()
whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory)
if err != nil {
return fmt.Errorf("failed to create warehouse manager: %w", err)
}

Check warning on line 24 in warehouse/router/sync.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/sync.go#L23-L24

Added lines #L23 - L24 were not covered by tests
for _, warehouse := range warehouses {
err := whManager.Setup(ctx, warehouse, warehouseutils.NewNoOpUploader())
if err != nil {
r.logger.Errorn("failed to setup WH Manager", obskit.Error(err))
continue

Check warning on line 29 in warehouse/router/sync.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/sync.go#L28-L29

Added lines #L28 - L29 were not covered by tests
}
if err := schema.SyncSchema(ctx, whManager, warehouse, r.db, r.logger.Child("syncer")); err != nil {
r.logger.Errorn("failed to sync schema", obskit.Error(err))
continue

Check warning on line 33 in warehouse/router/sync.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/sync.go#L32-L33

Added lines #L32 - L33 were not covered by tests
}
}
nextExecTime := execTime.Add(r.config.syncSchemaFrequency)
select {
case <-ctx.Done():
r.logger.Infon("context is cancelled, stopped running schema syncer")
return nil
case <-time.After(time.Until(nextExecTime)):

Check warning on line 41 in warehouse/router/sync.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/sync.go#L41

Added line #L41 was not covered by tests
}
}
}
154 changes: 154 additions & 0 deletions warehouse/router/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package router

import (
"context"
"fmt"
"reflect"
"testing"
"time"

miniogo "github.com/minio/minio-go/v7"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/schema"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// TestSync_SyncRemoteSchemaIntegration runs Router.sync against a dockerised
// Postgres (with MinIO as the bucket provider) and verifies that a table
// created in the warehouse namespace ends up in the locally stored schema.
func TestSync_SyncRemoteSchemaIntegration(t *testing.T) {
	destinationType := warehouseutils.POSTGRES
	bucket := "some-bucket"
	sourceID := "test-source-id"
	destinationID := "test-destination-id"
	workspaceID := "test-workspace-id"
	provider := "MINIO"
	sslMode := "disable"
	testNamespace := "test_namespace"
	testTable := "test_table"

	ctx := context.Background()
	pool, err := dockertest.NewPool("")
	require.NoError(t, err)
	pgResource, err := postgres.Setup(pool, t)
	require.NoError(t, err)
	minioResource, err := minio.Setup(pool, t)
	require.NoError(t, err)

	err = minioResource.Client.MakeBucket(ctx, bucket, miniogo.MakeBucketOptions{
		Region: "us-east-1",
	})
	require.NoError(t, err)
	t.Log("db:", pgResource.DBDsn)
	conf := config.New()
	err = (&migrator.Migrator{
		Handle:          pgResource.DB,
		MigrationsTable: "wh_schema_migrations",
	}).Migrate("warehouse")
	require.NoError(t, err)

	db := sqlmiddleware.New(pgResource.DB)

	warehouse := model.Warehouse{
		WorkspaceID: workspaceID,
		Source: backendconfig.SourceT{
			ID: sourceID,
		},
		Destination: backendconfig.DestinationT{
			ID: destinationID,
			DestinationDefinition: backendconfig.DestinationDefinitionT{
				Name: destinationType,
			},
			Config: map[string]interface{}{
				"host":            pgResource.Host,
				"port":            pgResource.Port,
				"database":        pgResource.Database,
				"user":            pgResource.User,
				"password":        pgResource.Password,
				"sslMode":         sslMode,
				"bucketProvider":  provider,
				"bucketName":      minioResource.BucketName,
				"accessKeyID":     minioResource.AccessKeyID,
				"secretAccessKey": minioResource.AccessKeySecret,
				"endPoint":        minioResource.Endpoint,
			},
		},
		Namespace:  testNamespace,
		Identifier: "RS:test-source-id:test-destination-id-create-jobs",
	}
	r := Router{
		logger:       logger.NOP,
		conf:         conf,
		db:           db,
		warehouses:   []model.Warehouse{warehouse},
		destType:     destinationType,
		statsFactory: stats.NOP,
	}
	r.config.syncSchemaFrequency = 1 * time.Hour

	// Create the remote schema and table BEFORE starting the syncer: with an
	// hourly frequency only the first pass runs within this test, so the
	// table must already exist when that pass fetches the remote schema.
	schemaSQL := fmt.Sprintf("CREATE SCHEMA %q;", testNamespace)
	_, err = pgResource.DB.Exec(schemaSQL)
	require.NoError(t, err)

	tableSQL := fmt.Sprintf(`CREATE TABLE %q.%q (
		job_id BIGSERIAL PRIMARY KEY,
		workspace_id TEXT NOT NULL DEFAULT '',
		uuid UUID NOT NULL,
		user_id TEXT NOT NULL,
		parameters JSONB NOT NULL,
		custom_val VARCHAR(64) NOT NULL,
		event_payload JSONB NOT NULL,
		event_count INTEGER NOT NULL DEFAULT 1,
		created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
		expire_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW());`, testNamespace, testTable)
	_, err = pgResource.DB.Exec(tableSQL)
	require.NoError(t, err)

	// The syncer only returns when its context ends; the timeout bounds the
	// test. Its result is handed back over a channel so the assertion runs on
	// a test goroutine and no variable is shared with the background goroutine.
	syncCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	syncErrCh := make(chan error, 1)
	go func() {
		syncErrCh <- r.sync(syncCtx)
	}()

	t.Run("fetching schema from postgres", func(t *testing.T) {
		// Wait for the syncer to stop before building the schema handler, so
		// the handler observes the state written by the completed sync pass.
		require.NoError(t, <-syncErrCh)

		r.conf.Set("Warehouse.enableSyncSchema", true)
		sh, err := schema.New(
			ctx,
			r.db,
			warehouse,
			r.conf,
			r.logger.Child("syncer"),
			r.statsFactory,
		)
		require.NoError(t, err)

		expected := model.Schema{
			testTable: model.TableSchema{
				"created_at":    "datetime",
				"event_count":   "int",
				"event_payload": "json",
				"expire_at":     "datetime",
				"job_id":        "int",
				"parameters":    "json",
				"user_id":       "string",
				"workspace_id":  "string",
			},
		}
		require.Eventually(t, func() bool {
			// The condition runs on its own goroutine, so report failure by
			// returning false rather than calling require (FailNow) here.
			localSchema, err := sh.GetLocalSchema(ctx)
			if err != nil {
				return false
			}
			return reflect.DeepEqual(localSchema, expected)
		}, 3*time.Second, 100*time.Millisecond)
	})
}
23 changes: 15 additions & 8 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type UploadJob struct {
stagingFileRepo *repo.StagingFiles
loadFilesRepo *repo.LoadFiles
whManager manager.Manager
schemaHandle *schema.Schema
schemaHandle schema.SchemaHandler
conf *config.Config
logger logger.Logger
statsFactory stats.Stats
Expand Down Expand Up @@ -148,6 +148,19 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
logfield.UseRudderStorage, dto.Upload.UseRudderStorage,
)

schemaHandle, err := schema.New(
ctx,
f.db,
dto.Warehouse,
f.conf,
f.logger.Child("warehouse"),
f.statsFactory,
)
if err != nil {
log.Errorw("failed to create schema handler", logfield.Error, err)
return nil
}

Check warning on line 162 in warehouse/router/upload.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/upload.go#L160-L162

Added lines #L160 - L162 were not covered by tests

uj := &UploadJob{
ctx: ujCtx,
reporting: f.reporting,
Expand All @@ -162,13 +175,7 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
uploadsRepo: repo.NewUploads(f.db),
stagingFileRepo: repo.NewStagingFiles(f.db),
loadFilesRepo: repo.NewLoadFiles(f.db),
schemaHandle: schema.New(
f.db,
dto.Warehouse,
f.conf,
f.logger.Child("warehouse"),
f.statsFactory,
),
schemaHandle: schemaHandle,

upload: dto.Upload,
warehouse: dto.Warehouse,
Expand Down
19 changes: 10 additions & 9 deletions warehouse/router/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/schema"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down Expand Up @@ -129,15 +128,19 @@ func TestColumnCountStat(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
conf := config.New()
conf.Set(fmt.Sprintf("Warehouse.%s.columnCountLimit", strings.ToLower(warehouseutils.WHDestNameMap[tc.destinationType])), tc.columnCountLimit)

j := UploadJob{
conf: conf,
upload: model.Upload{
uploadJobFactory := &UploadJobFactory{
logger: logger.NOP,
statsFactory: statsStore,
conf: conf,
}
rs := redshift.New(config.New(), logger.NOP, stats.NOP)
j := uploadJobFactory.NewUploadJob(context.Background(), &model.UploadJob{
Upload: model.Upload{
WorkspaceID: workspaceID,
DestinationID: destinationID,
SourceID: sourceID,
},
warehouse: model.Warehouse{
Warehouse: model.Warehouse{
Type: tc.destinationType,
Destination: backendconfig.DestinationT{
ID: destinationID,
Expand All @@ -148,9 +151,7 @@ func TestColumnCountStat(t *testing.T) {
Name: sourceName,
},
},
statsFactory: statsStore,
schemaHandle: &schema.Schema{}, // TODO use constructor
}
}, rs)
j.schemaHandle.UpdateWarehouseTableSchema(tableName, model.TableSchema{
"test-column-1": "string",
"test-column-2": "string",
Expand Down
Loading

0 comments on commit f539685

Please sign in to comment.