Skip to content

Commit

Permalink
fix: warehouse router tracker (#5407)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Jan 8, 2025
1 parent 29b279d commit 8e314b6
Show file tree
Hide file tree
Showing 8 changed files with 493 additions and 369 deletions.
4 changes: 4 additions & 0 deletions warehouse/internal/model/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ func (w *Warehouse) GetPreferAppendSetting() bool {
}
return value
}

func (w *Warehouse) IsEnabled() bool {
return w.Source.Enabled && w.Destination.Enabled
}
3 changes: 1 addition & 2 deletions warehouse/router/identities.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/timeutil"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down Expand Up @@ -309,7 +308,7 @@ func (r *Router) initPrePopulateDestIdentitiesUpload(warehouse model.Warehouse)
RETURNING id
`, warehouseutils.WarehouseUploadsTable)

now := timeutil.Now()
now := r.now()
row := r.db.QueryRow(
sqlStatement,
warehouse.Source.ID,
Expand Down
45 changes: 32 additions & 13 deletions warehouse/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,17 @@ type Router struct {
stagingFilesBatchSize config.ValueLoader[int]
warehouseSyncFreqIgnore config.ValueLoader[bool]
cronTrackerRetries config.ValueLoader[int64]
uploadBufferTimeInMin config.ValueLoader[time.Duration]
}

stats struct {
processingPendingJobsStat stats.Measurement
processingAvailableWorkersStat stats.Measurement
processingPickupLagStat stats.Measurement
processingPickupWaitTimeStat stats.Measurement

schedulerWarehouseLengthStat stats.Measurement
schedulerTotalSchedulingTimeStat stats.Measurement
processingPendingJobsStat stats.Gauge
processingAvailableWorkersStat stats.Gauge
processingPickupLagStat stats.Timer
processingPickupWaitTimeStat stats.Timer
schedulerWarehouseLengthStat stats.Gauge
schedulerTotalSchedulingTimeStat stats.Timer
cronTrackerExecTimestamp stats.Gauge
}
}

Expand Down Expand Up @@ -150,7 +151,7 @@ func New(
r.tenantManager = tenantManager
r.bcManager = bcManager
r.destType = destType
r.now = time.Now
r.now = timeutil.Now
r.triggerStore = triggerStore
r.createJobMarkerMap = make(map[string]time.Time)
r.createUploadAlways = createUploadAlways
Expand Down Expand Up @@ -200,7 +201,7 @@ func (r *Router) Start(ctx context.Context) error {
return nil
}))
g.Go(crash.NotifyWarehouse(func() error {
return r.CronTracker(gCtx)
return r.cronTracker(gCtx)
}))
return g.Wait()
}
Expand Down Expand Up @@ -616,7 +617,7 @@ func (r *Router) handlePriorityForWaitingUploads(ctx context.Context, warehouse

func (r *Router) uploadStartAfterTime() time.Time {
if r.config.enableJitterForSyncs.Load() {
return timeutil.Now().Add(time.Duration(rand.Intn(15)) * time.Second)
return r.now().Add(time.Duration(rand.Intn(15)) * time.Second)
}
return r.now()
}
Expand Down Expand Up @@ -711,14 +712,32 @@ func (r *Router) loadReloadableConfig(whName string) {
r.config.enableJitterForSyncs = r.conf.GetReloadableBoolVar(false, "Warehouse.enableJitterForSyncs")
r.config.warehouseSyncFreqIgnore = r.conf.GetReloadableBoolVar(false, "Warehouse.warehouseSyncFreqIgnore")
r.config.cronTrackerRetries = r.conf.GetReloadableInt64Var(5, 1, "Warehouse.cronTrackerRetries")
r.config.uploadBufferTimeInMin = r.conf.GetReloadableDurationVar(180, time.Minute, "Warehouse.uploadBufferTimeInMin")
}

func (r *Router) loadStats() {
tags := stats.Tags{"destType": r.destType}
tags := stats.Tags{"module": moduleName, "destType": r.destType}
r.stats.processingPendingJobsStat = r.statsFactory.NewTaggedStat("wh_processing_pending_jobs", stats.GaugeType, tags)
r.stats.processingAvailableWorkersStat = r.statsFactory.NewTaggedStat("wh_processing_available_workers", stats.GaugeType, tags)
r.stats.processingPickupLagStat = r.statsFactory.NewTaggedStat("wh_processing_pickup_lag", stats.TimerType, tags)
r.stats.processingPickupWaitTimeStat = r.statsFactory.NewTaggedStat("wh_processing_pickup_wait_time", stats.TimerType, tags)
r.stats.schedulerWarehouseLengthStat = r.statsFactory.NewTaggedStat("wh_scheduler.warehouse_length", stats.GaugeType, tags)
r.stats.schedulerTotalSchedulingTimeStat = r.statsFactory.NewTaggedStat("wh_scheduler.total_scheduling_time", stats.TimerType, tags)
r.stats.schedulerWarehouseLengthStat = r.statsFactory.NewTaggedStat("wh_scheduler_warehouse_length", stats.GaugeType, tags)
r.stats.schedulerTotalSchedulingTimeStat = r.statsFactory.NewTaggedStat("wh_scheduler_total_scheduling_time", stats.TimerType, tags)
r.stats.cronTrackerExecTimestamp = r.statsFactory.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, tags)
}

func (r *Router) copyWarehouses() []model.Warehouse {
r.configSubscriberLock.RLock()
defer r.configSubscriberLock.RUnlock()

warehouses := make([]model.Warehouse, len(r.warehouses))
copy(warehouses, r.warehouses)
return warehouses
}

func (r *Router) getNowSQL() string {
if r.nowSQL != "" {
return r.nowSQL
}
return "NOW()"
}
3 changes: 2 additions & 1 deletion warehouse/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/rudderlabs/rudder-server/services/notifier"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/timeutil"
"github.com/rudderlabs/rudder-server/warehouse/bcm"
"github.com/rudderlabs/rudder-server/warehouse/encoding"
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
Expand Down Expand Up @@ -143,7 +144,7 @@ func TestRouter(t *testing.T) {

db := sqlmiddleware.New(pgResource.DB)

now := time.Now()
now := timeutil.Now()

repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now
Expand Down
11 changes: 6 additions & 5 deletions warehouse/router/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

backendConfig "github.com/rudderlabs/rudder-server/backend-config"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/timeutil"
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
Expand Down Expand Up @@ -235,7 +236,7 @@ func TestRouter_CanCreateUpload(t *testing.T) {
Identifier: "test_identifier_upload_frequency_exceeded",
}

now := time.Now()
now := timeutil.Now()

r := Router{}
r.conf = config.New()
Expand All @@ -260,7 +261,7 @@ func TestRouter_CanCreateUpload(t *testing.T) {
Identifier: "test_identifier_upload_frequency_exceeded",
}

now := time.Now()
now := timeutil.Now()

r := Router{}
r.conf = config.New()
Expand Down Expand Up @@ -316,7 +317,7 @@ func TestRouter_CanCreateUpload(t *testing.T) {
},
}

now := time.Now()
now := timeutil.Now()

r := Router{}
r.conf = config.New()
Expand Down Expand Up @@ -344,7 +345,7 @@ func TestRouter_CanCreateUpload(t *testing.T) {
},
}

now := time.Now()
now := timeutil.Now()

r := Router{}
r.conf = config.New()
Expand Down Expand Up @@ -453,7 +454,7 @@ func TestRouter_CanCreateUpload(t *testing.T) {
return tc.now
}

r.updateCreateJobMarker(w, time.Now())
r.updateCreateJobMarker(w, now)

err := r.canCreateUpload(context.Background(), w)
if tc.wantErr != nil {
Expand Down
23 changes: 0 additions & 23 deletions warehouse/router/testdata/sql/seed_tracker_test.sql

This file was deleted.

Loading

0 comments on commit 8e314b6

Please sign in to comment.