Skip to content

Commit

Permalink
optimize meta executor and shard writer
Browse files Browse the repository at this point in the history
  • Loading branch information
chengshiwen committed Sep 6, 2024
1 parent 8fe2a44 commit c93a72b
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 90 deletions.
3 changes: 2 additions & 1 deletion cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.PointsWriter.Subscriber = s.Subscriber

// Initialize meta executor.
s.MetaExecutor = coordinator.NewMetaExecutor(time.Duration(c.Coordinator.QueryTimeout), time.Duration(c.Coordinator.DialTimeout))
s.MetaExecutor = coordinator.NewMetaExecutor(time.Duration(c.Coordinator.QueryTimeout), time.Duration(c.Coordinator.DialTimeout),
time.Duration(c.Coordinator.PoolMaxIdleTime), c.Coordinator.PoolMaxIdleStreams)
s.MetaExecutor.MetaClient = s.MetaClient
s.MetaExecutor.TLSConfig = tlsClientConfig

Expand Down
3 changes: 3 additions & 0 deletions coordinator/client_pool.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package coordinator

import (
"errors"
"net"
"sync"
)

// ErrClientClosed reports that a client has already been closed.
// MetaExecutor.Close returns it when the executor's pool is nil.
var ErrClientClosed = errors.New("client already closed")

type clientPool struct {
mu sync.RWMutex
pool map[uint64]Pool
Expand Down
145 changes: 90 additions & 55 deletions coordinator/meta_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ import (
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tcp"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"golang.org/x/sync/errgroup"
)

// MetaExecutor executes meta queries on one or more data nodes.
type MetaExecutor struct {
pool *clientPool
timeout time.Duration
dialTimeout time.Duration
idleTime time.Duration
maxStreams int

nodeExecutor interface {
executeOnNode(nodeID uint64, stmt influxql.Statement, database string) error
Expand All @@ -41,10 +43,13 @@ type MetaExecutor struct {
}

// NewMetaExecutor returns a new initialized *MetaExecutor.
func NewMetaExecutor(timeout, dialTimeout time.Duration) *MetaExecutor {
func NewMetaExecutor(timeout, dialTimeout, idleTime time.Duration, maxStreams int) *MetaExecutor {
e := &MetaExecutor{
pool: newClientPool(),
timeout: timeout,
dialTimeout: dialTimeout,
idleTime: idleTime,
maxStreams: maxStreams,
}
e.nodeExecutor = e
return e
Expand Down Expand Up @@ -113,13 +118,15 @@ func (e *MetaExecutor) executeOnNode(nodeID uint64, stmt influxql.Statement, dat
request.SetDatabase(database)

// Write request.
if err := EncodeTLV(conn, executeStatementRequestMessage, &request); err != nil {
if err := EncodeTLVT(conn, executeStatementRequestMessage, &request, e.timeout); err != nil {
MarkUnusable(conn)
return err
}

// Read the response.
var resp ExecuteStatementResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return err
}

Expand Down Expand Up @@ -177,15 +184,17 @@ func (e *MetaExecutor) TaskManagerStatement(nodeID uint64, stmt influxql.Stateme
defer conn.Close()

// Write request.
if err := EncodeTLV(conn, taskManagerStatementRequestMessage, &TaskManagerStatementRequest{
if err := EncodeTLVT(conn, taskManagerStatementRequestMessage, &TaskManagerStatementRequest{
Statement: stmt.String(),
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return query.Result{}, err
}

// Read the response.
var resp TaskManagerStatementResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return query.Result{}, err
}
return resp.Result, resp.Err
Expand All @@ -199,17 +208,19 @@ func (e *MetaExecutor) MeasurementNames(nodeID uint64, database string, retentio
defer conn.Close()

// Write request.
if err := EncodeTLV(conn, measurementNamesRequestMessage, &MeasurementNamesRequest{
if err := EncodeTLVT(conn, measurementNamesRequestMessage, &MeasurementNamesRequest{
Database: database,
RetentionPolicy: retentionPolicy,
Condition: cond,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return nil, err
}

// Read the response.
var resp MeasurementNamesResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return nil, err
}
return resp.Names, nil
Expand All @@ -223,16 +234,18 @@ func (e *MetaExecutor) TagKeys(nodeID uint64, shardIDs []uint64, cond influxql.E
defer conn.Close()

// Write request.
if err := EncodeTLV(conn, tagKeysRequestMessage, &TagKeysRequest{
if err := EncodeTLVT(conn, tagKeysRequestMessage, &TagKeysRequest{
ShardIDs: shardIDs,
Condition: cond,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return nil, err
}

// Read the response.
var resp TagKeysResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return nil, err
}
return resp.TagKeys, nil
Expand All @@ -246,16 +259,18 @@ func (e *MetaExecutor) TagValues(nodeID uint64, shardIDs []uint64, cond influxql
defer conn.Close()

// Write request.
if err := EncodeTLV(conn, tagValuesRequestMessage, &TagValuesRequest{
if err := EncodeTLVT(conn, tagValuesRequestMessage, &TagValuesRequest{
ShardIDs: shardIDs,
Condition: cond,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return nil, err
}

// Read the response.
var resp TagValuesResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return nil, err
}
return resp.TagValues, nil
Expand All @@ -269,15 +284,17 @@ func (e *MetaExecutor) SeriesSketches(nodeID uint64, database string) (estimator
defer conn.Close()

// Write request.
if err := EncodeTLV(conn, seriesSketchesRequestMessage, &SeriesSketchesRequest{
if err := EncodeTLVT(conn, seriesSketchesRequestMessage, &SeriesSketchesRequest{
Database: database,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return nil, nil, err
}

// Read the response.
var resp SeriesSketchesResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return nil, nil, err
}
return resp.Sketch, resp.TSSketch, nil
Expand All @@ -291,15 +308,17 @@ func (e *MetaExecutor) MeasurementsSketches(nodeID uint64, database string) (est
defer conn.Close()

// Write request.
if err := EncodeTLV(conn, measurementsSketchesRequestMessage, &MeasurementsSketchesRequest{
if err := EncodeTLVT(conn, measurementsSketchesRequestMessage, &MeasurementsSketchesRequest{
Database: database,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return nil, nil, err
}

// Read the response.
var resp MeasurementsSketchesResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return nil, nil, err
}
return resp.Sketch, resp.TSSketch, nil
Expand All @@ -313,16 +332,18 @@ func (e *MetaExecutor) FieldDimensions(nodeID uint64, shardIDs []uint64, m *infl
defer conn.Close()

// Write request.
if err := EncodeTLV(conn, fieldDimensionsRequestMessage, &FieldDimensionsRequest{
if err := EncodeTLVT(conn, fieldDimensionsRequestMessage, &FieldDimensionsRequest{
ShardIDs: shardIDs,
Measurement: *m,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return nil, nil, err
}

// Read the response.
var resp FieldDimensionsResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return nil, nil, err
}
return resp.Fields, resp.Dimensions, resp.Err
Expand All @@ -336,17 +357,19 @@ func (e *MetaExecutor) MapType(nodeID uint64, shardIDs []uint64, m *influxql.Mea
defer conn.Close()

// Write request.
if err := EncodeTLV(conn, mapTypeRequestMessage, &MapTypeRequest{
if err := EncodeTLVT(conn, mapTypeRequestMessage, &MapTypeRequest{
ShardIDs: shardIDs,
Measurement: *m,
Field: field,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return influxql.Unknown, err
}

// Read the response.
var resp MapTypeResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return influxql.Unknown, err
}
return resp.Type, nil
Expand All @@ -371,17 +394,19 @@ func (e *MetaExecutor) CreateIterator(nodeID uint64, shardIDs []uint64, ctx cont
var resp CreateIteratorResponse
if err := func() error {
// Write request.
if err := EncodeTLV(conn, createIteratorRequestMessage, &CreateIteratorRequest{
if err := EncodeTLVT(conn, createIteratorRequestMessage, &CreateIteratorRequest{
ShardIDs: shardIDs,
Measurement: *m,
Opt: opt,
SpanContext: sc,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return err
}

// Read the response.
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return err
} else if resp.Err != nil {
return err
Expand All @@ -404,17 +429,19 @@ func (e *MetaExecutor) IteratorCost(nodeID uint64, shardIDs []uint64, m *influxq
defer conn.Close()

// Write request.
if err := EncodeTLV(conn, iteratorCostRequestMessage, &IteratorCostRequest{
if err := EncodeTLVT(conn, iteratorCostRequestMessage, &IteratorCostRequest{
ShardIDs: shardIDs,
Measurement: *m,
Opt: opt,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return query.IteratorCost{}, err
}

// Read the response.
var resp IteratorCostResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return query.IteratorCost{}, err
}
return resp.Cost, resp.Err
Expand All @@ -428,16 +455,18 @@ func (e *MetaExecutor) ReadFilter(nodeID uint64, shardIDs []uint64, ctx context.

if err := func() error {
// Write request.
if err := EncodeTLV(conn, storeReadFilterRequestMessage, &StoreReadFilterRequest{
if err := EncodeTLVT(conn, storeReadFilterRequestMessage, &StoreReadFilterRequest{
ShardIDs: shardIDs,
Request: *req,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return err
}

// Read the response.
var resp StoreReadFilterResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return err
} else if resp.Err != nil {
return err
Expand All @@ -460,16 +489,18 @@ func (e *MetaExecutor) ReadGroup(nodeID uint64, shardIDs []uint64, ctx context.C

if err := func() error {
// Write request.
if err := EncodeTLV(conn, storeReadGroupRequestMessage, &StoreReadGroupRequest{
if err := EncodeTLVT(conn, storeReadGroupRequestMessage, &StoreReadGroupRequest{
ShardIDs: shardIDs,
Request: *req,
}); err != nil {
}, e.timeout); err != nil {
MarkUnusable(conn)
return err
}

// Read the response.
var resp StoreReadGroupResponse
if _, err := DecodeTLV(conn, &resp); err != nil {
if _, err := DecodeTLVT(conn, &resp, e.timeout); err != nil {
MarkUnusable(conn)
return err
} else if resp.Err != nil {
return err
Expand All @@ -486,23 +517,27 @@ func (e *MetaExecutor) ReadGroup(nodeID uint64, shardIDs []uint64, ctx context.C

// dial returns a connection to a single node in the cluster.
func (e *MetaExecutor) dial(nodeID uint64) (net.Conn, error) {
ni, err := e.MetaClient.DataNode(nodeID)
if err != nil {
return nil, err
}
// If we don't have a connection pool for that node ID yet, create one.
_, ok := e.pool.getPool(nodeID)
if !ok {
factory := &connFactory{nodeID: nodeID, clientPool: e.pool, timeout: e.dialTimeout, tlsConfig: e.TLSConfig}
factory.metaClient = e.MetaClient

conn, err := tcp.DialTLSTimeout("tcp", ni.TCPAddr, e.TLSConfig, e.dialTimeout)
if err != nil {
return nil, err
}
if e.timeout > 0 {
conn.SetDeadline(time.Now().Add(e.timeout))
p, err := NewBoundedPool(1, e.maxStreams, e.idleTime, factory.dial)
if err != nil {
return nil, err
}
e.pool.setPool(nodeID, p)
}
return e.pool.conn(nodeID)
}

// Write the cluster multiplexing header byte
if _, err := conn.Write([]byte{MuxHeader}); err != nil {
conn.Close()
return nil, err
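// Close closes the MetaExecutor's connection pool and clears the reference to it.
// It returns ErrClientClosed if the pool is already nil (for example, after an earlier Close).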
func (e *MetaExecutor) Close() error {
if e.pool == nil {
return ErrClientClosed
}
return conn, nil
e.pool.close()
e.pool = nil
return nil
}
2 changes: 1 addition & 1 deletion coordinator/meta_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func Test_ExecuteStatement(t *testing.T) {
mock.expect("DROP DATABASE foo")
mock.expect("DROP DATABASE foo")

e := NewMetaExecutor(time.Duration(0), time.Second)
e := NewMetaExecutor(time.Duration(0), time.Second, time.Minute, 1)
e.MetaClient = newMockMetaClient(numOfNodes)
// Replace MetaExecutor's nodeExecutor with our mock.
e.nodeExecutor = mock
Expand Down
Loading

0 comments on commit c93a72b

Please sign in to comment.