Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add --pid-file option to write PID files #25474

Open
wants to merge 4 commits into
base: main-2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cmd/influxd/launcher/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ type InfluxdOpts struct {
TracingType string
ReportingDisabled bool

PIDFile string
OverwritePIDFile bool

AssetsPath string
BoltPath string
SqLitePath string
Expand Down Expand Up @@ -213,6 +216,9 @@ func NewOpts(viper *viper.Viper) *InfluxdOpts {
FluxLogEnabled: false,
ReportingDisabled: false,

PIDFile: "",
OverwritePIDFile: false,

BoltPath: filepath.Join(dir, bolt.DefaultFilename),
SqLitePath: filepath.Join(dir, sqlite.DefaultFilename),
EnginePath: filepath.Join(dir, "engine"),
Expand Down Expand Up @@ -325,6 +331,18 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
Default: o.ReportingDisabled,
Desc: "disable sending telemetry data to https://telemetry.influxdata.com every 8 hours",
},
{
DestP: &o.PIDFile,
Flag: "pid-file",
Default: o.PIDFile,
Desc: "write process ID to a file",
},
{
DestP: &o.OverwritePIDFile,
Flag: "overwrite-pid-file",
Default: o.OverwritePIDFile,
Desc: "overwrite PID file if it already exists instead of exiting",
},
{
DestP: &o.SessionLength,
Flag: "session-length",
Expand Down
79 changes: 79 additions & 0 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"crypto/tls"
"errors"
"fmt"
"io/fs"
"net"
nethttp "net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -109,6 +111,11 @@ const (
JaegerTracing = "jaeger"
)

var (
// ErrPIDFileExists indicates that a PID file already exists.
ErrPIDFileExists = errors.New("PID file exists (possible unclean shutdown or another instance already running)")
)

type labeledCloser struct {
label string
closer func(context.Context) error
Expand Down Expand Up @@ -248,6 +255,10 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
}
}

if err := m.writePIDFile(opts.PIDFile, opts.OverwritePIDFile); err != nil {
return fmt.Errorf("error writing PIDFile %q: %w", opts.PIDFile, err)
}

m.reg = prom.NewRegistry(m.log.With(zap.String("service", "prom_registry")))
m.reg.MustRegister(collectors.NewGoCollector())

Expand Down Expand Up @@ -970,6 +981,74 @@ func (m *Launcher) initTracing(opts *InfluxdOpts) {
}
}

