diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go
index 70ee9fd..3028ca3 100644
--- a/coordinator/points_writer.go
+++ b/coordinator/points_writer.go
@@ -5,7 +5,6 @@ import (
     "errors"
     "fmt"
     "sort"
-    "strings"
     "sync"
     "sync/atomic"
     "time"
@@ -554,7 +553,7 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.
             atomic.AddInt64(&w.stats.PointWriteReqRemote, int64(len(points)))
             err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
-            if err != nil && isRetryable(err) {
+            if err != nil && hh.IsRetryable(err) {
                 // The remote write failed so queue it via hinted handoff
                 atomic.AddInt64(&w.stats.PointWriteReqHH, int64(len(points)))
                 hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
@@ -594,6 +593,10 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.
             atomic.AddInt64(&w.stats.WriteErr, 1)
             w.Logger.Error("Write failed", zap.Uint64("node_id", result.Owner.NodeID), zap.Uint64("shard_id", shard.ID), zap.Error(result.Err))
 
+            if result.Err.Error() == hh.ErrQueueBlocked.Error() {
+                continue
+            }
+
             // Keep track of the first error we see to return back to the client
             if writeError == nil {
                 writeError = result.Err
@@ -622,14 +625,3 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.
 
     return ErrWriteFailed
 }
-
-// isRetryable returns true if this error is temporary and could be retried
-func isRetryable(err error) bool {
-    if err == nil {
-        return true
-    }
-    if strings.HasPrefix(err.Error(), tsdb.ErrFieldTypeConflict.Error()) {
-        return false
-    }
-    return true
-}
diff --git a/coordinator/rpc.go b/coordinator/rpc.go
index 28e2266..37182e5 100644
--- a/coordinator/rpc.go
+++ b/coordinator/rpc.go
@@ -47,6 +47,9 @@ func (w *WriteShardRequest) RetentionPolicy() string { return w.pb.GetRetentionP
 // Points returns the time series Points
 func (w *WriteShardRequest) Points() []models.Point { return w.unmarshalPoints() }
 
+// SetBinaryPoints sets the time series binary points
+func (w *WriteShardRequest) SetBinaryPoints(points [][]byte) { w.pb.Points = points }
+
 // AddPoint adds a new time series point
 func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
     pt, err := models.NewPoint(
diff --git a/coordinator/shard_writer.go b/coordinator/shard_writer.go
index 1039be3..ba8ac4b 100644
--- a/coordinator/shard_writer.go
+++ b/coordinator/shard_writer.go
@@ -40,6 +40,19 @@ func NewShardWriter(timeout, dialTimeout, idleTimeout time.Duration, maxIdleStre
 // WriteShard writes time series points to a shard
 func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
+    pts := make([][]byte, 0, len(points))
+    for _, p := range points {
+        b, err := p.MarshalBinary()
+        if err != nil {
+            continue
+        }
+        pts = append(pts, b)
+    }
+    return w.WriteShardBinary(shardID, ownerID, pts)
+}
+
+// WriteShardBinary writes time series binary points to a shard
+func (w *ShardWriter) WriteShardBinary(shardID, ownerID uint64, points [][]byte) error {
     c, err := w.dial(ownerID)
     if err != nil {
         return err
     }
@@ -67,7 +80,7 @@ func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point)
     request.SetShardID(shardID)
     request.SetDatabase(db)
     request.SetRetentionPolicy(rp)
-    request.AddPoints(points)
+    request.SetBinaryPoints(points)
 
     // Marshal into protocol buffers.
     buf, err := request.MarshalBinary()
diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index 8425282..7c6fb64 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -684,7 +684,7 @@ dir = "/var/lib/influxdb/hh"
 
 # Maximum number of writes into the hinted-handoff queue that can be pending.
 # This is writes incoming to the hh queue, not outbound from the queue.
-# max-pending-writes = 1024
+# max-writes-pending = 1024
 
 ###
 ### [anti-entropy]
diff --git a/services/hh/config.go b/services/hh/config.go
index 6ada00c..a036622 100644
--- a/services/hh/config.go
+++ b/services/hh/config.go
@@ -99,6 +99,9 @@ func (c *Config) Validate() error {
     if c.PurgeInterval <= 0 {
         return errors.New("purge-interval must be positive")
     }
+    if c.MaxWritesPending < 0 {
+        return errors.New("max-writes-pending must be non-negative")
+    }
 
     return nil
 }
diff --git a/services/hh/node_processor.go b/services/hh/node_processor.go
index 85c3c72..10d3a70 100644
--- a/services/hh/node_processor.go
+++ b/services/hh/node_processor.go
@@ -1,9 +1,11 @@
 package hh
 
 import (
+    "encoding/binary"
     "fmt"
     "io"
     "os"
+    "strings"
     "sync"
     "sync/atomic"
     "time"
@@ -20,6 +22,7 @@ type NodeProcessor struct {
     RetryInterval    time.Duration // Interval between periodic write-to-node attempts.
     RetryMaxInterval time.Duration // Max interval between periodic write-to-node attempts.
     RetryRateLimit   int64         // Limits the rate data is sent to node.
+    MaxWritesPending int           // Maximum number of incoming pending writes.
     MaxSize          int64         // Maximum size an underlying queue can get.
     MaxAge           time.Duration // Maximum age queue data can get before purging.
     nodeID           uint64
@@ -47,6 +50,7 @@ func NewNodeProcessor(cfg Config, nodeID, shardID uint64, dir string, w shardWri
         RetryInterval:    time.Duration(cfg.RetryInterval),
         RetryMaxInterval: time.Duration(cfg.RetryMaxInterval),
         RetryRateLimit:   cfg.RetryRateLimit,
+        MaxWritesPending: cfg.MaxWritesPending,
         MaxSize:          cfg.MaxSize,
         MaxAge:           time.Duration(cfg.MaxAge),
         nodeID:           nodeID,
@@ -84,7 +88,7 @@ func (n *NodeProcessor) Open() error {
     }
 
     // Create the queue of hinted-handoff data.
-    queue, err := newQueue(n.dir, n.MaxSize)
+    queue, err := newQueue(n.dir, n.MaxSize, n.MaxWritesPending)
     if err != nil {
         return err
     }
@@ -126,7 +130,7 @@ func (n *NodeProcessor) Statistics(tags map[string]string) []models.Statistic {
             statBytesRead:       atomic.LoadInt64(&n.stats.BytesRead),
             statBytesWritten:    atomic.LoadInt64(&n.stats.BytesWritten),
             statQueueBytes:      n.queue.diskUsage(),
-            statQueueDepth:      len(n.queue.segments),
+            statQueueDepth:      int64(len(n.queue.segments)),
             statWriteBlocked:    atomic.LoadInt64(&n.stats.WriteBlocked),
             statWriteDropped:    atomic.LoadInt64(&n.stats.WriteDropped),
             statWriteShardReq:   atomic.LoadInt64(&n.stats.WriteShardReq),
@@ -166,16 +170,22 @@ func (n *NodeProcessor) WriteShard(points []models.Point) error {
     i, j := 0, len(points)
     for i < j {
-        b := marshalWrite(points[i:j])
+        b := marshalWrite(n.shardID, points[i:j])
         for len(b) > defaultSegmentSize {
             if j == i+1 {
                 return ErrSegmentFull
             }
             j = (i + j + 1) / 2
-            b = marshalWrite(points[i:j])
+            b = marshalWrite(n.shardID, points[i:j])
         }
         atomic.AddInt64(&n.stats.BytesWritten, int64(len(b)))
         if err := n.queue.Append(b); err != nil {
+            switch err {
+            case ErrQueueBlocked:
+                atomic.AddInt64(&n.stats.WriteBlocked, 1)
+            case ErrQueueFull:
+                atomic.AddInt64(&n.stats.WriteDropped, 1)
+            }
             return err
         }
         if j == len(points) {
@@ -263,14 +273,25 @@ func (n *NodeProcessor) SendWrite() (int, error) {
     // Get the current block from the queue
     buf, err := n.queue.Current()
     if err != nil {
+        if err != io.EOF {
+            n.Logger.Error("Failed to current queue", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
+            // Try to truncate it.
+            if err := n.queue.Truncate(); err != nil {
+                n.Logger.Error("Failed to truncate queue", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
+            }
+        } else {
+            // Try to skip it.
+            if err := n.queue.Advance(); err != nil {
+                n.Logger.Error("Failed to advance queue", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
+            }
+        }
         return 0, err
     }
 
     // unmarshal the byte slice back to shard ID and points
-    points, err := unmarshalWrite(buf)
+    _, points, err := unmarshalWrite(buf)
     if err != nil {
-        atomic.AddInt64(&n.stats.WriteDropped, int64(len(buf)))
-        n.Logger.Error("Unmarshal write failed", zap.Error(err))
+        n.Logger.Error("Unmarshal write failed", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
         // Try to skip it.
         if err := n.queue.Advance(); err != nil {
             n.Logger.Error("Failed to advance queue", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
@@ -278,10 +299,11 @@ func (n *NodeProcessor) SendWrite() (int, error) {
         return 0, err
     }
 
-    if err := n.writer.WriteShard(n.shardID, n.nodeID, points); err != nil {
+    if err := n.writer.WriteShardBinary(n.shardID, n.nodeID, points); err != nil && IsRetryable(err) {
         atomic.AddInt64(&n.stats.WriteNodeReqFail, 1)
         return 0, err
     }
+    atomic.AddInt64(&n.stats.BytesRead, int64(len(buf)))
     atomic.AddInt64(&n.stats.WriteNodeReq, 1)
     atomic.AddInt64(&n.stats.WriteNodeReqPoints, int64(len(points)))
 
@@ -324,21 +346,50 @@ func (n *NodeProcessor) Empty() bool {
     return n.queue.Empty()
 }
 
-func marshalWrite(points []models.Point) []byte {
-    var b []byte
-    if len(points) > 0 {
-        b = make([]byte, 0, (len(points[0].String())+1)*len(points))
+// IsRetryable returns true if this error is temporary and could be retried
+func IsRetryable(err error) bool {
+    if err == nil {
+        return false
+    }
+    if strings.Contains(err.Error(), "field type conflict") || strings.Contains(err.Error(), "partial write") {
+        return false
     }
+    return true
+}
+
+func marshalWrite(shardID uint64, points []models.Point) []byte {
+    b := make([]byte, 8)
+    binary.BigEndian.PutUint64(b, shardID)
+    nb := make([]byte, 4)
     for _, p := range points {
-        b = append(b, []byte(p.String())...)
-        b = append(b, '\n')
+        pb, err := p.MarshalBinary()
+        if err != nil {
+            continue
+        }
+        binary.BigEndian.PutUint32(nb, uint32(len(pb)))
+        b = append(b, nb...)
+        b = append(b, pb...)
     }
     return b
 }
 
-func unmarshalWrite(b []byte) ([]models.Point, error) {
-    if len(b) == 0 {
-        return nil, fmt.Errorf("too short: zero")
+func unmarshalWrite(b []byte) (uint64, [][]byte, error) {
+    if len(b) < 8 {
+        return 0, nil, fmt.Errorf("too short: len = %d", len(b))
+    }
+    shardID, b := binary.BigEndian.Uint64(b[:8]), b[8:]
+    var points [][]byte
+    var n int
+    for len(b) > 0 {
+        if len(b) < 4 {
+            return shardID, points, io.ErrShortBuffer
+        }
+        n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:]
+        if len(b) < n {
+            return shardID, points, io.ErrShortBuffer
+        }
+        points = append(points, b[:n])
+        b = b[n:]
     }
-    return models.ParsePoints(b)
+    return shardID, points, nil
 }
diff --git a/services/hh/node_processor_test.go b/services/hh/node_processor_test.go
index a91f071..0937df9 100644
--- a/services/hh/node_processor_test.go
+++ b/services/hh/node_processor_test.go
@@ -4,6 +4,7 @@ import (
     "io"
     "io/ioutil"
     "os"
+    "strings"
     "testing"
     "time"
 
@@ -12,10 +13,10 @@ import (
 )
 
 type fakeShardWriter struct {
-    ShardWriteFn func(shardID, nodeID uint64, points []models.Point) error
+    ShardWriteFn func(shardID, nodeID uint64, points [][]byte) error
 }
 
-func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []models.Point) error {
+func (f *fakeShardWriter) WriteShardBinary(shardID, nodeID uint64, points [][]byte) error {
     return f.ShardWriteFn(shardID, nodeID, points)
 }
 
@@ -38,7 +39,7 @@ func TestNodeProcessorSendBlock(t *testing.T) {
     pt := models.MustNewPoint("cpu", models.NewTags(map[string]string{"foo": "bar"}), models.Fields{"value": 1.0}, time.Unix(0, 0))
 
     sh := &fakeShardWriter{
-        ShardWriteFn: func(shardID, nodeID uint64, points []models.Point) error {
+        ShardWriteFn: func(shardID, nodeID uint64, points [][]byte) error {
             count++
             if shardID != expShardID {
                 t.Errorf("SendWrite() shardID mismatch: got %v, exp %v", shardID, expShardID)
             }
@@ -51,8 +52,13 @@ func TestNodeProcessorSendBlock(t *testing.T) {
                 t.Fatalf("SendWrite() points mismatch: got %v, exp %v", len(points), exp)
             }
 
-            if points[0].String() != pt.String() {
-                t.Fatalf("SendWrite() points mismatch:\n got %v\n exp %v", points[0].String(), pt.String())
+            p, err := models.NewPointFromBytes(points[0])
+            if err != nil {
+                t.Fatalf("SendWrite() point bytes mismatch: got %v, exp %v", err, pt.String())
+            }
+
+            if p.String() != pt.String() {
+                t.Fatalf("SendWrite() points mismatch:\n got %v\n exp %v", p.String(), pt.String())
             }
 
             return nil
@@ -110,7 +116,7 @@
     }
 
     // Make the node inactive.
-    sh.ShardWriteFn = func(shardID, nodeID uint64, points []models.Point) error {
+    sh.ShardWriteFn = func(shardID, nodeID uint64, points [][]byte) error {
         t.Fatalf("write sent to inactive node")
         return nil
     }
@@ -153,3 +159,32 @@
         t.Fatalf("Node processor directory still present after purge")
     }
 }
+
+func TestNodeProcessorMarshalWrite(t *testing.T) {
+    expShardID := uint64(127)
+    expPointsStr := `cpu value1=1.0,value2=1.0,value3=3.0,value4=4,value5="five" 1000000000
+cpu,env=prod,host=serverA,region=us-west,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5,target=servers,zone=1c value=1i 1000000000`
+    points, _ := models.ParsePointsString(expPointsStr)
+    b := marshalWrite(expShardID, points)
+
+    shardID, pts, err := unmarshalWrite(b)
+    if err != nil {
+        t.Fatalf("unexpected error: %s", err)
+    }
+    if shardID != expShardID {
+        t.Fatalf("unexpected shardID: %d, exp: %d", shardID, expShardID)
+    }
+
+    var lines []string
+    for _, pt := range pts {
+        p, err := models.NewPointFromBytes(pt)
+        if err != nil {
+            t.Fatalf("unexpected error: %s", err)
+        }
+        lines = append(lines, p.String())
+    }
+    pointsStr := strings.Join(lines, "\n")
+    if pointsStr != expPointsStr {
+        t.Fatalf("unexpected points string: %s, exp: %s", pointsStr, expPointsStr)
+    }
+}
diff --git a/services/hh/queue.go b/services/hh/queue.go
index b2e34de..6d0f8e8 100644
--- a/services/hh/queue.go
+++ b/services/hh/queue.go
@@ -1,22 +1,27 @@
 package hh
 
 import (
+    "bytes"
     "encoding/binary"
     "fmt"
     "io"
     "io/ioutil"
     "os"
     "path/filepath"
+    "sort"
     "strconv"
     "sync"
     "time"
+
+    limit "github.com/influxdata/influxdb/pkg/limiter"
 )
 
 // Possible errors returned by a hinted handoff queue.
 var (
-    ErrNotOpen     = fmt.Errorf("queue not open")
-    ErrQueueFull   = fmt.Errorf("queue is full")
-    ErrSegmentFull = fmt.Errorf("segment is full")
+    ErrNotOpen      = fmt.Errorf("queue not open")
+    ErrQueueBlocked = fmt.Errorf("queue is blocked")
+    ErrQueueFull    = fmt.Errorf("queue is full")
+    ErrSegmentFull  = fmt.Errorf("segment is full")
 )
 
 const (
@@ -70,9 +75,13 @@ type queue struct {
     // an error
     maxSize int64
 
+    // The limiter of incoming pending writes allowed in the queue
+    limiter limit.Fixed
+
     // The segments that exist on disk
     segments segments
 }
+
 type queuePos struct {
     head string
     tail string
@@ -80,13 +89,18 @@ type queuePos struct {
 
 type segments []*segment
 
+func (a segments) Len() int           { return len(a) }
+func (a segments) Less(i, j int) bool { return a[i].id < a[j].id }
+func (a segments) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+
 // newQueue create a queue that will store segments in dir and that will
 // consume more than maxSize on disk.
-func newQueue(dir string, maxSize int64) (*queue, error) {
+func newQueue(dir string, maxSize int64, maxWrites int) (*queue, error) {
     return &queue{
         dir:            dir,
         maxSegmentSize: defaultSegmentSize,
         maxSize:        maxSize,
+        limiter:        limit.NewFixed(maxWrites),
         segments:       segments{},
     }, nil
 }
@@ -263,7 +277,7 @@ func (l *queue) addSegment() (*segment, error) {
         return nil, err
     }
 
-    segment, err := newSegment(filepath.Join(l.dir, strconv.FormatUint(nextID, 10)), l.maxSegmentSize)
+    segment, err := newSegment(nextID, l.dir, l.maxSegmentSize)
     if err != nil {
         return nil, err
     }
@@ -274,7 +288,7 @@
 
 // loadSegments loads all segments on disk
 func (l *queue) loadSegments() (segments, error) {
-    segments := []*segment{}
+    var segments segments
 
     files, err := ioutil.ReadDir(l.dir)
     if err != nil {
@@ -288,18 +302,19 @@
         }
 
         // Segments file names are all numeric
-        _, err := strconv.ParseUint(segment.Name(), 10, 64)
+        id, err := strconv.ParseUint(segment.Name(), 10, 64)
         if err != nil {
             continue
         }
 
-        segment, err := newSegment(filepath.Join(l.dir, segment.Name()), l.maxSegmentSize)
+        segment, err := newSegment(id, l.dir, l.maxSegmentSize)
         if err != nil {
             return segments, err
         }
 
         segments = append(segments, segment)
     }
 
+    sort.Sort(segments)
     return segments, nil
 }
@@ -333,6 +348,11 @@ func (l *queue) nextSegmentID() (uint64, error) {
 
 // Append appends a byte slice to the end of the queue
 func (l *queue) Append(b []byte) error {
+    if !l.limiter.TryTake() {
+        return ErrQueueBlocked
+    }
+    defer l.limiter.Release()
+
     l.mu.Lock()
     defer l.mu.Unlock()
 
@@ -344,15 +364,26 @@
         return ErrQueueFull
     }
 
+    buffered := len(l.limiter) >= 10
+    defer func() {
+        if buffered && len(l.limiter) <= 1 {
+            l.tail.mu.Lock()
+            defer l.tail.mu.Unlock()
+            l.tail.flush()
+        }
+    }()
+
     // Append the entry to the tail, if the segment is full,
     // try to create new segment and retry the append
-    if err := l.tail.append(b); err == ErrSegmentFull {
+    if err := l.tail.append(b, buffered); err == ErrSegmentFull {
         segment, err := l.addSegment()
         if err != nil {
             return err
         }
         l.tail = segment
-        return l.tail.append(b)
+        return l.tail.append(b, buffered)
+    } else if err != nil {
+        return err
     }
     return nil
 }
@@ -366,6 +397,15 @@
     return l.head.current()
 }
 
+// Truncate truncates the corrupt block in a corrupted segment to minimize data loss
+func (l *queue) Truncate() error {
+    if l.head == nil {
+        return ErrNotOpen
+    }
+
+    return l.head.truncate()
+}
+
 // Advance moves the head point to the next byte slice in the queue
 func (l *queue) Advance() error {
     l.mu.Lock()
@@ -421,6 +461,8 @@
 
 type segment struct {
     mu   sync.RWMutex
+    id   uint64
+    buf  *bytes.Buffer
     size int64
     file *os.File
     path string
@@ -430,7 +472,8 @@
     maxSize int64
 }
 
-func newSegment(path string, maxSize int64) (*segment, error) {
+func newSegment(id uint64, dir string, maxSize int64) (*segment, error) {
+    path := filepath.Join(dir, strconv.FormatUint(id, 10))
     f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600)
     if err != nil {
         return nil, err
     }
@@ -441,10 +484,12 @@
         return nil, err
     }
 
-    s := &segment{file: f, path: path, size: stats.Size(), maxSize: maxSize}
+    s := &segment{id: id, file: f, path: path, size: stats.Size(), maxSize: maxSize}
     if err := s.open(); err != nil {
-        return nil, err
+        if err := s.truncate(); err != nil && err != io.EOF {
+            return nil, err
+        }
     }
 
     return s, nil
 }
@@ -484,6 +529,7 @@ func (l *segment) open() error {
     l.pos = int64(pos)
 
     if err := l.seekToCurrent(); err != nil {
+        l.pos = 0
         return err
     }
 
@@ -501,7 +547,7 @@
 }
 
 // append adds byte slice to the end of segment
-func (l *segment) append(b []byte) error {
+func (l *segment) append(b []byte, buffered bool) error {
     l.mu.Lock()
     defer l.mu.Unlock()
 
@@ -509,23 +555,54 @@
         return ErrNotOpen
     }
 
-    if l.size+int64(len(b)) > l.maxSize {
+    if l.buf == nil {
+        l.buf = &bytes.Buffer{}
+    }
+
+    if l.size+int64(l.buf.Len())+int64(len(b)) > l.maxSize {
+        if err := l.flush(); err != nil {
+            return err
+        }
         return ErrSegmentFull
     }
 
-    if err := l.seekEnd(-footerSize); err != nil {
+    if err := binary.Write(l.buf, binary.BigEndian, uint64(len(b))); err != nil {
        return err
     }
 
-    if err := l.writeUint64(uint64(len(b))); err != nil {
+    if _, err := l.buf.Write(b); err != nil {
        return err
     }
 
-    if err := l.writeBytes(b); err != nil {
+    if !buffered {
+        if err := l.flush(); err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+// flush flushes byte slice to the end of segment
+func (l *segment) flush() error {
+    if l.buf == nil {
+        return nil
+    }
+    b := l.buf.Bytes()
+    if len(b) == 0 {
+        return nil
+    }
+
+    if err := l.seekEnd(-footerSize); err != nil {
         return err
     }
 
-    if err := l.writeUint64(uint64(l.pos)); err != nil {
+    buf := bytes.NewBuffer(b)
+    if err := binary.Write(buf, binary.BigEndian, uint64(l.pos)); err != nil {
+        return err
+    }
+
+    if err := l.writeBytes(buf.Bytes()); err != nil {
         return err
     }
 
@@ -534,10 +611,11 @@
     }
 
     if l.currentSize == 0 {
-        l.currentSize = int64(len(b))
+        l.currentSize = int64(binary.BigEndian.Uint64(b[:8]))
     }
 
-    l.size += int64(len(b)) + 8 // uint64 for slice length
+    l.size += int64(len(b))
+    l.buf = nil
 
     return nil
 }
@@ -547,7 +625,7 @@
     l.mu.Lock()
     defer l.mu.Unlock()
 
-    if int64(l.pos) == l.size-8 {
+    if int64(l.pos) == l.size-footerSize {
         return nil, io.EOF
     }
 
@@ -574,6 +652,38 @@
     return b, nil
 }
 
+// truncate truncates the corrupt block in a corrupted segment
+func (l *segment) truncate() error {
+    l.mu.Lock()
+    defer l.mu.Unlock()
+
+    if int64(l.pos) == l.size-footerSize {
+        return io.EOF
+    }
+
+    if err := l.seekToCurrent(); err != nil {
+        return err
+    }
+
+    if err := l.writeUint64(uint64(l.pos)); err != nil {
+        return err
+    }
+
+    size := int64(l.pos) + footerSize
+    if err := l.file.Truncate(size); err != nil {
+        return err
+    }
+
+    if err := l.file.Sync(); err != nil {
+        return err
+    }
+
+    l.currentSize = 0
+    l.size = size
+
+    return nil
+}
+
 // advance advances the current value pointer
 func (l *segment) advance() error {
     l.mu.Lock()
@@ -659,7 +769,7 @@ func (l *segment) seekToCurrent() error {
 }
 
 func (l *segment) seek(pos int64) error {
-    n, err := l.file.Seek(pos, os.SEEK_SET)
+    n, err := l.file.Seek(pos, io.SeekStart)
     if err != nil {
         return err
     }
@@ -672,7 +782,7 @@
 }
 
 func (l *segment) seekEnd(pos int64) error {
-    _, err := l.file.Seek(pos, os.SEEK_END)
+    _, err := l.file.Seek(pos, io.SeekEnd)
     if err != nil {
         return err
     }
@@ -681,7 +791,7 @@
 }
 
 func (l *segment) filePos() int64 {
-    n, _ := l.file.Seek(0, os.SEEK_CUR)
+    n, _ := l.file.Seek(0, io.SeekCurrent)
     return n
 }
diff --git a/services/hh/queue_test.go b/services/hh/queue_test.go
index fd9f5b1..19c266f 100644
--- a/services/hh/queue_test.go
+++ b/services/hh/queue_test.go
@@ -17,7 +17,7 @@ func BenchmarkQueueAppend(b *testing.B) {
     }
     defer os.RemoveAll(dir)
 
-    q, err := newQueue(dir, 1024*1024*1024)
+    q, err := newQueue(dir, 1024*1024*1024, 1024)
     if err != nil {
         b.Fatalf("failed to create queue: %v", err)
     }
@@ -41,7 +41,7 @@ func TestQueueAppendOne(t *testing.T) {
     }
     defer os.RemoveAll(dir)
 
-    q, err := newQueue(dir, 1024)
+    q, err := newQueue(dir, 1024, 1024)
     if err != nil {
         t.Fatalf("failed to create queue: %v", err)
     }
@@ -82,7 +82,7 @@ func TestQueueAppendMultiple(t *testing.T) {
     }
     defer os.RemoveAll(dir)
 
-    q, err := newQueue(dir, 1024)
+    q, err := newQueue(dir, 1024, 1024)
     if err != nil {
         t.Fatalf("failed to create queue: %v", err)
     }
@@ -123,7 +123,7 @@ func TestQueueAdvancePastEnd(t *testing.T) {
     defer os.RemoveAll(dir)
 
     // create the queue
-    q, err := newQueue(dir, 1024)
+    q, err := newQueue(dir, 1024, 1024)
     if err != nil {
         t.Fatalf("failed to create queue: %v", err)
     }
@@ -198,7 +198,7 @@ func TestQueueFull(t *testing.T) {
     defer os.RemoveAll(dir)
 
     // create the queue
-    q, err := newQueue(dir, 10)
+    q, err := newQueue(dir, 10, 1024)
     if err != nil {
         t.Fatalf("failed to create queue: %v", err)
     }
@@ -220,7 +220,7 @@ func TestQueueReopen(t *testing.T) {
     defer os.RemoveAll(dir)
 
     // create the queue
-    q, err := newQueue(dir, 1024)
+    q, err := newQueue(dir, 1024, 1024)
     if err != nil {
         t.Fatalf("failed to create queue: %v", err)
     }
@@ -291,7 +291,7 @@ func TestPurgeQueue(t *testing.T) {
     defer os.RemoveAll(dir)
 
     // create the queue
-    q, err := newQueue(dir, 1024)
+    q, err := newQueue(dir, 1024, 1024)
     if err != nil {
         t.Fatalf("failed to create queue: %v", err)
     }
diff --git a/services/hh/service.go b/services/hh/service.go
index 6a5b70a..7b66c84 100644
--- a/services/hh/service.go
+++ b/services/hh/service.go
@@ -62,7 +62,7 @@ type Service struct {
 }
 
 type shardWriter interface {
-    WriteShard(shardID, ownerID uint64, points []models.Point) error
+    WriteShardBinary(shardID, ownerID uint64, points [][]byte) error
 }
 
 type metaClient interface {
@@ -126,6 +126,14 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic {
             statistics = append(statistics, p.Statistics(nil)...)
         }
     }
+    for key := range statistics[0].Values {
+        if key == statWriteShardReq || key == statWriteShardReqPoints {
+            continue
+        }
+        for i := 1; i < len(statistics); i++ {
+            statistics[0].Values[key] = statistics[0].Values[key].(int64) + statistics[i].Values[key].(int64)
+        }
+    }
     return statistics
 }
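Reviewer note (not part of the patch): with this change a hinted-handoff block is laid out as an 8-byte big-endian shard ID followed by repeated records of a 4-byte big-endian length prefix and the point's binary encoding, which is what marshalWrite/unmarshalWrite above implement. The standalone sketch below only mirrors that layout for illustration; encodeBlock/decodeBlock and the sample payload are hypothetical names and data, not code from this change.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeBlock mirrors the block layout used by marshalWrite: an 8-byte
// big-endian shard ID, then a 4-byte big-endian length prefix per payload
// followed by the payload bytes themselves.
func encodeBlock(shardID uint64, payloads [][]byte) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, shardID)
	nb := make([]byte, 4)
	for _, p := range payloads {
		binary.BigEndian.PutUint32(nb, uint32(len(p)))
		b = append(b, nb...)
		b = append(b, p...)
	}
	return b
}

// decodeBlock reverses encodeBlock, returning the shard ID and the raw
// payloads in order, and reporting truncated input.
func decodeBlock(b []byte) (uint64, [][]byte, error) {
	if len(b) < 8 {
		return 0, nil, fmt.Errorf("block too short: %d bytes", len(b))
	}
	shardID, rest := binary.BigEndian.Uint64(b[:8]), b[8:]
	var payloads [][]byte
	for len(rest) > 0 {
		if len(rest) < 4 {
			return shardID, payloads, fmt.Errorf("truncated length prefix")
		}
		n := int(binary.BigEndian.Uint32(rest[:4]))
		rest = rest[4:]
		if len(rest) < n {
			return shardID, payloads, fmt.Errorf("truncated payload")
		}
		payloads = append(payloads, rest[:n])
		rest = rest[n:]
	}
	return shardID, payloads, nil
}

func main() {
	block := encodeBlock(127, [][]byte{[]byte("cpu value=1i 1000000000")})
	shardID, payloads, err := decodeBlock(block)
	fmt.Println(shardID, len(payloads), err) // 127 1 <nil>
}

A truncated prefix or payload in this sketch corresponds to the io.ErrShortBuffer cases in unmarshalWrite, which the new queue.Truncate/segment.truncate path is meant to recover from.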