From db6fdb1829764611ee2d81f2ed67b22e0d863c14 Mon Sep 17 00:00:00 2001 From: chengshiwen Date: Thu, 5 Sep 2024 21:57:41 +0800 Subject: [PATCH] optimize meta executor and shard writer --- cmd/influxd/run/server.go | 3 +- coordinator/meta_executor.go | 143 ++++++++++++++++++------------ coordinator/meta_executor_test.go | 2 +- coordinator/service.go | 44 ++++++++- coordinator/shard_writer.go | 47 ++++------ coordinator/shard_writer_test.go | 4 +- 6 files changed, 150 insertions(+), 93 deletions(-) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 2e50981..ea8fc64 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -204,7 +204,8 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { s.PointsWriter.Subscriber = s.Subscriber // Initialize meta executor. - s.MetaExecutor = coordinator.NewMetaExecutor(time.Duration(c.Coordinator.QueryTimeout), time.Duration(c.Coordinator.DialTimeout)) + s.MetaExecutor = coordinator.NewMetaExecutor(time.Duration(c.Coordinator.QueryTimeout), time.Duration(c.Coordinator.DialTimeout), + time.Duration(c.Coordinator.PoolMaxIdleTime), c.Coordinator.PoolMaxIdleStreams) s.MetaExecutor.MetaClient = s.MetaClient s.MetaExecutor.TLSConfig = tlsClientConfig diff --git a/coordinator/meta_executor.go b/coordinator/meta_executor.go index 3ba7b11..fddef23 100644 --- a/coordinator/meta_executor.go +++ b/coordinator/meta_executor.go @@ -15,7 +15,6 @@ import ( "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/storage/reads" "github.com/influxdata/influxdb/storage/reads/datatypes" - "github.com/influxdata/influxdb/tcp" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" "golang.org/x/sync/errgroup" @@ -23,8 +22,11 @@ import ( // MetaExecutor executes meta queries on one or more data nodes. 
type MetaExecutor struct { + pool *clientPool timeout time.Duration dialTimeout time.Duration + idleTimeout time.Duration + maxStreams int nodeExecutor interface { executeOnNode(nodeID uint64, stmt influxql.Statement, database string) error @@ -41,10 +43,13 @@ type MetaExecutor struct { } // NewMetaExecutor returns a new initialized *MetaExecutor. -func NewMetaExecutor(timeout, dialTimeout time.Duration) *MetaExecutor { +func NewMetaExecutor(timeout, dialTimeout, idleTimeout time.Duration, maxStreams int) *MetaExecutor { e := &MetaExecutor{ + pool: newClientPool(), timeout: timeout, dialTimeout: dialTimeout, + idleTimeout: idleTimeout, + maxStreams: maxStreams, } e.nodeExecutor = e return e @@ -113,13 +118,15 @@ func (e *MetaExecutor) executeOnNode(nodeID uint64, stmt influxql.Statement, dat request.SetDatabase(database) // Write request. - if err := EncodeTLV(conn, executeStatementRequestMessage, &request); err != nil { + if err := EncodeTLVT(conn, executeStatementRequestMessage, &request, e.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. var resp ExecuteStatementResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return err } @@ -177,15 +184,17 @@ func (e *MetaExecutor) TaskManagerStatement(nodeID uint64, stmt influxql.Stateme defer conn.Close() // Write request. - if err := EncodeTLV(conn, taskManagerStatementRequestMessage, &TaskManagerStatementRequest{ + if err := EncodeTLVT(conn, taskManagerStatementRequestMessage, &TaskManagerStatementRequest{ Statement: stmt.String(), - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return query.Result{}, err } // Read the response. 
var resp TaskManagerStatementResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return query.Result{}, err } return resp.Result, resp.Err @@ -199,17 +208,19 @@ func (e *MetaExecutor) MeasurementNames(nodeID uint64, database string, retentio defer conn.Close() // Write request. - if err := EncodeTLV(conn, measurementNamesRequestMessage, &MeasurementNamesRequest{ + if err := EncodeTLVT(conn, measurementNamesRequestMessage, &MeasurementNamesRequest{ Database: database, RetentionPolicy: retentionPolicy, Condition: cond, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } // Read the response. var resp MeasurementNamesResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } return resp.Names, nil @@ -223,16 +234,18 @@ func (e *MetaExecutor) TagKeys(nodeID uint64, shardIDs []uint64, cond influxql.E defer conn.Close() // Write request. - if err := EncodeTLV(conn, tagKeysRequestMessage, &TagKeysRequest{ + if err := EncodeTLVT(conn, tagKeysRequestMessage, &TagKeysRequest{ ShardIDs: shardIDs, Condition: cond, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } // Read the response. var resp TagKeysResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } return resp.TagKeys, nil @@ -246,16 +259,18 @@ func (e *MetaExecutor) TagValues(nodeID uint64, shardIDs []uint64, cond influxql defer conn.Close() // Write request. - if err := EncodeTLV(conn, tagValuesRequestMessage, &TagValuesRequest{ + if err := EncodeTLVT(conn, tagValuesRequestMessage, &TagValuesRequest{ ShardIDs: shardIDs, Condition: cond, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } // Read the response. 
var resp TagValuesResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, err } return resp.TagValues, nil @@ -269,15 +284,17 @@ func (e *MetaExecutor) SeriesSketches(nodeID uint64, database string) (estimator defer conn.Close() // Write request. - if err := EncodeTLV(conn, seriesSketchesRequestMessage, &SeriesSketchesRequest{ + if err := EncodeTLVT(conn, seriesSketchesRequestMessage, &SeriesSketchesRequest{ Database: database, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } // Read the response. var resp SeriesSketchesResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } return resp.Sketch, resp.TSSketch, nil @@ -291,15 +308,17 @@ func (e *MetaExecutor) MeasurementsSketches(nodeID uint64, database string) (est defer conn.Close() // Write request. - if err := EncodeTLV(conn, measurementsSketchesRequestMessage, &MeasurementsSketchesRequest{ + if err := EncodeTLVT(conn, measurementsSketchesRequestMessage, &MeasurementsSketchesRequest{ Database: database, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } // Read the response. var resp MeasurementsSketchesResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } return resp.Sketch, resp.TSSketch, nil @@ -313,16 +332,18 @@ func (e *MetaExecutor) FieldDimensions(nodeID uint64, shardIDs []uint64, m *infl defer conn.Close() // Write request. 
- if err := EncodeTLV(conn, fieldDimensionsRequestMessage, &FieldDimensionsRequest{ + if err := EncodeTLVT(conn, fieldDimensionsRequestMessage, &FieldDimensionsRequest{ ShardIDs: shardIDs, Measurement: *m, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } // Read the response. var resp FieldDimensionsResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return nil, nil, err } return resp.Fields, resp.Dimensions, resp.Err @@ -336,17 +357,19 @@ func (e *MetaExecutor) MapType(nodeID uint64, shardIDs []uint64, m *influxql.Mea defer conn.Close() // Write request. - if err := EncodeTLV(conn, mapTypeRequestMessage, &MapTypeRequest{ + if err := EncodeTLVT(conn, mapTypeRequestMessage, &MapTypeRequest{ ShardIDs: shardIDs, Measurement: *m, Field: field, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return influxql.Unknown, err } // Read the response. var resp MapTypeResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return influxql.Unknown, err } return resp.Type, nil @@ -371,17 +394,19 @@ func (e *MetaExecutor) CreateIterator(nodeID uint64, shardIDs []uint64, ctx cont var resp CreateIteratorResponse if err := func() error { // Write request. - if err := EncodeTLV(conn, createIteratorRequestMessage, &CreateIteratorRequest{ + if err := EncodeTLVT(conn, createIteratorRequestMessage, &CreateIteratorRequest{ ShardIDs: shardIDs, Measurement: *m, Opt: opt, SpanContext: sc, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. 
- if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return err } else if resp.Err != nil { return err @@ -404,17 +429,19 @@ func (e *MetaExecutor) IteratorCost(nodeID uint64, shardIDs []uint64, m *influxq defer conn.Close() // Write request. - if err := EncodeTLV(conn, iteratorCostRequestMessage, &IteratorCostRequest{ + if err := EncodeTLVT(conn, iteratorCostRequestMessage, &IteratorCostRequest{ ShardIDs: shardIDs, Measurement: *m, Opt: opt, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return query.IteratorCost{}, err } // Read the response. var resp IteratorCostResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return query.IteratorCost{}, err } return resp.Cost, resp.Err @@ -428,16 +455,18 @@ func (e *MetaExecutor) ReadFilter(nodeID uint64, shardIDs []uint64, ctx context. if err := func() error { // Write request. - if err := EncodeTLV(conn, storeReadFilterRequestMessage, &StoreReadFilterRequest{ + if err := EncodeTLVT(conn, storeReadFilterRequestMessage, &StoreReadFilterRequest{ ShardIDs: shardIDs, Request: *req, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. var resp StoreReadFilterResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return err } else if resp.Err != nil { return err @@ -460,16 +489,18 @@ func (e *MetaExecutor) ReadGroup(nodeID uint64, shardIDs []uint64, ctx context.C if err := func() error { // Write request. 
- if err := EncodeTLV(conn, storeReadGroupRequestMessage, &StoreReadGroupRequest{ + if err := EncodeTLVT(conn, storeReadGroupRequestMessage, &StoreReadGroupRequest{ ShardIDs: shardIDs, Request: *req, - }); err != nil { + }, e.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. var resp StoreReadGroupResponse - if _, err := DecodeTLV(conn, &resp); err != nil { + if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil { + MarkUnusable(conn) return err } else if resp.Err != nil { return err @@ -485,24 +516,18 @@ func (e *MetaExecutor) ReadGroup(nodeID uint64, shardIDs []uint64, ctx context.C } // dial returns a connection to a single node in the cluster. -func (e *MetaExecutor) dial(nodeID uint64) (net.Conn, error) { - ni, err := e.MetaClient.DataNode(nodeID) - if err != nil { - return nil, err - } - - conn, err := tcp.DialTLSTimeout("tcp", ni.TCPAddr, e.TLSConfig, e.dialTimeout) - if err != nil { - return nil, err - } - if e.timeout > 0 { - conn.SetDeadline(time.Now().Add(e.timeout)) - } - - // Write the cluster multiplexing header byte - if _, err := conn.Write([]byte{MuxHeader}); err != nil { - conn.Close() - return nil, err +func (m *MetaExecutor) dial(nodeID uint64) (net.Conn, error) { + // If we don't have a connection pool for that addr yet, create one + _, ok := m.pool.getPool(nodeID) + if !ok { + factory := &connFactory{nodeID: nodeID, clientPool: m.pool, timeout: m.dialTimeout, tlsConfig: m.TLSConfig} + factory.metaClient = m.MetaClient + + p, err := NewBoundedPool(1, m.maxStreams, m.idleTimeout, factory.dial) + if err != nil { + return nil, err + } + m.pool.setPool(nodeID, p) } - return conn, nil + return m.pool.conn(nodeID) } diff --git a/coordinator/meta_executor_test.go b/coordinator/meta_executor_test.go index 7dfcdc7..aea65f8 100644 --- a/coordinator/meta_executor_test.go +++ b/coordinator/meta_executor_test.go @@ -21,7 +21,7 @@ func Test_ExecuteStatement(t *testing.T) { mock.expect("DROP DATABASE foo") mock.expect("DROP 
DATABASE foo") - e := NewMetaExecutor(time.Duration(0), time.Second) + e := NewMetaExecutor(time.Duration(0), time.Second, time.Minute, 1) e.MetaClient = newMockMetaClient(numOfNodes) // Replace MetaExecutor's nodeExecutor with our mock. e.nodeExecutor = mock diff --git a/coordinator/service.go b/coordinator/service.go index 5884fa6..0e1dd13 100644 --- a/coordinator/service.go +++ b/coordinator/service.go @@ -1497,6 +1497,16 @@ func (s *Service) serveHTTP() { srv.Serve(s.httpListener) } +// ReadTLVT reads a type-length-value record with timeout from r. +func ReadTLVT(r net.Conn, t time.Duration) (byte, []byte, error) { + if t > 0 { + if err := r.SetReadDeadline(time.Now().Add(t)); err != nil { + return 0, nil, err + } + } + return ReadTLV(r) +} + // ReadTLV reads a type-length-value record from r. func ReadTLV(r io.Reader) (byte, []byte, error) { typ, err := ReadType(r) @@ -1541,6 +1551,16 @@ func ReadLV(r io.Reader) ([]byte, error) { return buf, nil } +// WriteTLVT writes a type-length-value record with timeout to w. +func WriteTLVT(w net.Conn, typ byte, buf []byte, t time.Duration) error { + if t > 0 { + if err := w.SetWriteDeadline(time.Now().Add(t)); err != nil { + return err + } + } + return WriteTLV(w, typ, buf) +} + // WriteTLV writes a type-length-value record to w. func WriteTLV(w io.Writer, typ byte, buf []byte) error { if err := WriteType(w, typ); err != nil { @@ -1574,6 +1594,16 @@ func WriteLV(w io.Writer, buf []byte) error { return nil } +// EncodeTLVT encodes v to a binary format and writes the type-length-value record with timeout to w. +func EncodeTLVT(w net.Conn, typ byte, v encoding.BinaryMarshaler, t time.Duration) error { + if t > 0 { + if err := w.SetWriteDeadline(time.Now().Add(t)); err != nil { + return err + } + } + return EncodeTLV(w, typ, v) +} + // EncodeTLV encodes v to a binary format and writes the record-length-value record to w.
func EncodeTLV(w io.Writer, typ byte, v encoding.BinaryMarshaler) error { if err := WriteType(w, typ); err != nil { @@ -1598,7 +1628,17 @@ func EncodeLV(w io.Writer, v encoding.BinaryMarshaler) error { return nil } -// DecodeTLV reads the type-length-value record from r and unmarshals it into v. +// DecodeTLVT decodes the type-length-value record with timeout from r and unmarshals it into v. +func DecodeTLVT(r net.Conn, v encoding.BinaryUnmarshaler, t time.Duration) (typ byte, err error) { + if t > 0 { + if err := r.SetReadDeadline(time.Now().Add(t)); err != nil { + return 0, err + } + } + return DecodeTLV(r, v) +} + +// DecodeTLV decodes the type-length-value record from r and unmarshals it into v. func DecodeTLV(r io.Reader, v encoding.BinaryUnmarshaler) (typ byte, err error) { typ, err = ReadType(r) if err != nil { @@ -1610,7 +1650,7 @@ func DecodeTLV(r io.Reader, v encoding.BinaryUnmarshaler) (typ byte, err error) return typ, nil } -// DecodeLV reads the length-value record from r and unmarshals it into v. +// DecodeLV decodes the length-value record from r and unmarshals it into v. func DecodeLV(r io.Reader, v encoding.BinaryUnmarshaler) error { buf, err := ReadLV(r) if err != nil { diff --git a/coordinator/shard_writer.go b/coordinator/shard_writer.go index ba8ac4b..a0f4778 100644 --- a/coordinator/shard_writer.go +++ b/coordinator/shard_writer.go @@ -13,11 +13,11 @@ import ( // ShardWriter writes a set of points to a shard. type ShardWriter struct { - pool *clientPool - timeout time.Duration - dialTimeout time.Duration - idleTimeout time.Duration - maxIdleStreams int + pool *clientPool + timeout time.Duration + dialTimeout time.Duration + idleTimeout time.Duration + maxStreams int MetaClient interface { DataNode(id uint64) (ni *meta.NodeInfo, err error) @@ -28,13 +28,13 @@ type ShardWriter struct { } // NewShardWriter returns a new instance of ShardWriter. 
-func NewShardWriter(timeout, dialTimeout, idleTimeout time.Duration, maxIdleStreams int) *ShardWriter { +func NewShardWriter(timeout, dialTimeout, idleTimeout time.Duration, maxStreams int) *ShardWriter { return &ShardWriter{ - pool: newClientPool(), - timeout: timeout, - dialTimeout: dialTimeout, - idleTimeout: idleTimeout, - maxIdleStreams: maxIdleStreams, + pool: newClientPool(), + timeout: timeout, + dialTimeout: dialTimeout, + idleTimeout: idleTimeout, + maxStreams: maxStreams, } } @@ -53,18 +53,11 @@ func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) // WriteShardBinary writes time series binary points to a shard func (w *ShardWriter) WriteShardBinary(shardID, ownerID uint64, points [][]byte) error { - c, err := w.dial(ownerID) + conn, err := w.dial(ownerID) if err != nil { return err } - - conn, ok := c.(*pooledConn) - if !ok { - panic("wrong connection type") - } - defer func(conn net.Conn) { - conn.Close() // return to pool - }(conn) + defer conn.Close() // Determine the location of this shard and whether it still exists db, rp, sgi := w.MetaClient.ShardOwner(shardID) @@ -89,17 +82,15 @@ func (w *ShardWriter) WriteShardBinary(shardID, ownerID uint64, points [][]byte) } // Write request. - conn.SetWriteDeadline(time.Now().Add(w.timeout)) - if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil { - conn.MarkUnusable() + if err := WriteTLVT(conn, writeShardRequestMessage, buf, w.timeout); err != nil { + MarkUnusable(conn) return err } // Read the response. 
- conn.SetReadDeadline(time.Now().Add(w.timeout)) - _, buf, err = ReadTLV(conn) + _, buf, err = ReadTLVT(conn, w.timeout) if err != nil { - conn.MarkUnusable() + MarkUnusable(conn) return err } @@ -123,7 +114,7 @@ func (w *ShardWriter) dial(nodeID uint64) (net.Conn, error) { factory := &connFactory{nodeID: nodeID, clientPool: w.pool, timeout: w.dialTimeout, tlsConfig: w.TLSConfig} factory.metaClient = w.MetaClient - p, err := NewBoundedPool(1, w.maxIdleStreams, w.idleTimeout, DefaultPoolWaitTimeout, factory.dial) + p, err := NewBoundedPool(1, w.maxStreams, w.idleTimeout, factory.dial) if err != nil { return nil, err } @@ -143,7 +134,7 @@ func (w *ShardWriter) Close() error { } const ( - maxConnections = 1000 + maxConnections = 5000 maxRetries = 3 ) diff --git a/coordinator/shard_writer_test.go b/coordinator/shard_writer_test.go index 07eeaae..b435419 100644 --- a/coordinator/shard_writer_test.go +++ b/coordinator/shard_writer_test.go @@ -152,7 +152,7 @@ func TestShardWriter_Write_ErrReadTimeout(t *testing.T) { t.Fatal(err) } - w := coordinator.NewShardWriter(10*time.Second, time.Second, time.Millisecond, 1) + w := coordinator.NewShardWriter(10*time.Second, time.Second, time.Minute, 1) w.MetaClient = &metaClient{addr: ln.Addr().String()} now := time.Now() @@ -183,7 +183,7 @@ func TestShardWriter_Write_PoolMax(t *testing.T) { defer s.Close() defer ts.Close() - w := coordinator.NewShardWriter(10*time.Second, time.Second, 100*time.Millisecond, 0) + w := coordinator.NewShardWriter(10*time.Second, time.Second, 100*time.Minute, 0) w.MetaClient = &metaClient{addr: ts.ln.Addr().String()} now := time.Now()