feat: add option to flush WAL on shutdown
Add `--storage-wal-flush-on-shutdown` to flush the WAL on database shutdown.
On successful shutdown, all WAL data is committed to TSM files and the WAL
directories contain no .wal files.

Clean cherry-pick of #25444 from main-2.x.

Closes: #25422
(cherry picked from commit 96bade4)
gwossum committed Oct 10, 2024
1 parent 7483bea commit f04bdfa
Showing 10 changed files with 307 additions and 68 deletions.
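
For reference, a minimal sketch of enabling the new option. The CLI flag and the TOML key below are taken from the diff; how the key is nested inside a full influxd configuration file is not shown by this commit and is left out here.

# Command line: boolean flag, off by default (the new cli.Opt sets no Default).
influxd --storage-wal-flush-on-shutdown

# tsdb config key, flat form as exercised in tsdb/config_test.go below.
wal-flush-on-shutdown = true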
5 changes: 5 additions & 0 deletions cmd/influxd/launcher/cmd.go
@@ -513,6 +513,11 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
Default: o.StorageConfig.Data.WALMaxWriteDelay,
Desc: "The max amount of time a write will wait when the WAL already has `storage-wal-max-concurrent-writes` active writes. Set to 0 to disable the timeout.",
},
{
DestP: &o.StorageConfig.Data.WALFlushOnShutdown,
Flag: "storage-wal-flush-on-shutdown",
Desc: "Flushes and clears the WAL on shutdown",
},
{
DestP: &o.StorageConfig.Data.ValidateKeys,
Flag: "storage-validate-keys",
4 changes: 4 additions & 0 deletions tsdb/config.go
@@ -95,6 +95,10 @@ type Config struct {
// disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL.
WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"`

// WALFlushOnShutdown determines if the WAL should be flushed when influxd is shutdown.
// This is useful in upgrade and downgrade scenarios to prevent WAL format compatibility issues.
WALFlushOnShutdown bool `toml:"wal-flush-on-shutdown"`

// Enables unicode validation on series keys on write.
ValidateKeys bool `toml:"validate-keys"`

34 changes: 13 additions & 21 deletions tsdb/config_test.go
@@ -5,37 +5,29 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/stretchr/testify/require"

"github.com/influxdata/influxdb/v2/tsdb"
)

func TestConfig_Parse(t *testing.T) {
// Parse configuration.
c := tsdb.NewConfig()
if _, err := toml.Decode(`
_, err := toml.Decode(`
dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
wal-fsync-delay = "10s"
wal-flush-on-shutdown = true
tsm-use-madv-willneed = true
`, &c); err != nil {
t.Fatal(err)
}

if err := c.Validate(); err != nil {
t.Errorf("unexpected validate error: %s", err)
}

if got, exp := c.Dir, "/var/lib/influxdb/data"; got != exp {
t.Errorf("unexpected dir:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}
if got, exp := c.WALDir, "/var/lib/influxdb/wal"; got != exp {
t.Errorf("unexpected wal-dir:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}
if got, exp := c.WALFsyncDelay, time.Duration(10*time.Second); time.Duration(got).Nanoseconds() != exp.Nanoseconds() {
t.Errorf("unexpected wal-fsync-delay:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}
if got, exp := c.TSMWillNeed, true; got != exp {
t.Errorf("unexpected tsm-madv-willneed:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
}
`, &c)
require.NoError(t, err)

require.NoError(t, c.Validate())
require.Equal(t, "/var/lib/influxdb/data", c.Dir)
require.Equal(t, "/var/lib/influxdb/wal", c.WALDir)
require.Equal(t, time.Duration(10*time.Second), time.Duration(c.WALFsyncDelay))
require.True(t, c.WALFlushOnShutdown)
require.True(t, c.TSMWillNeed)
}

func TestConfig_Validate_Error(t *testing.T) {
2 changes: 1 addition & 1 deletion tsdb/engine.go
@@ -29,7 +29,7 @@ var (
// Engine represents a swappable storage engine for the shard.
type Engine interface {
Open(ctx context.Context) error
Close() error
Close(flush bool) error
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
ScheduleFullCompaction() error
67 changes: 48 additions & 19 deletions tsdb/engine/tsm1/engine.go
@@ -774,25 +774,31 @@ func (e *Engine) Open(ctx context.Context) error {
}

// Close closes the engine. Subsequent calls to Close are a nop.
func (e *Engine) Close() error {
// If flush is true, then the WAL is flushed and cleared before the
// engine is closed.
func (e *Engine) Close(flush bool) error {
// Flushing the WAL involves writing a snapshot, which has to be done before
// compactions are disabled.
var flushErr error
if e.WALEnabled && flush {
// Even if the snapshot fails, we still have to proceed and close the engine.
flushErr = e.flushWAL()
}

e.SetCompactionsEnabled(false)

// Lock now and close everything else down.
e.mu.Lock()
defer e.mu.Unlock()
e.done = nil // Ensures that the channel will not be closed again.

var err error = nil
err = e.fieldset.Close()
if err2 := e.FileStore.Close(); err2 != nil && err == nil {
err = err2
}
setCloseErr := e.fieldset.Close()
storeCloseErr := e.FileStore.Close()
var walCloseErr error
if e.WALEnabled {
if err2 := e.WAL.Close(); err2 != nil && err == nil {
err = err2
}
walCloseErr = e.WAL.Close()
}
return err
return errors.Join(flushErr, setCloseErr, storeCloseErr, walCloseErr)
}

// WithLogger sets the logger for the engine.
@@ -1835,7 +1841,11 @@ func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) err
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }

// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
func (e *Engine) WriteSnapshot() (err error) {
func (e *Engine) WriteSnapshot() error {
return e.doWriteSnapshot(false)
}

func (e *Engine) doWriteSnapshot(flush bool) (err error) {
// Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot

@@ -1858,8 +1868,14 @@ func (e *Engine) WriteSnapshot() (err error) {
defer e.mu.Unlock()

if e.WALEnabled {
if err = e.WAL.CloseSegment(); err != nil {
return
if !flush {
if err = e.WAL.CloseSegment(); err != nil {
return
}
} else {
if err = e.WAL.CloseAllSegments(); err != nil {
return
}
}

segments, err = e.WAL.ClosedSegments()
@@ -1897,17 +1913,30 @@ func (e *Engine) WriteSnapshot() (err error) {
return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
}

// flushWAL flushes the WAL and empties the WAL directory.
func (e *Engine) flushWAL() error {
return e.writeSnapshotWithRetries(true)
}

// writeSnapshotWithRetries calls WriteSnapshot and will retry with a backoff if WriteSnapshot
// fails with ErrSnapshotInProgress. If flush is true then no new WAL segments are opened so
// that the WAL has no segment files on success.
func (e *Engine) writeSnapshotWithRetries(flush bool) error {
err := e.doWriteSnapshot(flush)
for i := 0; i < 3 && err == ErrSnapshotInProgress; i += 1 {
backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
time.Sleep(backoff)
err = e.doWriteSnapshot(flush)
}
return err
}

// CreateSnapshot will create a temp directory that holds
// temporary hardlinks to the underlying shard files.
// skipCacheOk controls whether it is permissible to fail writing out
// in-memory cache data when a previous snapshot is in progress.
func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) {
err := e.WriteSnapshot()
for i := 0; i < 3 && err == ErrSnapshotInProgress; i += 1 {
backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
time.Sleep(backoff)
err = e.WriteSnapshot()
}
err := e.writeSnapshotWithRetries(false)
if err == ErrSnapshotInProgress && skipCacheOk {
e.logger.Warn("Snapshotter busy: proceeding without cache contents")
} else if err != nil {
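Aside: the rewritten Close above aggregates the flush and close results with errors.Join — the stdlib helper added in Go 1.20, assuming that is the errors package in scope — instead of keeping only the first non-nil error. A small standalone illustration of that behavior, separate from the commit itself:

package main

import (
	"errors"
	"fmt"
)

var errWAL = errors.New("wal close failed")

func main() {
	// Nil arguments are discarded, and the result is nil only when every
	// argument is nil, so callers can join flush/close results unconditionally.
	err := errors.Join(nil, errors.New("filestore close failed"), errWAL)
	fmt.Println(err)                    // prints both messages, one per line
	fmt.Println(errors.Is(err, errWAL)) // true: errors.Is sees through the join
	fmt.Println(errors.Join(nil, nil))  // <nil>
}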
4 changes: 2 additions & 2 deletions tsdb/engine/tsm1/engine_test.go
@@ -1825,7 +1825,7 @@ func TestEngine_SnapshotsDisabled(t *testing.T) {
t.Cleanup(func() { idx.Close() })

e := tsm1.NewEngine(1, idx, dir, walPath, sfile.SeriesFile, opt).(*tsm1.Engine)
t.Cleanup(func() { e.Close() })
t.Cleanup(func() { e.Close(false) })

// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
@@ -2644,7 +2644,7 @@ func (e *Engine) close(cleanup bool) error {
os.RemoveAll(e.root)
}
}()
return e.Engine.Close()
return e.Engine.Close(false)
}

// Reopen closes and reopens the engine.
23 changes: 20 additions & 3 deletions tsdb/engine/tsm1/wal.go
@@ -559,6 +559,14 @@ func (l *WAL) CloseSegment() error {
return nil
}

// CloseAllSegments closes the current segment regardless of whether it contains data
// and does not open a new one.
func (l *WAL) CloseAllSegments() error {
l.mu.Lock()
defer l.mu.Unlock()
return l.closeCurrentSegmentFile()
}

// Delete deletes the given keys, returning the segment ID for the operation.
func (l *WAL) Delete(ctx context.Context, keys [][]byte) (int, error) {
if len(keys) == 0 {
@@ -634,15 +642,24 @@ func segmentFileNames(dir string) ([]string, error) {
return names, nil
}

// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log.
func (l *WAL) newSegmentFile() error {
l.currentSegmentID++
// closeCurrentSegmentFile will close the current segment file. l.mu must be held before calling this method.
func (l *WAL) closeCurrentSegmentFile() error {
if l.currentSegmentWriter != nil {
l.sync()

if err := l.currentSegmentWriter.close(); err != nil {
return err
}
l.currentSegmentWriter = nil
}
return nil
}

// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log.
func (l *WAL) newSegmentFile() error {
l.currentSegmentID++
if err := l.closeCurrentSegmentFile(); err != nil {
return err
}

fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
29 changes: 21 additions & 8 deletions tsdb/shard.go
@@ -496,7 +496,7 @@ func (s *Shard) openNoLock(ctx context.Context) (bool, error) {

return nil
}(); err != nil {
s.closeNoLock()
s.closeNoLock(false)
return true, NewShardError(s.id, err)
}

@@ -508,12 +508,18 @@ func (s *Shard) openNoLock(ctx context.Context) (bool, error) {
return false, nil
}

// Close shuts down the shard's store.
func (s *Shard) Close() error {
// FlushAndClose flushes the shard's WAL and then closes down the shard's store.
func (s *Shard) FlushAndClose() error {
return s.closeAndWait(true)
}

// closeAndWait shuts down the shard's store and waits for the close to complete.
// If flush is true, the WAL is flushed and cleared before closing.
func (s *Shard) closeAndWait(flush bool) error {
err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.closeNoLock()
return s.closeNoLock(flush)
}()
// make sure not to hold a lock while waiting for close to finish
werr := s.closeWait()
@@ -522,12 +528,19 @@ func (s *Shard) Close() error {
return err
}
return werr

}

// Close shuts down the shard's store.
func (s *Shard) Close() error {
return s.closeAndWait(false)
}

// closeNoLock closes the shard and removes reference to the shard from associated
// indexes. The s.mu mutex must be held before calling closeNoLock. closeWait should always
// indexes. If flush is true, the WAL is flushed and cleared before closing.
// The s.mu mutex must be held before calling closeNoLock. closeWait should always
// be called after calling closeNoLock.
func (s *Shard) closeNoLock() error {
func (s *Shard) closeNoLock(flush bool) error {
if s._engine == nil {
return nil
}
@@ -536,7 +549,7 @@ func (s *Shard) closeNoLock() error {
close(s.metricUpdater.closing)
}

err := s._engine.Close()
err := s._engine.Close(flush)
if err == nil {
s._engine = nil
}
@@ -1271,7 +1284,7 @@ func (s *Shard) Restore(ctx context.Context, r io.Reader, basePath string) error

// Close shard.
closeWaitNeeded = true // about to call closeNoLock, closeWait will be needed
if err := s.closeNoLock(); err != nil {
if err := s.closeNoLock(false); err != nil {
return closeWaitNeeded, err
}
return closeWaitNeeded, nil
6 changes: 5 additions & 1 deletion tsdb/store.go
@@ -512,7 +512,11 @@ func (s *Store) Close() error {

// Close all the shards in parallel.
if err := s.walkShards(s.shardsSlice(), func(sh *Shard) error {
return sh.Close()
if s.EngineOptions.Config.WALFlushOnShutdown {
return sh.FlushAndClose()
} else {
return sh.Close()
}
}); err != nil {
return err
}