[chore][tracker]: save most recent (archive) write index to disk (#36799)

This PR stores the most recent archive write index to disk, much like the
persistent queue does. It also adds a `Batch` method to `operator.Persister`,
since saving the metadata and saving the index should happen in a single
transaction, which can only be achieved via `Batch`.
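
To make the transactional write concrete, here is a minimal sketch of the batched save, using the `storage.SetOperation` / `Persister.Batch` pattern from the diff below; `saveMetadataAndIndex` is a hypothetical helper for illustration, not code added by this PR:

```go
package sketch

import (
	"context"
	"encoding/json"

	"go.opentelemetry.io/collector/extension/xextension/storage"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// saveMetadataAndIndex persists one poll cycle's serialized metadata together
// with the updated archive write index in a single Batch call, so a crash
// cannot leave the two out of sync. Hypothetical helper, for illustration only.
func saveMetadataAndIndex(ctx context.Context, persister operator.Persister,
	metadataKey string, metadata []byte, nextArchiveIndex int) error {
	indexBytes, err := json.Marshal(nextArchiveIndex) // same JSON encoding as encodeIndex below
	if err != nil {
		return err
	}
	ops := []*storage.Operation{
		storage.SetOperation(metadataKey, metadata),                // this cycle's reader metadata
		storage.SetOperation("knownFilesArchiveIndex", indexBytes), // the next write index
	}
	return persister.Batch(ctx, ops...)
}
```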

For example, suppose a user has configured archiving to store 100 poll cycles:
- On the first collector run, it stores 10 cycles and `archiveIndex` is 11
(pointing to the next index).
- When the collector is restarted, we restore `archiveIndex` from disk and
continue from index 11 (see the sketch below).
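
On the restore side, the mechanics boil down to reading one JSON-encoded integer back from storage at startup. A minimal sketch, assuming the same persister and key as in the diff below; `restoreArchiveIndex` here is a hypothetical free function, and its fallback-to-zero handling is a simplification of the PR's bounds and resize logic:

```go
package sketch

import (
	"context"
	"encoding/json"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// restoreArchiveIndex reads the persisted write index back so the tracker can
// keep appending where the previous run stopped instead of overwriting slot 0.
// Hypothetical helper, for illustration only.
func restoreArchiveIndex(ctx context.Context, persister operator.Persister, pollsToArchive int) int {
	raw, err := persister.Get(ctx, "knownFilesArchiveIndex")
	if err != nil || raw == nil {
		return 0 // first run, or nothing persisted yet
	}
	var idx int
	if err := json.Unmarshal(raw, &idx); err != nil || idx < 0 || idx >= pollsToArchive {
		return 0 // unreadable, or out of bounds for the current ring size
	}
	return idx
}
```

Subsequent archive writes then start at the restored index and wrap modulo `polls_to_archive`, matching the ring-buffer behavior described in `tracker.go` below.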

#### Link to tracking issue
Closes #32727

#### Testing
Added unit tests to check that the index stays in bounds and to verify archive
restoration.

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
VihasMakwana and djaglowski authored Feb 3, 2025
1 parent 116bafc commit 4543060
Showing 7 changed files with 315 additions and 21 deletions.
6 changes: 3 additions & 3 deletions pkg/stanza/fileconsumer/file.go
@@ -50,7 +50,7 @@ func (m *Manager) Start(persister operator.Persister) error {
}

// instantiate the tracker
m.instantiateTracker(persister)
m.instantiateTracker(ctx, persister)

if persister != nil {
m.persister = persister
@@ -271,12 +271,12 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
return r, nil
}

func (m *Manager) instantiateTracker(persister operator.Persister) {
func (m *Manager) instantiateTracker(ctx context.Context, persister operator.Persister) {
var t tracker.Tracker
if m.noTracking {
t = tracker.NewNoStateTracker(m.set, m.maxBatchFiles)
} else {
t = tracker.NewFileTracker(m.set, m.maxBatchFiles, m.pollsToArchive, persister)
t = tracker.NewFileTracker(ctx, m.set, m.maxBatchFiles, m.pollsToArchive, persister)
}
m.tracker = t
}
8 changes: 5 additions & 3 deletions pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
@@ -10,6 +10,8 @@ import (
"errors"
"fmt"

"go.opentelemetry.io/collector/extension/xextension/storage"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
@@ -21,7 +23,7 @@ func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Meta
return SaveKey(ctx, persister, rmds, knownFilesKey)
}

func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string) error {
func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string, ops ...*storage.Operation) error {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

@@ -37,8 +39,8 @@ func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.M
errs = append(errs, fmt.Errorf("encode metadata: %w", err))
}
}

if err := persister.Set(ctx, key, buf.Bytes()); err != nil {
ops = append(ops, storage.SetOperation(key, buf.Bytes()))
if err := persister.Batch(ctx, ops...); err != nil {
errs = append(errs, fmt.Errorf("persist known files: %w", err))
}

180 changes: 167 additions & 13 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -4,10 +4,13 @@
package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"bytes"
"context"
"encoding/json"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/xextension/storage"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
@@ -17,6 +20,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

const (
archiveIndexKey = "knownFilesArchiveIndex"
archivePollsToArchiveKey = "knonwFilesPollsToArchive"
)

// Interface for tracking files that are being consumed.
type Tracker interface {
Add(reader *reader.Reader)
@@ -52,13 +60,14 @@ type fileTracker struct {
archiveIndex int
}

func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
func NewFileTracker(ctx context.Context, set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
}
set.Logger = set.Logger.With(zap.String("tracker", "fileTracker"))
return &fileTracker{

t := &fileTracker{
set: set,
maxBatchFiles: maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
@@ -68,6 +77,11 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA
persister: persister,
archiveIndex: 0,
}
if t.archiveEnabled() {
t.restoreArchiveIndex(ctx)
}

return t
}

func (t *fileTracker) Add(reader *reader.Reader) {
@@ -131,7 +145,9 @@ func (t *fileTracker) EndPoll() {
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]

// Instead of throwing it away, archive it.
t.archive(t.knownFiles[2])
if t.archiveEnabled() {
t.archive(t.knownFiles[2])
}
copy(t.knownFiles[1:], t.knownFiles)
t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
}
@@ -144,6 +160,110 @@ func (t *fileTracker) TotalReaders() int {
return total
}

func (t *fileTracker) restoreArchiveIndex(ctx context.Context) {
// remove extra "keys" once archive restoration is done
defer t.removeExtraKeys(ctx)
defer func() {
// store current pollsToArchive
if err := t.persister.Set(ctx, archivePollsToArchiveKey, encodeIndex(t.pollsToArchive)); err != nil {
t.set.Logger.Error("Error storing polls_to_archive", zap.Error(err))
}
}()

previousPollsToArchive, err := t.getPreviousPollsToArchive(ctx)
if err != nil {
// if there's an error reading previousPollsToArchive, default to current value
previousPollsToArchive = t.pollsToArchive
}

t.archiveIndex, err = t.getArchiveIndex(ctx)
if err != nil {
t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
return
}

if previousPollsToArchive < t.pollsToArchive {
// if archive size has increased, we just increment the index until we encounter a nil value
for t.archiveIndex < t.pollsToArchive && t.isSet(ctx, t.archiveIndex) {
t.archiveIndex++
}
} else if previousPollsToArchive > t.pollsToArchive {
// we will only attempt to rewrite archive if the archive size has shrunk
t.set.Logger.Warn("polls_to_archive has changed. Will attempt to rewrite archive")
t.rewriteArchive(ctx, previousPollsToArchive)
}
}

func (t *fileTracker) rewriteArchive(ctx context.Context, previousPollsToArchive int) {
// helper to rewrite data from oldIndex to newIndex
rewrite := func(newIdx, oldIdex int) error {
oldVal, err := t.persister.Get(ctx, archiveKey(oldIdex))
if err != nil {
return err
}
return t.persister.Set(ctx, archiveKey(newIdx), oldVal)
}
// Calculate the least recent index, w.r.t. new archive size

leastRecentIndex := mod(t.archiveIndex-t.pollsToArchive, previousPollsToArchive)

// Refer archive.md for the detailed design
if mod(t.archiveIndex-1, previousPollsToArchive) > t.pollsToArchive {
for i := 0; i < t.pollsToArchive; i++ {
if err := rewrite(i, leastRecentIndex); err != nil {
t.set.Logger.Error("error while swapping archive", zap.Error(err))
}
leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
}
t.archiveIndex = 0
} else {
if !t.isSet(ctx, t.archiveIndex) {
// If the current index points at an unset key, no need to do anything
return
}
for i := 0; i < t.pollsToArchive-t.archiveIndex; i++ {
if err := rewrite(t.archiveIndex+i, leastRecentIndex); err != nil {
t.set.Logger.Warn("error while swapping archive", zap.Error(err))
}
leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
}
}
}

func (t *fileTracker) removeExtraKeys(ctx context.Context) {
for i := t.pollsToArchive; t.isSet(ctx, i); i++ {
if err := t.persister.Delete(ctx, archiveKey(i)); err != nil {
t.set.Logger.Error("error while cleaning extra keys", zap.Error(err))
}
}
}

func (t *fileTracker) getPreviousPollsToArchive(ctx context.Context) (int, error) {
byteIndex, err := t.persister.Get(ctx, archivePollsToArchiveKey)
if err != nil {
t.set.Logger.Error("error while reading the archiveIndexKey", zap.Error(err))
return 0, err
}
previousPollsToArchive, err := decodeIndex(byteIndex)
if err != nil {
t.set.Logger.Error("error while decoding previousPollsToArchive", zap.Error(err))
return 0, err
}
return previousPollsToArchive, nil
}

func (t *fileTracker) getArchiveIndex(ctx context.Context) (int, error) {
byteIndex, err := t.persister.Get(ctx, archiveIndexKey)
if err != nil {
return 0, err
}
archiveIndex, err := decodeIndex(byteIndex)
if err != nil {
return 0, err
}
return archiveIndex, nil
}

func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
// We make use of a ring buffer, where each set of files is stored under a specific index.
// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
@@ -162,19 +282,17 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
// start
// index

if t.pollsToArchive <= 0 || t.persister == nil {
return
}
if err := t.writeArchive(t.archiveIndex, metadata); err != nil {
index := t.archiveIndex
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
indexOp := storage.SetOperation(archiveIndexKey, encodeIndex(t.archiveIndex)) // batch the updated index with metadata
if err := t.writeArchive(index, metadata, indexOp); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
}

// readArchive loads data from the archive for a given index and returns a fileset.Filset.
func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) {
key := fmt.Sprintf("knownFiles%d", index)
metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key)
metadata, err := checkpoint.LoadKey(context.Background(), t.persister, archiveKey(index))
if err != nil {
return nil, err
}
@@ -184,9 +302,17 @@ func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata]
}

// writeArchive saves data to the archive for a given index and returns an error, if encountered.
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
key := fmt.Sprintf("knownFiles%d", index)
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key)
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...*storage.Operation) error {
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), archiveKey(index), ops...)
}

func (t *fileTracker) archiveEnabled() bool {
return t.pollsToArchive > 0 && t.persister != nil
}

func (t *fileTracker) isSet(ctx context.Context, index int) bool {
val, err := t.persister.Get(ctx, archiveKey(index))
return val != nil && err == nil
}

// FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set.
@@ -295,3 +421,31 @@ func (t *noStateTracker) EndPoll() {}
func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }

func encodeIndex(val int) []byte {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

// Encode the index
if err := enc.Encode(val); err != nil {
return nil
}
return buf.Bytes()
}

func decodeIndex(buf []byte) (int, error) {
var index int

// Decode the index
dec := json.NewDecoder(bytes.NewReader(buf))
err := dec.Decode(&index)
return max(index, 0), err
}

func archiveKey(i int) string {
return fmt.Sprintf("knownFiles%d", i)
}

func mod(x, y int) int {
return (x + y) % y
}