diff --git a/client.go b/client.go index a17bd40..e45a564 100644 --- a/client.go +++ b/client.go @@ -132,8 +132,8 @@ func (c *Client) Producer(cfg *rdkafka.ConfigMap) (*Producer, error) { return c.producer, nil } -// Close closes c's underlying producers and consumers. Calling Close on -// an already closed client will result in a panic. +// Close closes producers, consumers and the connection of c. +// Calling Close on an already closed client will result in a panic. func (c *Client) Close() { for cid := range c.consumers { // TODO(agis): make this blocking @@ -143,4 +143,6 @@ func (c *Client) Close() { if c.producer != nil { c.producer.Close() } + + c.conn.Close() } diff --git a/server.go b/server.go index cf8858e..6c9842b 100644 --- a/server.go +++ b/server.go @@ -55,10 +55,9 @@ func NewServer(ctx context.Context, manager *ConsumerManager, timeout time.Durat } func (s *Server) handleConn(conn net.Conn) { - defer conn.Close() - c := NewClient(conn, s.manager) defer c.Close() + s.clientByID.Store(c.id, c) defer func() { s.clientByID.Delete(c.id)