diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 2e50981..95b8d68 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -204,7 +204,8 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { s.PointsWriter.Subscriber = s.Subscriber // Initialize meta executor. - s.MetaExecutor = coordinator.NewMetaExecutor(time.Duration(c.Coordinator.QueryTimeout), time.Duration(c.Coordinator.DialTimeout)) + s.MetaExecutor = coordinator.NewMetaExecutor(time.Duration(c.Coordinator.ShardReaderTimeout), time.Duration(c.Coordinator.DialTimeout), + time.Duration(c.Coordinator.PoolMaxIdleTime), c.Coordinator.PoolMaxIdleStreams) s.MetaExecutor.MetaClient = s.MetaClient s.MetaExecutor.TLSConfig = tlsClientConfig diff --git a/coordinator/client_pool.go b/coordinator/client_pool.go index 7904803..bf8ff06 100644 --- a/coordinator/client_pool.go +++ b/coordinator/client_pool.go @@ -1,10 +1,13 @@ package coordinator import ( + "errors" "net" "sync" ) +var ErrClientClosed = errors.New("client already closed") + type clientPool struct { mu sync.RWMutex pool map[uint64]Pool diff --git a/coordinator/config.go b/coordinator/config.go index 0e39295..ce3eba4 100644 --- a/coordinator/config.go +++ b/coordinator/config.go @@ -26,6 +26,9 @@ const ( // remains idle in the connection pool. DefaultPoolMaxIdleTime = time.Minute + // DefaultShardReaderTimeout is the default timeout set on shard readers. + DefaultShardReaderTimeout = time.Duration(0) + // DefaultWriteTimeout is the default timeout for a complete write to succeed. DefaultWriteTimeout = 10 * time.Second @@ -78,6 +81,7 @@ func NewConfig() Config { DialTimeout: toml.Duration(DefaultDialTimeout), PoolMaxIdleStreams: DefaultPoolMaxIdleStreams, PoolMaxIdleTime: toml.Duration(DefaultPoolMaxIdleTime), + ShardReaderTimeout: toml.Duration(DefaultShardReaderTimeout), WriteTimeout: toml.Duration(DefaultWriteTimeout), QueryTimeout: toml.Duration(query.DefaultQueryTimeout), MaxConcurrentQueries: DefaultMaxConcurrentQueries, diff --git a/coordinator/meta_executor.go b/coordinator/meta_executor.go index 3ba7b11..9f004da 100644 --- a/coordinator/meta_executor.go +++ b/coordinator/meta_executor.go @@ -15,7 +15,6 @@ import ( "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/storage/reads" "github.com/influxdata/influxdb/storage/reads/datatypes" - "github.com/influxdata/influxdb/tcp" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" "golang.org/x/sync/errgroup" @@ -23,8 +22,11 @@ import ( // MetaExecutor executes meta queries on one or more data nodes. type MetaExecutor struct { + pool *clientPool timeout time.Duration dialTimeout time.Duration + idleTime time.Duration + maxStreams int nodeExecutor interface { executeOnNode(nodeID uint64, stmt influxql.Statement, database string) error @@ -41,10 +43,13 @@ type MetaExecutor struct { } // NewMetaExecutor returns a new initialized *MetaExecutor. -func NewMetaExecutor(timeout, dialTimeout time.Duration) *MetaExecutor { +func NewMetaExecutor(timeout, dialTimeout, idleTime time.Duration, maxStreams int) *MetaExecutor { e := &MetaExecutor{ + pool: newClientPool(), timeout: timeout, dialTimeout: dialTimeout, + idleTime: idleTime, + maxStreams: maxStreams, } e.nodeExecutor = e return e @@ -113,13 +118,15 @@ func (e *MetaExecutor) executeOnNode(nodeID uint64, stmt influxql.Statement, dat request.SetDatabase(database) // Write request. - if err := EncodeTLV(conn, executeStatementRequestMessage, &request); err != nil { + if err := EncodeTLVT(conn, executeStatementRequestMessage, &request, e.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. var resp ExecuteStatementResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return err } @@ -177,15 +184,17 @@ func (e *MetaExecutor) TaskManagerStatement(nodeID uint64, stmt influxql.Stateme defer conn.Close() // Write request. - if err := EncodeTLV(conn, taskManagerStatementRequestMessage, &TaskManagerStatementRequest{ + if err := EncodeTLVT(conn, taskManagerStatementRequestMessage, &TaskManagerStatementRequest{ Statement: stmt.String(), - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return query.Result{}, err } // Read the response. var resp TaskManagerStatementResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return query.Result{}, err } return resp.Result, resp.Err @@ -199,17 +208,19 @@ func (e *MetaExecutor) MeasurementNames(nodeID uint64, database string, retentio defer conn.Close() // Write request. - if err := EncodeTLV(conn, measurementNamesRequestMessage, &MeasurementNamesRequest{ + if err := EncodeTLVT(conn, measurementNamesRequestMessage, &MeasurementNamesRequest{ Database: database, RetentionPolicy: retentionPolicy, Condition: cond, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } // Read the response. var resp MeasurementNamesResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } return resp.Names, nil @@ -223,16 +234,18 @@ func (e *MetaExecutor) TagKeys(nodeID uint64, shardIDs []uint64, cond influxql.E defer conn.Close() // Write request. - if err := EncodeTLV(conn, tagKeysRequestMessage, &TagKeysRequest{ + if err := EncodeTLVT(conn, tagKeysRequestMessage, &TagKeysRequest{ ShardIDs: shardIDs, Condition: cond, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } // Read the response. var resp TagKeysResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } return resp.TagKeys, nil @@ -246,16 +259,18 @@ func (e *MetaExecutor) TagValues(nodeID uint64, shardIDs []uint64, cond influxql defer conn.Close() // Write request. - if err := EncodeTLV(conn, tagValuesRequestMessage, &TagValuesRequest{ + if err := EncodeTLVT(conn, tagValuesRequestMessage, &TagValuesRequest{ ShardIDs: shardIDs, Condition: cond, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } // Read the response. var resp TagValuesResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } return resp.TagValues, nil @@ -269,15 +284,17 @@ func (e *MetaExecutor) SeriesSketches(nodeID uint64, database string) (estimator defer conn.Close() // Write request. - if err := EncodeTLV(conn, seriesSketchesRequestMessage, &SeriesSketchesRequest{ + if err := EncodeTLVT(conn, seriesSketchesRequestMessage, &SeriesSketchesRequest{ Database: database, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } // Read the response. var resp SeriesSketchesResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } return resp.Sketch, resp.TSSketch, nil @@ -291,15 +308,17 @@ func (e *MetaExecutor) MeasurementsSketches(nodeID uint64, database string) (est defer conn.Close() // Write request. - if err := EncodeTLV(conn, measurementsSketchesRequestMessage, &MeasurementsSketchesRequest{ + if err := EncodeTLVT(conn, measurementsSketchesRequestMessage, &MeasurementsSketchesRequest{ Database: database, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } // Read the response. var resp MeasurementsSketchesResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } return resp.Sketch, resp.TSSketch, nil @@ -313,16 +332,18 @@ func (e *MetaExecutor) FieldDimensions(nodeID uint64, shardIDs []uint64, m *infl defer conn.Close() // Write request. - if err := EncodeTLV(conn, fieldDimensionsRequestMessage, &FieldDimensionsRequest{ + if err := EncodeTLVT(conn, fieldDimensionsRequestMessage, &FieldDimensionsRequest{ ShardIDs: shardIDs, Measurement: *m, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } // Read the response. var resp FieldDimensionsResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } return resp.Fields, resp.Dimensions, resp.Err @@ -336,17 +357,19 @@ func (e *MetaExecutor) MapType(nodeID uint64, shardIDs []uint64, m *influxql.Mea defer conn.Close() // Write request. - if err := EncodeTLV(conn, mapTypeRequestMessage, &MapTypeRequest{ + if err := EncodeTLVT(conn, mapTypeRequestMessage, &MapTypeRequest{ ShardIDs: shardIDs, Measurement: *m, Field: field, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return influxql.Unknown, err } // Read the response. var resp MapTypeResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return influxql.Unknown, err } return resp.Type, nil @@ -371,17 +394,19 @@ func (e *MetaExecutor) CreateIterator(nodeID uint64, shardIDs []uint64, ctx cont var resp CreateIteratorResponse if err := func() error { // Write request. - if err := EncodeTLV(conn, createIteratorRequestMessage, &CreateIteratorRequest{ + if err := EncodeTLVT(conn, createIteratorRequestMessage, &CreateIteratorRequest{ ShardIDs: shardIDs, Measurement: *m, Opt: opt, SpanContext: sc, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return err } else if resp.Err != nil { return err @@ -404,17 +429,19 @@ func (e *MetaExecutor) IteratorCost(nodeID uint64, shardIDs []uint64, m *influxq defer conn.Close() // Write request. - if err := EncodeTLV(conn, iteratorCostRequestMessage, &IteratorCostRequest{ + if err := EncodeTLVT(conn, iteratorCostRequestMessage, &IteratorCostRequest{ ShardIDs: shardIDs, Measurement: *m, Opt: opt, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return query.IteratorCost{}, err } // Read the response. var resp IteratorCostResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return query.IteratorCost{}, err } return resp.Cost, resp.Err @@ -428,16 +455,18 @@ func (e *MetaExecutor) ReadFilter(nodeID uint64, shardIDs []uint64, ctx context. if err := func() error { // Write request. - if err := EncodeTLV(conn, storeReadFilterRequestMessage, &StoreReadFilterRequest{ + if err := EncodeTLVT(conn, storeReadFilterRequestMessage, &StoreReadFilterRequest{ ShardIDs: shardIDs, Request: *req, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. var resp StoreReadFilterResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return err } else if resp.Err != nil { return err @@ -460,16 +489,18 @@ func (e *MetaExecutor) ReadGroup(nodeID uint64, shardIDs []uint64, ctx context.C if err := func() error { // Write request. - if err := EncodeTLV(conn, storeReadGroupRequestMessage, &StoreReadGroupRequest{ + if err := EncodeTLVT(conn, storeReadGroupRequestMessage, &StoreReadGroupRequest{ ShardIDs: shardIDs, Request: *req, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. var resp StoreReadGroupResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return err } else if resp.Err != nil { return err @@ -486,23 +517,27 @@ func (e *MetaExecutor) ReadGroup(nodeID uint64, shardIDs []uint64, ctx context.C // dial returns a connection to a single node in the cluster. func (e *MetaExecutor) dial(nodeID uint64) (net.Conn, error) { - ni, err := e.MetaClient.DataNode(nodeID) - if err != nil { - return nil, err - } + // If we don't have a connection pool for that addr yet, create one + _, ok := e.pool.getPool(nodeID) + if !ok { + factory := &connFactory{nodeID: nodeID, clientPool: e.pool, timeout: e.dialTimeout, tlsConfig: e.TLSConfig} + factory.metaClient = e.MetaClient - conn, err := tcp.DialTLSTimeout("tcp", ni.TCPAddr, e.TLSConfig, e.dialTimeout) - if err != nil { - return nil, err - } - if e.timeout > 0 { - conn.SetDeadline(time.Now().Add(e.timeout)) + p, err := NewBoundedPool(1, e.maxStreams, e.idleTime, factory.dial) + if err != nil { + return nil, err + } + e.pool.setPool(nodeID, p) } + return e.pool.conn(nodeID) +} - // Write the cluster multiplexing header byte - if _, err := conn.Write([]byte{MuxHeader}); err != nil { - conn.Close() - return nil, err +// Close closes MetaExecutor's pool +func (e *MetaExecutor) Close() error { + if e.pool == nil { + return ErrClientClosed } - return conn, nil + e.pool.close() + e.pool = nil + return nil } diff --git a/coordinator/meta_executor_test.go b/coordinator/meta_executor_test.go index 7dfcdc7..aea65f8 100644 --- a/coordinator/meta_executor_test.go +++ b/coordinator/meta_executor_test.go @@ -21,7 +21,7 @@ func Test_ExecuteStatement(t *testing.T) { mock.expect("DROP DATABASE foo") mock.expect("DROP DATABASE foo") - e := NewMetaExecutor(time.Duration(0), time.Second) + e := NewMetaExecutor(time.Duration(0), time.Second, time.Minute, 1) e.MetaClient = newMockMetaClient(numOfNodes) // Replace MetaExecutor's nodeExecutor with our mock. e.nodeExecutor = mock diff --git a/coordinator/service.go b/coordinator/service.go index 5884fa6..0e1dd13 100644 --- a/coordinator/service.go +++ b/coordinator/service.go @@ -1497,6 +1497,16 @@ func (s *Service) serveHTTP() { srv.Serve(s.httpListener) } +// ReadTLVT reads a type-length-value record with timeout from r. +func ReadTLVT(r net.Conn, t time.Duration) (byte, []byte, error) { + if t > 0 { + if err := r.SetReadDeadline(time.Now().Add(t)); err != nil { + return 0, nil, err + } + } + return ReadTLV(r) +} + // ReadTLV reads a type-length-value record from r. func ReadTLV(r io.Reader) (byte, []byte, error) { typ, err := ReadType(r) @@ -1541,6 +1551,16 @@ func ReadLV(r io.Reader) ([]byte, error) { return buf, nil } +// WriteTLVT writes a type-length-value record with timeout to w. +func WriteTLVT(w net.Conn, typ byte, buf []byte, t time.Duration) error { + if t > 0 { + if err := w.SetWriteDeadline(time.Now().Add(t)); err != nil { + return err + } + } + return WriteTLV(w, typ, buf) +} + // WriteTLV writes a type-length-value record to w. func WriteTLV(w io.Writer, typ byte, buf []byte) error { if err := WriteType(w, typ); err != nil { @@ -1574,6 +1594,16 @@ func WriteLV(w io.Writer, buf []byte) error { return nil } +// EncodeTLVT encodes v to a binary format and writes the record-length-value record with timeout to w. +func EncodeTLVT(w net.Conn, typ byte, v encoding.BinaryMarshaler, t time.Duration) error { + if t > 0 { + if err := w.SetWriteDeadline(time.Now().Add(t)); err != nil { + return err + } + } + return EncodeTLV(w, typ, v) +} + // EncodeTLV encodes v to a binary format and writes the record-length-value record to w. func EncodeTLV(w io.Writer, typ byte, v encoding.BinaryMarshaler) error { if err := WriteType(w, typ); err != nil { @@ -1598,7 +1628,17 @@ func EncodeLV(w io.Writer, v encoding.BinaryMarshaler) error { return nil } -// DecodeTLV reads the type-length-value record from r and unmarshals it into v. +// DecodeTLVT decodes the type-length-value record with timeout from r and unmarshals it into v. +func DecodeTLVT(r net.Conn, v encoding.BinaryUnmarshaler, t time.Duration) (typ byte, err error) { + if t > 0 { + if err := r.SetReadDeadline(time.Now().Add(t)); err != nil { + return 0, err + } + } + return DecodeTLV(r, v) +} + +// DecodeTLV decodes the type-length-value record from r and unmarshals it into v. func DecodeTLV(r io.Reader, v encoding.BinaryUnmarshaler) (typ byte, err error) { typ, err = ReadType(r) if err != nil { @@ -1610,7 +1650,7 @@ func DecodeTLV(r io.Reader, v encoding.BinaryUnmarshaler) (typ byte, err error) return typ, nil } -// DecodeLV reads the length-value record from r and unmarshals it into v. +// DecodeLV decodes the length-value record from r and unmarshals it into v. func DecodeLV(r io.Reader, v encoding.BinaryUnmarshaler) error { buf, err := ReadLV(r) if err != nil { diff --git a/coordinator/shard_writer.go b/coordinator/shard_writer.go index ba8ac4b..86ed363 100644 --- a/coordinator/shard_writer.go +++ b/coordinator/shard_writer.go @@ -13,11 +13,11 @@ import ( // ShardWriter writes a set of points to a shard. type ShardWriter struct { - pool *clientPool - timeout time.Duration - dialTimeout time.Duration - idleTimeout time.Duration - maxIdleStreams int + pool *clientPool + timeout time.Duration + dialTimeout time.Duration + idleTime time.Duration + maxStreams int MetaClient interface { DataNode(id uint64) (ni *meta.NodeInfo, err error) @@ -28,13 +28,13 @@ type ShardWriter struct { } // NewShardWriter returns a new instance of ShardWriter. -func NewShardWriter(timeout, dialTimeout, idleTimeout time.Duration, maxIdleStreams int) *ShardWriter { +func NewShardWriter(timeout, dialTimeout, idleTime time.Duration, maxStreams int) *ShardWriter { return &ShardWriter{ - pool: newClientPool(), - timeout: timeout, - dialTimeout: dialTimeout, - idleTimeout: idleTimeout, - maxIdleStreams: maxIdleStreams, + pool: newClientPool(), + timeout: timeout, + dialTimeout: dialTimeout, + idleTime: idleTime, + maxStreams: maxStreams, } } @@ -53,18 +53,11 @@ func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) // WriteShardBinary writes time series binary points to a shard func (w *ShardWriter) WriteShardBinary(shardID, ownerID uint64, points [][]byte) error { - c, err := w.dial(ownerID) + conn, err := w.dial(ownerID) if err != nil { return err } - - conn, ok := c.(*pooledConn) - if !ok { - panic("wrong connection type") - } - defer func(conn net.Conn) { - conn.Close() // return to pool - }(conn) + defer conn.Close() // Determine the location of this shard and whether it still exists db, rp, sgi := w.MetaClient.ShardOwner(shardID) @@ -89,17 +82,15 @@ func (w *ShardWriter) WriteShardBinary(shardID, ownerID uint64, points [][]byte) } // Write request. - conn.SetWriteDeadline(time.Now().Add(w.timeout)) - if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil { - conn.MarkUnusable() + if err := WriteTLVT(conn, writeShardRequestMessage, buf, w.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. - conn.SetReadDeadline(time.Now().Add(w.timeout)) - _, buf, err = ReadTLV(conn) + _, buf, err = ReadTLVT(conn, w.timeout) if err != nil { - conn.MarkUnusable() + MarkUnusable(conn) return err } @@ -116,6 +107,7 @@ func (w *ShardWriter) WriteShardBinary(shardID, ownerID uint64, points [][]byte) return nil } +// dial returns a connection to a single node in the cluster. func (w *ShardWriter) dial(nodeID uint64) (net.Conn, error) { // If we don't have a connection pool for that addr yet, create one _, ok := w.pool.getPool(nodeID) @@ -123,7 +115,7 @@ func (w *ShardWriter) dial(nodeID uint64) (net.Conn, error) { factory := &connFactory{nodeID: nodeID, clientPool: w.pool, timeout: w.dialTimeout, tlsConfig: w.TLSConfig} factory.metaClient = w.MetaClient - p, err := NewBoundedPool(1, w.maxIdleStreams, w.idleTimeout, DefaultPoolWaitTimeout, factory.dial) + p, err := NewBoundedPool(1, w.maxStreams, w.idleTime, factory.dial) if err != nil { return nil, err } @@ -135,7 +127,7 @@ func (w *ShardWriter) dial(nodeID uint64) (net.Conn, error) { // Close closes ShardWriter's pool func (w *ShardWriter) Close() error { if w.pool == nil { - return fmt.Errorf("client already closed") + return ErrClientClosed } w.pool.close() w.pool = nil @@ -143,7 +135,7 @@ func (w *ShardWriter) Close() error { } const ( - maxConnections = 1000 + maxConnections = 5000 maxRetries = 3 ) diff --git a/coordinator/shard_writer_test.go b/coordinator/shard_writer_test.go index 07eeaae..b435419 100644 --- a/coordinator/shard_writer_test.go +++ b/coordinator/shard_writer_test.go @@ -152,7 +152,7 @@ func TestShardWriter_Write_ErrReadTimeout(t *testing.T) { t.Fatal(err) } - w := coordinator.NewShardWriter(10*time.Second, time.Second, time.Millisecond, 1) + w := coordinator.NewShardWriter(10*time.Second, time.Second, time.Minute, 1) w.MetaClient = &metaClient{addr: ln.Addr().String()} now := time.Now() @@ -183,7 +183,7 @@ func TestShardWriter_Write_PoolMax(t *testing.T) { defer s.Close() defer ts.Close() - w := coordinator.NewShardWriter(10*time.Second, time.Second, 100*time.Millisecond, 0) + w := coordinator.NewShardWriter(10*time.Second, time.Second, 100*time.Minute, 0) w.MetaClient = &metaClient{addr: ts.ln.Addr().String()} now := time.Now()