From 667c739c212e55a5ddde2a33d4be2b9376d2c7e5 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Tue, 7 Apr 2015 16:20:51 -0700 Subject: [PATCH 1/4] config: update LookupdPollInterval and RDYRedistributeInterval --- config.go | 10 ++++++++-- consumer.go | 6 +++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/config.go b/config.go index 7566681d..a88378e2 100644 --- a/config.go +++ b/config.go @@ -111,7 +111,10 @@ type Config struct { // Duration between polling lookupd for new producers, and fractional jitter to add to // the lookupd pool loop. this helps evenly distribute requests even if multiple consumers // restart at the same time - LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"5s" max:"5m" default:"60s"` + // + // NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between + // reconnection attempts + LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"` LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"` // Maximum duration when REQueueing (for doubling of deferred requeue) @@ -128,10 +131,13 @@ type Config struct { // Maximum number of times this consumer will attempt to process a message before giving up MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"` - // Amount of time in seconds to wait for a message from a producer when in a state where RDY + // Duration to wait for a message from a producer when in a state where RDY // counts are re-distributed (ie. 
max_in_flight < num_producers) LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"` + // Duration between redistributing max-in-flight to connections + RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"` + // Identifiers sent to nsqd representing this client // UserAgent is in the spirit of HTTP (default: "/") ClientID string `opt:"client_id"` // (defaults: short hostname) diff --git a/consumer.go b/consumer.go index 4b0f3432..9077cc1e 100644 --- a/consumer.go +++ b/consumer.go @@ -728,8 +728,8 @@ func (r *Consumer) onConnClose(c *Conn) { // try to reconnect after a bit go func(addr string) { for { - r.log(LogLevelInfo, "(%s) re-connecting in 15 seconds...", addr) - time.Sleep(15 * time.Second) + r.log(LogLevelInfo, "(%s) re-connecting in %.04f seconds...", addr, r.config.LookupdPollInterval.Seconds()) + time.Sleep(r.config.LookupdPollInterval) if atomic.LoadInt32(&r.stopFlag) == 1 { break } @@ -868,7 +868,7 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) { } func (r *Consumer) rdyLoop() { - redistributeTicker := time.NewTicker(5 * time.Second) + redistributeTicker := time.NewTicker(r.config.RDYRedistributeInterval) for { select { From 46e252c48a08b549d0146b7b669de51da67caf8f Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Tue, 7 Apr 2015 16:22:01 -0700 Subject: [PATCH 2/4] test: add test for single conn disconnect during backoff --- mock_test.go | 163 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 146 insertions(+), 17 deletions(-) diff --git a/mock_test.go b/mock_test.go index 57814b69..62b9fc00 100644 --- a/mock_test.go +++ b/mock_test.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "log" "net" "strconv" @@ -15,6 +14,23 @@ import ( "time" ) +type tbLog interface { + Log(...interface{}) +} + +type testLogger struct { + tbLog +} + +func (tl *testLogger) Output(maxdepth int, s string) error { + tl.Log(s) + return nil +} + +func newTestLogger(tbl 
tbLog) logger { + return &testLogger{tbl} +} + type instruction struct { delay time.Duration frameType int32 @@ -29,14 +45,13 @@ type mockNSQD struct { exitChan chan int } -func newMockNSQD(script []instruction) *mockNSQD { +func newMockNSQD(script []instruction, addr string) *mockNSQD { n := &mockNSQD{ script: script, exitChan: make(chan int), } - addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0") - tcpListener, err := net.Listen("tcp", addr.String()) + tcpListener, err := net.Listen("tcp", addr) if err != nil { log.Fatalf("FATAL: listen (%s) failed - %s", n.tcpAddr.String(), err) } @@ -147,6 +162,7 @@ func (n *mockNSQD) handle(conn net.Conn) { exit: n.tcpListener.Close() + conn.Close() } func framedResponse(frameType int32, data []byte) []byte { @@ -174,18 +190,17 @@ func framedResponse(frameType int32, data []byte) []byte { type testHandler struct{} func (h *testHandler) HandleMessage(message *Message) error { - if bytes.Equal(message.Body, []byte("requeue")) { + switch string(message.Body) { + case "requeue": message.Requeue(-1) return nil - } - if bytes.Equal(message.Body, []byte("requeue_no_backoff_1")) { + case "requeue_no_backoff_1": if message.Attempts > 1 { return nil } message.RequeueWithoutBackoff(-1) return nil - } - if bytes.Equal(message.Body, []byte("bad")) { + case "bad": return errors.New("bad") } return nil @@ -198,8 +213,6 @@ func frameMessage(m *Message) []byte { } func TestConsumerBackoff(t *testing.T) { - logger := log.New(ioutil.Discard, "", log.LstdFlags) - msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} msgGood := NewMessage(msgIDGood, []byte("good")) @@ -221,14 +234,16 @@ func TestConsumerBackoff(t *testing.T) { // needed to exit test instruction{200 * time.Millisecond, -1, []byte("exit")}, } - n := newMockNSQD(script) + + addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0") + n := newMockNSQD(script, addr.String()) topicName := "test_consumer_commands" + 
strconv.Itoa(int(time.Now().Unix())) config := NewConfig() config.MaxInFlight = 5 config.BackoffMultiplier = 10 * time.Millisecond q, _ := NewConsumer(topicName, "ch", config) - q.SetLogger(logger, LogLevelDebug) + q.SetLogger(newTestLogger(t), LogLevelDebug) q.AddHandler(&testHandler{}) err := q.ConnectToNSQD(n.tcpAddr.String()) if err != nil { @@ -272,8 +287,6 @@ func TestConsumerBackoff(t *testing.T) { } func TestConsumerRequeueNoBackoff(t *testing.T) { - // logger := log.New(ioutil.Discard, "", log.LstdFlags) - msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} msgIDRequeueNoBackoff := MessageID{'r', 'e', 'q', 'n', 'b', 'a', 'c', 'k', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} @@ -293,14 +306,16 @@ func TestConsumerRequeueNoBackoff(t *testing.T) { // needed to exit test instruction{100 * time.Millisecond, -1, []byte("exit")}, } - n := newMockNSQD(script) + + addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0") + n := newMockNSQD(script, addr.String()) topicName := "test_requeue" + strconv.Itoa(int(time.Now().Unix())) config := NewConfig() config.MaxInFlight = 1 config.BackoffMultiplier = 10 * time.Millisecond q, _ := NewConsumer(topicName, "ch", config) - // q.SetLogger(logger, LogLevelDebug) + q.SetLogger(newTestLogger(t), LogLevelDebug) q.AddHandler(&testHandler{}) err := q.ConnectToNSQD(n.tcpAddr.String()) if err != nil { @@ -341,3 +356,117 @@ func TestConsumerRequeueNoBackoff(t *testing.T) { } } } + +func TestConsumerBackoffDisconnect(t *testing.T) { + msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} + msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} + + msgGood := NewMessage(msgIDGood, []byte("good")) + msgRequeue := NewMessage(msgIDRequeue, []byte("requeue")) + + script := []instruction{ + // 
SUB + instruction{0, FrameTypeResponse, []byte("OK")}, + // IDENTIFY + instruction{0, FrameTypeResponse, []byte("OK")}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeue)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeue)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, + // needed to exit test + instruction{100 * time.Millisecond, -1, []byte("exit")}, + } + + addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0") + n := newMockNSQD(script, addr.String()) + + topicName := "test_requeue" + strconv.Itoa(int(time.Now().Unix())) + config := NewConfig() + config.MaxInFlight = 5 + config.BackoffMultiplier = 10 * time.Millisecond + config.LookupdPollInterval = 10 * time.Millisecond + config.RDYRedistributeInterval = 10 * time.Millisecond + q, _ := NewConsumer(topicName, "ch", config) + q.SetLogger(newTestLogger(t), LogLevelDebug) + q.AddHandler(&testHandler{}) + err := q.ConnectToNSQD(n.tcpAddr.String()) + if err != nil { + t.Fatalf(err.Error()) + } + + select { + case <-n.exitChan: + log.Printf("clean exit") + case <-time.After(500 * time.Millisecond): + log.Printf("timeout") + } + + for i, r := range n.got { + log.Printf("%d: %s", i, r) + } + + expected := []string{ + "IDENTIFY", + "SUB " + topicName + " ch", + "RDY 5", + fmt.Sprintf("FIN %s", msgIDGood), + "RDY 0", + fmt.Sprintf("REQ %s 0", msgIDRequeue), + "RDY 1", + "RDY 0", + fmt.Sprintf("REQ %s 0", msgIDRequeue), + "RDY 1", + "RDY 0", + fmt.Sprintf("FIN %s", msgIDGood), + "RDY 1", + } + if len(n.got) != len(expected) { + t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected)) + } + for i, r := range n.got { + if string(r) != expected[i] { + t.Fatalf("cmd %d bad %s != %s", i, r, expected[i]) + } + } + + script = []instruction{ + // SUB + instruction{0, FrameTypeResponse, []byte("OK")}, + // IDENTIFY + instruction{0, FrameTypeResponse, 
[]byte("OK")}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, + // needed to exit test + instruction{100 * time.Millisecond, -1, []byte("exit")}, + } + + n = newMockNSQD(script, n.tcpAddr.String()) + + select { + case <-n.exitChan: + log.Printf("clean exit") + case <-time.After(500 * time.Millisecond): + log.Printf("timeout") + } + + for i, r := range n.got { + log.Printf("%d: %s", i, r) + } + + expected = []string{ + "IDENTIFY", + "SUB " + topicName + " ch", + "RDY 1", + "RDY 5", + fmt.Sprintf("FIN %s", msgIDGood), + fmt.Sprintf("FIN %s", msgIDGood), + } + if len(n.got) != len(expected) { + t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected)) + } + for i, r := range n.got { + if string(r) != expected[i] { + t.Fatalf("cmd %d bad %s != %s", i, r, expected[i]) + } + } +} From aebe75cf62ad237fe05fc0501b89661ee01a1ee5 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Tue, 7 Apr 2015 16:22:28 -0700 Subject: [PATCH 3/4] consumer/producer: improve debug logging --- consumer.go | 12 +++++++++--- producer_test.go | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/consumer.go b/consumer.go index 9077cc1e..285ce779 100644 --- a/consumer.go +++ b/consumer.go @@ -817,7 +817,8 @@ func (r *Consumer) resume() { // pick a random connection to test the waters conns := r.conns() if len(conns) == 0 { - // backoff again + r.log(LogLevelWarning, "no connection available to resume") + r.log(LogLevelWarning, "backing off for %.04f seconds", time.Second.Seconds()) r.backoff(time.Second) return } @@ -831,7 +832,8 @@ func (r *Consumer) resume() { // while in backoff only ever let 1 message at a time through err := r.updateRDY(choice, 1) if err != nil { - r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err) + r.log(LogLevelWarning, "(%s) error resuming RDY 1 - %s", choice.String(), err) + r.log(LogLevelWarning, "backing off for %.04f 
seconds", time.Second.Seconds()) r.backoff(time.Second) return } @@ -848,7 +850,11 @@ func (r *Consumer) inBackoffTimeout() bool { } func (r *Consumer) maybeUpdateRDY(conn *Conn) { - if r.inBackoff() || r.inBackoffTimeout() { + inBackoff := r.inBackoff() + inBackoffTimeout := r.inBackoffTimeout() + if inBackoff || inBackoffTimeout { + r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v", + conn, inBackoff, inBackoffTimeout) return } diff --git a/producer_test.go b/producer_test.go index 111177f8..820ce96d 100644 --- a/producer_test.go +++ b/producer_test.go @@ -51,7 +51,7 @@ func TestProducerConnection(t *testing.T) { err := w.Publish("write_test", []byte("test")) if err != nil { - t.Fatalf("should lazily connect") + t.Fatalf("should lazily connect - %s", err) } conn := w.conn.(*Conn) From 2e31ccc3474eb56e8fec075e17c507dfcde455c8 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Tue, 7 Apr 2015 16:22:45 -0700 Subject: [PATCH 4/4] consumer: don't allow redistribute to run with no connections --- consumer.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/consumer.go b/consumer.go index 285ce779..6a4d75cf 100644 --- a/consumer.go +++ b/consumer.go @@ -955,16 +955,22 @@ func (r *Consumer) redistributeRDY() { return } - numConns := int32(len(r.conns())) + // if an external heuristic set needRDYRedistributed we want to wait + // until we can actually redistribute to proceed + conns := r.conns() + if len(conns) == 0 { + return + } + maxInFlight := r.getMaxInFlight() - if numConns > maxInFlight { + if len(conns) > int(maxInFlight) { r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)", - numConns, maxInFlight) + len(conns), maxInFlight) atomic.StoreInt32(&r.needRDYRedistributed, 1) } - if r.inBackoff() && numConns > 1 { - r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", numConns) + if r.inBackoff() && len(conns) > 1 { + r.log(LogLevelDebug, "redistributing RDY state (in 
backoff and %d conns > 1)", len(conns)) atomic.StoreInt32(&r.needRDYRedistributed, 1) } @@ -972,7 +978,6 @@ func (r *Consumer) redistributeRDY() { return } - conns := r.conns() possibleConns := make([]*Conn, 0, len(conns)) for _, c := range conns { lastMsgDuration := time.Now().Sub(c.LastMessageTime())