diff --git a/router/router_test.go b/router/router_test.go index 68c4fcc88d..093b9f2078 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -338,6 +338,18 @@ func TestBackoff(t *testing.T) { r.throttlerFactory.(*mockThrottlerFactory).count.Store(0) }) + t.Run("drain 'Null' destType jobs", func(t *testing.T) { + r.drainer = routerutils.NewDrainer( + conf, func(string) (*routerutils.DestinationWithSources, bool) { + return nil, false + }, + ) + slot, err := r.findWorkerSlot(context.Background(), workers, &jobsdb.JobT{CustomVal: "Null", Parameters: []byte(`{}`)}, map[eventorder.BarrierKey]struct{}{}) + require.NoError(t, err) + require.NotNil(t, slot) + require.Equal(t, routerutils.DrainReasonDestDisabled, slot.drainReason) + }) + t.Run("eventorder enabled with drain job", func(t *testing.T) { r.drainer = &drainer{drain: true, reason: "drain job due to some reason"} r.guaranteeUserEventOrder = true diff --git a/router/utils/utils.go b/router/utils/utils.go index 08161779fa..1fc45992b8 100644 --- a/router/utils/utils.go +++ b/router/utils/utils.go @@ -139,6 +139,9 @@ type drainer struct { func (d *drainer) Drain( job *jobsdb.JobT, ) (bool, string) { + if job.CustomVal == "Null" { + return true, DrainReasonDestDisabled + } createdAt := job.CreatedAt var jobParams JobParameters _ = json.Unmarshal(job.Parameters, &jobParams)