From 29a78f0e16548b312861d59955fce28d1d7f0d51 Mon Sep 17 00:00:00 2001 From: Andy Xie Date: Wed, 20 Feb 2019 16:30:37 +0800 Subject: [PATCH] add max_channel_client_connection_count --- nsqd/channel.go | 8 ++++++++ nsqd/options.go | 26 ++++++++++++++------------ nsqd/protocol_v2.go | 4 ++++ 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 95a7adb21..f070c605e 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -388,6 +388,14 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura return c.StartDeferredTimeout(msg, timeout) } +// IsClientConnectionCountExceeded checks whether client connection count exceeds max_channel_client_connection_count. +func (c *Channel) IsClientConnectionCountExceeded() bool { + c.Lock() + defer c.Unlock() + + return len(c.clients) >= c.ctx.nsqd.getOpts().MaxChannelClientConnectionCount +} + // AddClient adds a client to the Channel's client list func (c *Channel) AddClient(clientID int64, client Consumer) { c.Lock() diff --git a/nsqd/options.go b/nsqd/options.go index fdfda8572..9cb6a51fc 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -52,12 +52,13 @@ type Options struct { ClientTimeout time.Duration // client overridable configuration options - MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` - MaxRdyCount int64 `flag:"max-rdy-count"` - MaxOutputBufferSize int64 `flag:"max-output-buffer-size"` - MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` - MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"` - OutputBufferTimeout time.Duration `flag:"output-buffer-timeout"` + MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` + MaxRdyCount int64 `flag:"max-rdy-count"` + MaxOutputBufferSize int64 `flag:"max-output-buffer-size"` + MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` + MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"` + OutputBufferTimeout time.Duration `flag:"output-buffer-timeout"` + MaxChannelClientConnectionCount int `flag:"max-channel-client-connection-count"` // statsd integration StatsdAddress string `flag:"statsd-address"` @@ -128,12 +129,13 @@ func NewOptions() *Options { MaxReqTimeout: 1 * time.Hour, ClientTimeout: 60 * time.Second, - MaxHeartbeatInterval: 60 * time.Second, - MaxRdyCount: 2500, - MaxOutputBufferSize: 64 * 1024, - MaxOutputBufferTimeout: 30 * time.Second, - MinOutputBufferTimeout: 25 * time.Millisecond, - OutputBufferTimeout: 250 * time.Millisecond, + MaxHeartbeatInterval: 60 * time.Second, + MaxRdyCount: 2500, + MaxOutputBufferSize: 64 * 1024, + MaxOutputBufferTimeout: 30 * time.Second, + MinOutputBufferTimeout: 25 * time.Millisecond, + OutputBufferTimeout: 250 * time.Millisecond, + MaxChannelClientConnectionCount: 1000, StatsdPrefix: "nsq.%s", StatsdInterval: 60 * time.Second, diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index c2e7d7b42..bc6d7341a 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -615,6 +615,10 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { for { topic := p.ctx.nsqd.GetTopic(topicName) channel = topic.GetChannel(channelName) + if channel.IsClientConnectionCountExceeded() { + return nil, protocol.NewFatalClientErr(nil, "E_TOO_MANY_CHANNEL_CLIENTS", + fmt.Sprintf("TOO many channel clients for %s:%s", topicName, channelName)) + } channel.AddClient(client.ID, client) if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {