diff --git a/api.go b/api.go index 124bc4c..c2a20eb 100644 --- a/api.go +++ b/api.go @@ -382,6 +382,10 @@ func (p JobQuery) ProgressChanNoWait(jobID string) chan<- string { for { select { case progress := <-lpch: + if progress == "" { + continue + } + // println("setting progress", jobID, progress) err := p.SetProgress(jobID, progress) if err != nil { log.WARNING.Printf("failed to set progress for job id %s: %s, err: %v", jobID, progress, err) @@ -410,7 +414,12 @@ func (p JobQuery) ProgressChanNoWait(jobID string) chan<- string { close(done) return default: - progress = <-pch + select { + case progress = <-pch: + case <-p.done: + close(done) + return + } } } }() diff --git a/api_test.go b/api_test.go index 3de3af8..9276542 100644 --- a/api_test.go +++ b/api_test.go @@ -131,7 +131,6 @@ func ExampleProcess() { defer p.Close() interruptedChan := make(chan struct{}) - println(interruptedChan) done := make(chan struct{}) go func() { @@ -254,7 +253,6 @@ func TestChannelAPI(t *testing.T) { defer j.Close() interruptedChan := j.InterruptedChan(jobID) - println(interruptedChan) processChan := j.ProgressChan(jobID) // emulate a long running task @@ -405,15 +403,12 @@ func TestWithInterruptCtx(t *testing.T) { } func TestProgressNoWait(t *testing.T) { - c := redigomock.NewConn() jobID := uuid.New().String() jq := NewJobQuery(c) ch := jq.ProgressChanNoWait(jobID) cmd := c.Command("SET") - c.Command("EXPIRE") - c.Command("FLUSH") for i := 0; i < 10; i++ { // time.Sleep(100 * time.Millisecond) @@ -422,5 +417,16 @@ func TestProgressNoWait(t *testing.T) { require.NoError(t, jq.Close()) require.NoError(t, c.Err()) - assert.Equal(t, c.Stats(cmd), 1, "should be called once") + assert.Truef(t, 10 > c.Stats(cmd), "should be called once") +} + +func TestProgressNoWaitNonBlock(t *testing.T) { + + c := redigomock.NewConn() + jobID := uuid.New().String() + jq := NewJobQuery(c) + + jq.ProgressChanNoWait(jobID) + time.Sleep(100 * time.Millisecond) + jq.Close() }