// writePIDFile will write the process ID to pidFilename and register a cleanup function to delete it during
// shutdown. If pidFilename is empty, then no PID file is written and no cleanup function is registered.
// If pidFilename already exists and overwrite is false, then pidFilename is not overwritten and a
// ErrPIDFileExists error is returned. If pidFilename already exists and overwrite is true, then pidFilename
// will be overwritten but a warning will be logged.
func (m *Launcher) writePIDFile(pidFilename string, overwrite bool) error {
if pidFilename == "" {
return nil
}

// Create directory to PIDfile if needed.
if err := os.MkdirAll(filepath.Dir(pidFilename), 0777); err != nil {
return fmt.Errorf("mkdir: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add the path in the fmt.Errorf

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context of the PID filename is added by the caller:

if err := m.writePIDFile(opts.PIDFile); err != nil {
return fmt.Errorf("error writing PIDFile %q: %w", opts.PIDFile, err)
}

I figured this was acceptable elimination of code inside writePIDFile since it is a private, single-use method. For a public method I definitely would have added the filename into each message.

}

// Write PID to file, but don't clobber an existing PID file.
pidBytes := []byte(strconv.Itoa(os.Getpid()))
pidMode := fs.FileMode(0666)
openFlags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
pidFile, err := os.OpenFile(pidFilename, openFlags|os.O_EXCL, pidMode)
if err != nil {
if !errors.Is(err, fs.ErrExist) {
return fmt.Errorf("open file: %w", err)
}
if !overwrite {
return ErrPIDFileExists
} else {
m.log.Warn("PID file already exists, attempting to overwrite", zap.String("pidFile", pidFilename))
pidFile, err = os.OpenFile(pidFilename, openFlags, pidMode)
if err != nil {
return fmt.Errorf("overwrite file: %w", err)
}
}
}
_, writeErr := pidFile.Write(pidBytes) // Contract says Write must return an error if count < len(pidBytes).
closeErr := pidFile.Close() // always close the file
if writeErr != nil || closeErr != nil {
var errs []error
if writeErr != nil {
errs = append(errs, fmt.Errorf("write file: %w", writeErr))
}
if closeErr != nil {
errs = append(errs, fmt.Errorf("close file: %w", closeErr))
}

// Let's make sure we don't leave a PID file behind on error.
removeErr := os.Remove(pidFilename)
if removeErr != nil {
errs = append(errs, fmt.Errorf("remove file: %w", removeErr))
}

return errors.Join(errs...)
}

// Add a cleanup function.
m.closers = append(m.closers, labeledCloser{
label: "pidfile",
closer: func(context.Context) error {
if err := os.Remove(pidFilename); err != nil {
return fmt.Errorf("removing PID file %q: %w", pidFilename, err)
}
return nil
},
})

return nil
}

// openMetaStores opens the embedded DBs used to store metadata about influxd resources, migrating them to
// the latest schema expected by the server.
// On success, a unique ID is returned to be used as an identifier for the influxd instance in telemetry.
Expand Down
7 changes: 6 additions & 1 deletion cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type TestLauncher struct {
Bucket *influxdb.Bucket
Auth *influxdb.Authorization

Logger *zap.Logger

httpClient *httpc.Client
apiClient *api.APIClient

Expand Down Expand Up @@ -146,7 +148,10 @@ func (tl *TestLauncher) Run(tb zaptest.TestingT, ctx context.Context, setters ..
}

// Set up top-level logger to write into the test-case.
tl.Launcher.log = zaptest.NewLogger(tb, zaptest.Level(opts.LogLevel)).With(zap.String("test_name", tb.Name()))
if tl.Logger == nil {
tl.Logger = zaptest.NewLogger(tb, zaptest.Level(opts.LogLevel)).With(zap.String("test_name", tb.Name()))
}
tl.Launcher.log = tl.Logger
return tl.Launcher.run(ctx, opts)
}

Expand Down
132 changes: 132 additions & 0 deletions cmd/influxd/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@ package launcher_test
import (
"context"
"encoding/json"
"fmt"
"io"
"io/fs"
nethttp "net/http"
"os"
"path/filepath"
"runtime"
"strconv"
"testing"
"time"

Expand All @@ -14,6 +20,10 @@ import (
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/tenant"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)

// Default context.
Expand Down Expand Up @@ -164,3 +174,125 @@ func TestLauncher_PingHeaders(t *testing.T) {
assert.Equal(t, []string{"OSS"}, resp.Header.Values("X-Influxdb-Build"))
assert.Equal(t, []string{"dev"}, resp.Header.Values("X-Influxdb-Version"))
}

func TestLauncher_PIDFile(t *testing.T) {
pidDir := t.TempDir()
pidFilename := filepath.Join(pidDir, "influxd.pid")

l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.PIDFile = pidFilename
})
defer func() {
l.ShutdownOrFail(t, ctx)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran in to a similar thing as this recently: #25398 (comment)

I think that this method call ShutdownOrFail will get called immediately? Might be good to check but this may not be the case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's inside a lambda, so it will not be evaluated until the defer calls the lambda. Also note that if it got called immediately the require.FileExists() on line 184 would fail because the PID file gets removed when l.ShutdownOrFail(t, ctx) is called.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@devanbenz - the arguments to a deferred function are evaluated on the execution of the defer line, and those evaluated arguments are stored for the eventual call, but the deferred call itself doesn't happen until after the end of the closing method.

require.NoFileExists(t, pidFilename)
}()

require.FileExists(t, pidFilename)
pidBytes, err := os.ReadFile(pidFilename)
require.NoError(t, err)
require.Equal(t, strconv.Itoa(os.Getpid()), string(pidBytes))
}

func TestLauncher_PIDFile_Locked(t *testing.T) {
pidDir := t.TempDir()
pidFilename := filepath.Join(pidDir, "influxd.pid")
lockContents := []byte("foobar") // something wouldn't appear in normal lock file

// Write PID file to lock out the launcher.
require.NoError(t, os.WriteFile(pidFilename, lockContents, 0666))
require.FileExists(t, pidFilename)
origSt, err := os.Stat(pidFilename)
require.NoError(t, err)

// Make sure we get an error about the PID file from the launcher
l := launcher.NewTestLauncher()
err = l.Run(t, ctx, func(o *launcher.InfluxdOpts) {
o.PIDFile = pidFilename
})
defer func() {
l.ShutdownOrFail(t, ctx)

require.FileExists(t, pidFilename)
contents, err := os.ReadFile(pidFilename)
require.NoError(t, err)
require.Equal(t, lockContents, contents)
curSt, err := os.Stat(pidFilename)
require.NoError(t, err)

// CircleCI test runners for darwin don't have `noatime` / `relatime`, so
// the atime will differ, which is inside the system specific data.
if runtime.GOOS != "darwin" {
require.Equal(t, origSt, curSt)
}
}()

require.ErrorIs(t, err, launcher.ErrPIDFileExists)
require.ErrorContains(t, err, fmt.Sprintf("error writing PIDFile %q: PID file exists (possible unclean shutdown or another instance already running)", pidFilename))
}

func TestLauncher_PIDFile_Overwrite(t *testing.T) {
pidDir := t.TempDir()
pidFilename := filepath.Join(pidDir, "influxd.pid")
lockContents := []byte("foobar") // something wouldn't appear in normal lock file

// Write PID file to lock out the launcher (or not in this case).
require.NoError(t, os.WriteFile(pidFilename, lockContents, 0666))
require.FileExists(t, pidFilename)

// Make sure we get an error about the PID file from the launcher.
l := launcher.NewTestLauncher()
loggerCore, ol := observer.New(zap.WarnLevel)
l.Logger = zap.New(loggerCore)
err := l.Run(t, ctx, func(o *launcher.InfluxdOpts) {
o.PIDFile = pidFilename
o.OverwritePIDFile = true
})
defer func() {
l.ShutdownOrFail(t, ctx)

require.NoFileExists(t, pidFilename)
}()
require.NoError(t, err)

expLogs := []observer.LoggedEntry{
{
Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "PID file already exists, attempting to overwrite"},
Context: []zapcore.Field{zap.String("pidFile", pidFilename)},
},
}
require.Equal(t, expLogs, ol.AllUntimed())
require.FileExists(t, pidFilename)
pidBytes, err := os.ReadFile(pidFilename)
require.NoError(t, err)
require.Equal(t, strconv.Itoa(os.Getpid()), string(pidBytes))
}

func TestLauncher_PIDFile_OverwriteFail(t *testing.T) {
pidDir := t.TempDir()
pidFilename := filepath.Join(pidDir, "influxd.pid")
lockContents := []byte("foobar") // something wouldn't appear in normal lock file

// Write PID file to lock out the launcher.
require.NoError(t, os.WriteFile(pidFilename, lockContents, 0666))
require.FileExists(t, pidFilename)
require.NoError(t, os.Chmod(pidFilename, 0000))

// Make sure we get an error about the PID file from the launcher
l := launcher.NewTestLauncher()
err := l.Run(t, ctx, func(o *launcher.InfluxdOpts) {
o.PIDFile = pidFilename
o.OverwritePIDFile = true
})
defer func() {
l.ShutdownOrFail(t, ctx)

require.FileExists(t, pidFilename)
require.NoError(t, os.Chmod(pidFilename, 0644))
pidBytes, err := os.ReadFile(pidFilename)
require.NoError(t, err)
require.Equal(t, lockContents, pidBytes)
}()

require.ErrorContains(t, err, fmt.Sprintf("error writing PIDFile %[1]q: overwrite file: open %[1]s:", pidFilename))
require.ErrorIs(t, err, fs.ErrPermission)
}