diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 36f435a912d..f4fe90cee3d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -168,6 +168,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142] - Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192] - Log bad handshake details when websocket connection fails {pull}41300[41300] +- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179] +- Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244] *Heartbeat* diff --git a/filebeat/input/journald/environment_test.go b/filebeat/input/journald/environment_test.go index 209a2e2dfd8..57f75163e92 100644 --- a/filebeat/input/journald/environment_test.go +++ b/filebeat/input/journald/environment_test.go @@ -27,7 +27,7 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" @@ -107,7 +107,7 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) { e.t.Helper() msg := strings.Builder{} fmt.Fprintf(&msg, "did not find the expected %d events", count) - assert.Eventually(e.t, func() bool { + require.Eventually(e.t, func() bool { sum := len(e.pipeline.GetAllEvents()) if sum == count { return true diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go index c9ddec9c046..1aa58d1f8bc 100644 --- a/filebeat/input/journald/input_filtering_test.go +++ b/filebeat/input/journald/input_filtering_test.go @@ -274,9 +274,11 @@ func TestInputSeek(t *testing.T) { env.waitUntilEventCount(len(testCase.expectedMessages)) - for idx, event := range env.pipeline.GetAllEvents() { - if got, expected := event.Fields["message"], testCase.expectedMessages[idx]; got != expected { - t.Fatalf("expecting event message %q, got %q", expected, got) + if !t.Failed() { + for idx, event := range env.pipeline.GetAllEvents() { + if got, expected := event.Fields["message"], testCase.expectedMessages[idx]; got != expected { + t.Fatalf("expecting event message %q, got %q", expected, got) + } } } }) diff --git a/filebeat/input/journald/input_parsers_test.go b/filebeat/input/journald/input_parsers_test.go index 720f53b8ce8..c1c2c6f6bb5 100644 --- a/filebeat/input/journald/input_parsers_test.go +++ b/filebeat/input/journald/input_parsers_test.go @@ -31,31 +31,41 @@ import ( // it only tests a single parser, but that is enough to ensure // we're correctly using the parsers func TestInputParsers(t *testing.T) { - inputParsersExpected := []string{"1st line\n2nd line\n3rd line", "4th line\n5th line\n6th line"} env := newInputTestingEnvironment(t) - inp := env.mustCreateInput(mapstr.M{ - "paths": []string{path.Join("testdata", "input-multiline-parser.journal")}, - "include_matches.match": []string{"_SYSTEMD_USER_UNIT=log-service.service"}, + "paths": []string{path.Join("testdata", "ndjson-parser.journal")}, "parsers": []mapstr.M{ { - "multiline": mapstr.M{ - "type": "count", - "count_lines": 3, + "ndjson": mapstr.M{ + "target": "", }, }, }, }) ctx, cancelInput := context.WithCancel(context.Background()) + t.Cleanup(cancelInput) env.startInput(ctx, inp) - env.waitUntilEventCount(len(inputParsersExpected)) + env.waitUntilEventCount(1) + event := env.pipeline.clients[0].GetEvents()[0] + + foo, isString := event.Fields["foo"].(string) + if !isString { + t.Errorf("expecting field 'foo' to be string, got %T", event.Fields["foo"]) + } - for idx, event := range env.pipeline.clients[0].GetEvents() { - if got, expected := event.Fields["message"], inputParsersExpected[idx]; got != expected { - t.Errorf("expecting event message %q, got %q", expected, got) - } + answer, isInt := event.Fields["answer"].(int64) + if !isInt { + t.Errorf("expecting field 'answer' to be int64, got %T", event.Fields["answer"]) } - cancelInput() + // The JSON in the test journal is: '{"foo": "bar", "answer":42}' + expectedFoo := "bar" + expectedAnswer := int64(42) + if foo != expectedFoo { + t.Errorf("expecting 'foo' from the Journal JSON to be '%s' got '%s' instead", expectedFoo, foo) + } + if answer != expectedAnswer { + t.Errorf("expecting 'answer' from the Journal JSON to be '%d' got '%d' instead", expectedAnswer, answer) + } } diff --git a/filebeat/input/journald/input_test.go b/filebeat/input/journald/input_test.go index 09dd8d1a485..b82663c5262 100644 --- a/filebeat/input/journald/input_test.go +++ b/filebeat/input/journald/input_test.go @@ -39,59 +39,19 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -// How to write to journal from CLI: -// https://www.baeldung.com/linux/systemd-journal-message-terminal +func TestInputCanReadAllBoots(t *testing.T) { + env := newInputTestingEnvironment(t) + cfg := mapstr.M{ + "paths": []string{path.Join("testdata", "multiple-boots.journal")}, + } + inp := env.mustCreateInput(cfg) -// TestGenerateJournalEntries generates entries in the user's journal. -// It is kept commented out at the top of the file as reference and -// easy access. -// -// How to generate a journal file with only the entries you want: -// 1. Add the dependencies for this test -// go get github.com/ssgreg/journald -// 2. Uncomment and run the test: -// 3. Add the following import: -// journaldlogger "github.com/ssgreg/journald" -// 4. Get a VM, ssh into it, make sure you can access the test from it -// 5. Find the journal file, usually at /var/log/journal//user-1000.journal -// 7. Clean and rotate the journal -// sudo journalctl --vacuum-time=1s -// sudo journalctl --rotate -// 8. Run this test: `go test -run=TestGenerateJournalEntries` -// 9. Copy the journal file somewhere else -// cp /var/log/journal/21282bcb80a74c08a0d14a047372256c/user-1000.journal /tmp/foo.journal -// 10. Read the journal file: -// journalctl --file=/tmp/foo.journal -n 10 -// 11. Read the journal with all fields as JSON -// journalctl --file=/tmp/foo.journal -n 10 -o json -// func TestGenerateJournalEntries(t *testing.T) { -// fields := []map[string]any{ -// { -// "BAR": "bar", -// }, -// { -// "FOO": "foo", -// }, -// { -// "BAR": "bar", -// "FOO": "foo", -// }, -// { -// "FOO_BAR": "foo", -// }, -// { -// "FOO_BAR": "bar", -// }, -// { -// "FOO_BAR": "foo bar", -// }, -// } -// for i, m := range fields { -// if err := journaldlogger.Send(fmt.Sprintf("message %d", i), journaldlogger.PriorityInfo, m); err != nil { -// t.Fatal(err) -// } -// } -// } + ctx, cancelInput := context.WithCancel(context.Background()) + t.Cleanup(cancelInput) + + env.startInput(ctx, inp) + env.waitUntilEventCount(6) +} func TestInputFieldsTranslation(t *testing.T) { // A few random keys to verify diff --git a/filebeat/input/journald/pkg/journalctl/jctlmock_test.go b/filebeat/input/journald/pkg/journalctl/jctlmock_test.go index c9244a5fa43..4f113d36f10 100644 --- a/filebeat/input/journald/pkg/journalctl/jctlmock_test.go +++ b/filebeat/input/journald/pkg/journalctl/jctlmock_test.go @@ -39,7 +39,7 @@ var _ Jctl = &JctlMock{} // KillFunc: func() error { // panic("mock out the Kill method") // }, -// NextFunc: func(canceler input.Canceler) ([]byte, error) { +// NextFunc: func(canceler input.Canceler) ([]byte, bool, error) { // panic("mock out the Next method") // }, // } @@ -53,7 +53,7 @@ type JctlMock struct { KillFunc func() error // NextFunc mocks the Next method. - NextFunc func(canceler input.Canceler) ([]byte, error) + NextFunc func(canceler input.Canceler) ([]byte, bool, error) // calls tracks calls to the methods. calls struct { @@ -98,7 +98,7 @@ func (mock *JctlMock) KillCalls() []struct { } // Next calls NextFunc. -func (mock *JctlMock) Next(canceler input.Canceler) ([]byte, error) { +func (mock *JctlMock) Next(canceler input.Canceler) ([]byte, bool, error) { if mock.NextFunc == nil { panic("JctlMock.NextFunc: method is nil but Jctl.Next was just called") } diff --git a/filebeat/input/journald/pkg/journalctl/journalctl.go b/filebeat/input/journald/pkg/journalctl/journalctl.go index 54bcb208b82..c0c21332965 100644 --- a/filebeat/input/journald/pkg/journalctl/journalctl.go +++ b/filebeat/input/journald/pkg/journalctl/journalctl.go @@ -24,6 +24,7 @@ import ( "io" "os/exec" "strings" + "sync" input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/elastic-agent-libs/logp" @@ -37,6 +38,7 @@ type journalctl struct { logger *logp.Logger canceler input.Canceler + waitDone sync.WaitGroup } // Factory returns an instance of journalctl ready to use. @@ -95,7 +97,7 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args data, err := reader.ReadBytes('\n') if err != nil { if !errors.Is(err, io.EOF) { - logger.Errorf("cannot read from journalctl stdout: %s", err) + logger.Errorf("cannot read from journalctl stdout: '%s'", err) } return } @@ -118,10 +120,13 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args // Whenever the journalctl process exits, the `Wait` call returns, // if there was an error it is logged and this goroutine exits. + jctl.waitDone.Add(1) go func() { + defer jctl.waitDone.Done() if err := cmd.Wait(); err != nil { jctl.logger.Errorf("journalctl exited with an error, exit code %d ", cmd.ProcessState.ExitCode()) } + jctl.logger.Debugf("journalctl exit code: %d", cmd.ProcessState.ExitCode()) }() return &jctl, nil @@ -130,18 +135,31 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args // Kill Terminates the journalctl process using a SIGKILL. func (j *journalctl) Kill() error { j.logger.Debug("sending SIGKILL to journalctl") - err := j.cmd.Process.Kill() - return err + return j.cmd.Process.Kill() } -func (j *journalctl) Next(cancel input.Canceler) ([]byte, error) { +// Next returns the next journal entry (as JSON). If `finished` is true, then +// journalctl finished returning all data and exited successfully, if journalctl +// exited unexpectedly, then `err` is non-nil, `finished` is false and an empty +// byte array is returned. +func (j *journalctl) Next(cancel input.Canceler) ([]byte, bool, error) { select { case <-cancel.Done(): - return []byte{}, ErrCancelled + return []byte{}, false, ErrCancelled case d, open := <-j.dataChan: if !open { - return []byte{}, errors.New("no more data to read, journalctl might have exited unexpectedly") + // Wait for the process to exit, so we can read the exit code. + j.waitDone.Wait() + if j.cmd.ProcessState.ExitCode() == 0 { + return []byte{}, true, nil + } + return []byte{}, + false, + fmt.Errorf( + "no more data to read, journalctl exited unexpectedly, exit code: %d", + j.cmd.ProcessState.ExitCode()) } - return d, nil + + return d, false, nil } } diff --git a/filebeat/input/journald/pkg/journalctl/reader.go b/filebeat/input/journald/pkg/journalctl/reader.go index b530e942b23..5e8ef54f543 100644 --- a/filebeat/input/journald/pkg/journalctl/reader.go +++ b/filebeat/input/journald/pkg/journalctl/reader.go @@ -58,10 +58,25 @@ type JctlFactory func(canceller input.Canceler, logger *logp.Logger, binary stri // //go:generate moq --fmt gofmt -out jctlmock_test.go . Jctl type Jctl interface { - Next(input.Canceler) ([]byte, error) + // Next returns the next journal entry. If there is no entry available + // next will block until there is an entry or cancel is cancelled. + // + // If cancel is cancelled, Next returns a zero value JournalEntry + // and ErrCancelled. + // + // If finished is true, then journalctl returned all messages + // and exited successfully + Next(input.Canceler) (data []byte, finished bool, err error) Kill() error } +type readerState uint8 + +const ( + readingOldEntriesState readerState = iota + followingState +) + // Reader reads entries from journald by calling `jouranlctl` // and reading its output. // @@ -74,36 +89,55 @@ type Jctl interface { // More details can be found in the PR introducing this feature and related // issues. PR: https://github.com/elastic/beats/pull/40061. type Reader struct { - args []string + // logger is the logger for the reader + logger *logp.Logger + + // jctlLogger is the logger for the code controlling + // the journalctl process + jctlLogger *logp.Logger + + // args are arguments for journalctl that never change, + // like the message filters, format, etc + args []string + + // firstRunArgs are the arguments used in the first call to + // journalctl that will be replaced by the cursor argument + // once data has been ingested + firstRunArgs []string + + // cursor is the jornalctl cursor, it is also stored in Filebeat's registry cursor string - logger *logp.Logger canceler input.Canceler jctl Jctl jctlFactory JctlFactory - backoff backoff.Backoff + backoff backoff.Backoff + seekMode SeekMode + state readerState } // handleSeekAndCursor returns the correct arguments for seek and cursor. // If there is a cursor, only the cursor is used, seek is ignored. // If there is no cursor, then seek is used -func handleSeekAndCursor(mode SeekMode, since time.Duration, cursor string) []string { +// The bool parameter indicates whether there might be messages from +// the previous boots +func handleSeekAndCursor(mode SeekMode, since time.Duration, cursor string) ([]string, bool) { if cursor != "" { - return []string{"--after-cursor", cursor} + return []string{"--after-cursor", cursor}, true } switch mode { case SeekSince: - return []string{"--since", time.Now().Add(since).Format(sinceTimeFormat)} + return []string{"--since", time.Now().Add(since).Format(sinceTimeFormat)}, true case SeekTail: - return []string{"--since", "now"} + return []string{"--since", "now"}, false case SeekHead: - return []string{"--no-tail"} + return []string{"--no-tail"}, true default: // That should never happen - return []string{} + return []string{}, false } } @@ -146,7 +180,9 @@ func New( ) (*Reader, error) { logger = logger.Named("reader") - args := []string{"--utc", "--output=json", "--follow"} + + args := []string{"--utc", "--output=json", "--no-pager"} + if file != "" && file != localSystemJournalID { args = append(args, "--file", file) } @@ -171,26 +207,43 @@ func New( args = append(args, "--facility", fmt.Sprintf("%d", facility)) } - otherArgs := handleSeekAndCursor(mode, since, cursor) - - jctl, err := newJctl(canceler, logger.Named("journalctl-runner"), "journalctl", append(args, otherArgs...)...) - if err != nil { - return &Reader{}, err + firstRunArgs, prevBoots := handleSeekAndCursor(mode, since, cursor) + state := readingOldEntriesState // Initial state + if !prevBoots { + state = followingState } r := Reader{ - args: args, - cursor: cursor, - jctl: jctl, - logger: logger, + logger: logger, + jctlLogger: logger.Named("journalctl-runner"), + + args: args, + firstRunArgs: firstRunArgs, + + state: state, + cursor: cursor, + canceler: canceler, jctlFactory: newJctl, backoff: backoff.NewExpBackoff(canceler.Done(), 100*time.Millisecond, 2*time.Second), } + if err := r.newJctl(firstRunArgs...); err != nil { + return &Reader{}, err + } + return &r, nil } +func (r *Reader) newJctl(extraArgs ...string) error { + args := append(r.args, extraArgs...) + + jctl, err := r.jctlFactory(r.canceler, r.jctlLogger, "journalctl", args...) + r.jctl = jctl + + return err +} + // Close stops the `journalctl` process and waits for all // goroutines to return, the canceller passed to `New` should // be cancelled before `Close` is called @@ -210,25 +263,49 @@ func (r *Reader) Close() error { // If cancel is cancelled, Next returns a zero value JournalEntry // and ErrCancelled. func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) { - d, err := r.jctl.Next(cancel) + msg, finished, err := r.jctl.Next(cancel) // Check if the input has been cancelled select { case <-cancel.Done(): - // Input has been cancelled, ignore the message? - return JournalEntry{}, err + // The caller is responsible for calling Reader.Close to terminate + // journalctl. Cancelling this canceller only means this Next call was + // cancelled. Because the input has been cancelled, we ignore the message + // and any error it might have returned. + return JournalEntry{}, ErrCancelled default: - // Two options: - // - No error, go parse the message - // - Error, journalctl is not running any more, restart it + // Three options: + // - Journalctl finished reading messages from previous boots + // successfully, restart it with --follow flag. + // - Error, journalctl exited with an error, restart it in the same + // mode it was running. + // - No error, skip the default block and go parse the message + + var extraArgs []string + var restart bool + + // First of all: handle the error, if any if err != nil { r.logger.Warnf("reader error: '%s', restarting...", err) - // Copy r.args and if needed, add the cursor flag - args := append([]string{}, r.args...) - if r.cursor != "" { - args = append(args, "--after-cursor", r.cursor) + restart = true + + if r.cursor == "" && r.state == readingOldEntriesState { + // Corner case: journalctl exited with an error before reading the + // 1st message. This means we don't have a cursor and need to restart + // it with the initial arguments. + extraArgs = append(extraArgs, r.firstRunArgs...) + } else if r.cursor != "" { + // There is a cursor, so just append it to our arguments + extraArgs = append(extraArgs, "--after-cursor", r.cursor) + + // Last, but not least, add "--follow" if we're in following mode + if r.state == followingState { + extraArgs = append(extraArgs, "--follow") + } } + // Handle backoff + // // If the last restart (if any) was more than 5s ago, // recreate the backoff and do not wait. // We recreate the backoff so r.backoff.Last().IsZero() @@ -239,49 +316,91 @@ func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) { } else { r.backoff.Wait() } + } + + // If journalctl finished reading the messages from previous boots + // and exited successfully + if finished { + restart = true + extraArgs = append(extraArgs, "--follow") + if r.cursor != "" { + // If there is a cursor, only use the cursor and the follow argument + extraArgs = append(extraArgs, "--after-cursor", r.cursor) + } else { + // If there is no cursor, it means the first successfully run + // did not return any event, so we have to restart with the + // --follow and all the initial args. + + extraArgs = append(extraArgs, r.firstRunArgs...) + } + + r.state = followingState + r.logger.Info("finished reading journal entries from all boots, restarting journalctl with follow flag") + } - jctl, err := r.jctlFactory(r.canceler, r.logger.Named("journalctl-runner"), "journalctl", args...) - if err != nil { + // Restart journalctl if needed + if restart { + if err := r.newJctl(extraArgs...); err != nil { // If we cannot restart journalct, there is nothing we can do. return JournalEntry{}, fmt.Errorf("cannot restart journalctl: %w", err) } - r.jctl = jctl // Return an empty message and wait for the input to call us again return JournalEntry{}, ErrRestarting } } + return r.handleMessage(msg) +} + +func (r *Reader) handleMessage(msg []byte) (JournalEntry, error) { fields := map[string]any{} - if err := json.Unmarshal(d, &fields); err != nil { - r.logger.Error("journal event cannot be parsed as map[string]any, look at the events log file for the raw journal event") + if err := json.Unmarshal(msg, &fields); err != nil { + r.logger.Error("journal event cannot be parsed as map[string]any, " + + "look at the events log file for the raw journal event") + // Log raw data to events log file - msg := fmt.Sprintf("data cannot be parsed as map[string]any JSON: '%s'", string(d)) - r.logger.Errorw(msg, logp.TypeKey, logp.EventType, "error.message", err.Error()) + msg := fmt.Sprintf("data cannot be parsed as map[string]any. Data: '%s'", + string(msg)) + r.logger.Errorw( + msg, + "error.message", err.Error(), + logp.TypeKey, logp.EventType) + return JournalEntry{}, fmt.Errorf("cannot decode Journald JSON: %w", err) } ts, isString := fields["__REALTIME_TIMESTAMP"].(string) if !isString { - return JournalEntry{}, fmt.Errorf("'__REALTIME_TIMESTAMP': '%[1]v', type %[1]T is not a string", fields["__REALTIME_TIMESTAMP"]) + return JournalEntry{}, + fmt.Errorf("'__REALTIME_TIMESTAMP': '%[1]v', type %[1]T is not a string", + fields["__REALTIME_TIMESTAMP"]) } unixTS, err := strconv.ParseUint(ts, 10, 64) if err != nil { - return JournalEntry{}, fmt.Errorf("could not convert '__REALTIME_TIMESTAMP' to uint64: %w", err) + return JournalEntry{}, + fmt.Errorf("could not convert '__REALTIME_TIMESTAMP' to uint64: %w", + err) } monotomicTs, isString := fields["__MONOTONIC_TIMESTAMP"].(string) if !isString { - return JournalEntry{}, fmt.Errorf("'__MONOTONIC_TIMESTAMP': '%[1]v', type %[1]T is not a string", fields["__MONOTONIC_TIMESTAMP"]) + return JournalEntry{}, + fmt.Errorf("'__MONOTONIC_TIMESTAMP': '%[1]v', type %[1]T is not a string", + fields["__MONOTONIC_TIMESTAMP"]) } monotonicTSInt, err := strconv.ParseUint(monotomicTs, 10, 64) if err != nil { - return JournalEntry{}, fmt.Errorf("could not convert '__MONOTONIC_TIMESTAMP' to uint64: %w", err) + return JournalEntry{}, + fmt.Errorf("could not convert '__MONOTONIC_TIMESTAMP' to uint64: %w", + err) } cursor, isString := fields["__CURSOR"].(string) if !isString { - return JournalEntry{}, fmt.Errorf("'_CURSOR': '%[1]v', type %[1]T is not a string", fields["_CURSOR"]) + return JournalEntry{}, + fmt.Errorf("'_CURSOR': '%[1]v', type %[1]T is not a string", + fields["_CURSOR"]) } // Update our cursor so we can restart journalctl if needed diff --git a/filebeat/input/journald/pkg/journalctl/reader_test.go b/filebeat/input/journald/pkg/journalctl/reader_test.go index af3837fd09c..f1c5f3bf4bc 100644 --- a/filebeat/input/journald/pkg/journalctl/reader_test.go +++ b/filebeat/input/journald/pkg/journalctl/reader_test.go @@ -48,8 +48,8 @@ func TestEventWithNonStringData(t *testing.T) { for idx, rawEvent := range testCases { t.Run(fmt.Sprintf("test %d", idx), func(t *testing.T) { mock := JctlMock{ - NextFunc: func(canceler input.Canceler) ([]byte, error) { - return rawEvent, nil + NextFunc: func(canceler input.Canceler) ([]byte, bool, error) { + return rawEvent, false, nil }, } r := Reader{ @@ -72,8 +72,8 @@ func TestRestartsJournalctlOnError(t *testing.T) { ctx := context.Background() mock := JctlMock{ - NextFunc: func(canceler input.Canceler) ([]byte, error) { - return jdEvent, errors.New("journalctl exited with code 42") + NextFunc: func(canceler input.Canceler) ([]byte, bool, error) { + return jdEvent, false, errors.New("journalctl exited with code 42") }, } @@ -90,8 +90,8 @@ func TestRestartsJournalctlOnError(t *testing.T) { // If calls have been made, change the Next function to always succeed // and return it - mock.NextFunc = func(canceler input.Canceler) ([]byte, error) { - return jdEvent, nil + mock.NextFunc = func(canceler input.Canceler) ([]byte, bool, error) { + return jdEvent, false, nil } return &mock, nil diff --git a/filebeat/input/journald/testdata/multiple-boots.export b/filebeat/input/journald/testdata/multiple-boots.export new file mode 100644 index 00000000000..91e5488470b --- /dev/null +++ b/filebeat/input/journald/testdata/multiple-boots.export @@ -0,0 +1,86 @@ +__CURSOR=s=8c7196499b954413a742eb1e2107fa5d;i=1;b=0ffe5f74a4bd49ca8597eb05fe1a512a;m=39f445;t=6225212a5b6da;x=3f056d2626450d83 +__REALTIME_TIMESTAMP=1726585755776730 +__MONOTONIC_TIMESTAMP=3798085 +_BOOT_ID=0ffe5f74a4bd49ca8597eb05fe1a512a +_SOURCE_MONOTONIC_TIMESTAMP=0 +_TRANSPORT=kernel +PRIORITY=5 +SYSLOG_FACILITY=0 +SYSLOG_IDENTIFIER=kernel +MESSAGE=Linux version 6.1.0-25-amd64 (debian-kernel@lists.debian.org) (gcc-12 (Debian 12.2.0-14) 12.2.0, GNU ld (GNU Binutils for Debian) 2.40) #1 SMP PREEMPT_DYNAMIC Debian 6.1.106-3 (2024-08-26) +_MACHINE_ID=ad88a1859979427ea1a7c24f0ae0320a +_HOSTNAME=Debian12 +_RUNTIME_SCOPE=system + +__CURSOR=s=8c7196499b954413a742eb1e2107fa5d;i=2;b=0ffe5f74a4bd49ca8597eb05fe1a512a;m=39f452;t=6225212a5b6e7;x=67b36f81fa43ba68 +__REALTIME_TIMESTAMP=1726585755776743 +__MONOTONIC_TIMESTAMP=3798098 +_BOOT_ID=0ffe5f74a4bd49ca8597eb05fe1a512a +_SOURCE_MONOTONIC_TIMESTAMP=0 +_TRANSPORT=kernel +SYSLOG_FACILITY=0 +SYSLOG_IDENTIFIER=kernel +_MACHINE_ID=ad88a1859979427ea1a7c24f0ae0320a +_HOSTNAME=Debian12 +_RUNTIME_SCOPE=system +PRIORITY=6 +MESSAGE=Command line: BOOT_IMAGE=/boot/vmlinuz-6.1.0-25-amd64 root=UUID=3841998b-4e88-4231-93c8-3fc24b549223 ro quiet + +Sep 17 11:26:36 Debian12 kernel: Linux version 6.1.0-25-amd64 (debian-kernel@lists.debian.org) (gcc-12 (Debian 12.2.0-14) 12.2.0, GNU ld (GNU Binutils for Debian) 2.40) #1 SMP PREEMPT_DYNAMIC Debian 6.1.106-3 (2024-08-26) +Sep 17 11:26:36 Debian12 kernel: Command line: BOOT_IMAGE=/boot/vmlinuz-6.1.0-25-amd64 root=UUID=3841998b-4e88-4231-93c8-3fc24b549223 ro quiet +__CURSOR=s=8c7196499b954413a742eb1e2107fa5d;i=22e3;b=457105b2d84547a4b4549f0eaa700b61;m=35bc29;t=6227ecec5b11f;x=a46eaad8c3930985 +__REALTIME_TIMESTAMP=1726777890550047 +__MONOTONIC_TIMESTAMP=3521577 +_BOOT_ID=457105b2d84547a4b4549f0eaa700b61 +_SOURCE_MONOTONIC_TIMESTAMP=0 +_TRANSPORT=kernel +PRIORITY=5 +SYSLOG_FACILITY=0 +SYSLOG_IDENTIFIER=kernel +MESSAGE=Linux version 6.1.0-25-amd64 (debian-kernel@lists.debian.org) (gcc-12 (Debian 12.2.0-14) 12.2.0, GNU ld (GNU Binutils for Debian) 2.40) #1 SMP PREEMPT_DYNAMIC Debian 6.1.106-3 (2024-08-26) +_MACHINE_ID=ad88a1859979427ea1a7c24f0ae0320a +_HOSTNAME=Debian12 +_RUNTIME_SCOPE=system + +__CURSOR=s=8c7196499b954413a742eb1e2107fa5d;i=22e4;b=457105b2d84547a4b4549f0eaa700b61;m=35bc37;t=6227ecec5b12d;x=fcd8a87f1f95be6e +__REALTIME_TIMESTAMP=1726777890550061 +__MONOTONIC_TIMESTAMP=3521591 +_BOOT_ID=457105b2d84547a4b4549f0eaa700b61 +_SOURCE_MONOTONIC_TIMESTAMP=0 +_TRANSPORT=kernel +SYSLOG_FACILITY=0 +SYSLOG_IDENTIFIER=kernel +_MACHINE_ID=ad88a1859979427ea1a7c24f0ae0320a +_HOSTNAME=Debian12 +_RUNTIME_SCOPE=system +PRIORITY=6 +MESSAGE=Command line: BOOT_IMAGE=/boot/vmlinuz-6.1.0-25-amd64 root=UUID=3841998b-4e88-4231-93c8-3fc24b549223 ro quiet + +__CURSOR=s=8c7196499b954413a742eb1e2107fa5d;i=451d;b=e2fca45429e54522bb2927112eb8e0b5;m=2aad67;t=6228fba6fbe98;x=ab82fca7956545cf +__REALTIME_TIMESTAMP=1726850563817112 +__MONOTONIC_TIMESTAMP=2796903 +_BOOT_ID=e2fca45429e54522bb2927112eb8e0b5 +_SOURCE_MONOTONIC_TIMESTAMP=0 +_TRANSPORT=kernel +PRIORITY=5 +SYSLOG_FACILITY=0 +SYSLOG_IDENTIFIER=kernel +MESSAGE=Linux version 6.1.0-25-amd64 (debian-kernel@lists.debian.org) (gcc-12 (Debian 12.2.0-14) 12.2.0, GNU ld (GNU Binutils for Debian) 2.40) #1 SMP PREEMPT_DYNAMIC Debian 6.1.106-3 (2024-08-26) +_MACHINE_ID=ad88a1859979427ea1a7c24f0ae0320a +_HOSTNAME=Debian12 +_RUNTIME_SCOPE=system + +__CURSOR=s=8c7196499b954413a742eb1e2107fa5d;i=451e;b=e2fca45429e54522bb2927112eb8e0b5;m=2aad75;t=6228fba6fbea7;x=f334fe004963f224 +__REALTIME_TIMESTAMP=1726850563817127 +__MONOTONIC_TIMESTAMP=2796917 +_BOOT_ID=e2fca45429e54522bb2927112eb8e0b5 +_SOURCE_MONOTONIC_TIMESTAMP=0 +_TRANSPORT=kernel +SYSLOG_FACILITY=0 +SYSLOG_IDENTIFIER=kernel +_MACHINE_ID=ad88a1859979427ea1a7c24f0ae0320a +_HOSTNAME=Debian12 +_RUNTIME_SCOPE=system +PRIORITY=6 +MESSAGE=Command line: BOOT_IMAGE=/boot/vmlinuz-6.1.0-25-amd64 root=UUID=3841998b-4e88-4231-93c8-3fc24b549223 ro quiet + diff --git a/filebeat/input/journald/testdata/multiple-boots.journal b/filebeat/input/journald/testdata/multiple-boots.journal new file mode 100644 index 00000000000..668b82162d6 Binary files /dev/null and b/filebeat/input/journald/testdata/multiple-boots.journal differ diff --git a/filebeat/input/journald/testdata/ndjson-parser.export b/filebeat/input/journald/testdata/ndjson-parser.export new file mode 100644 index 00000000000..0a24b593f77 Binary files /dev/null and b/filebeat/input/journald/testdata/ndjson-parser.export differ diff --git a/filebeat/input/journald/testdata/ndjson-parser.journal b/filebeat/input/journald/testdata/ndjson-parser.journal new file mode 100644 index 00000000000..aa4aa7960f3 Binary files /dev/null and b/filebeat/input/journald/testdata/ndjson-parser.journal differ diff --git a/filebeat/tests/integration/journald_test.go b/filebeat/tests/integration/journald_test.go index 447e49b82bb..712d2db4871 100644 --- a/filebeat/tests/integration/journald_test.go +++ b/filebeat/tests/integration/journald_test.go @@ -75,7 +75,7 @@ func generateJournaldLogs(t *testing.T, ctx context.Context, syslogID string, ma //go:embed testdata/filebeat_journald.yml var journaldInputCfg string -func TestJournaldInput(t *testing.T) { +func TestJournaldInputRunsAndRecoversFromJournalctlFailures(t *testing.T) { filebeat := integration.NewBeat( t, "filebeat", @@ -90,9 +90,12 @@ func TestJournaldInput(t *testing.T) { filebeat.WriteConfigFile(yamlCfg) filebeat.Start() + // On a normal execution we run journalclt twice, the first time to read all messages from the + // previous boot until 'now' and the second one with the --follow flag that should keep on running. + filebeat.WaitForLogs("journalctl started with PID", 10*time.Second, "journalctl did not start") filebeat.WaitForLogs("journalctl started with PID", 10*time.Second, "journalctl did not start") - pidLine := filebeat.GetLogLine("journalctl started with PID") + pidLine := filebeat.GetLastLogLine("journalctl started with PID") logEntry := struct{ Message string }{} if err := json.Unmarshal([]byte(pidLine), &logEntry); err != nil { t.Errorf("could not parse PID log entry as JSON: %s", err) @@ -105,7 +108,7 @@ func TestJournaldInput(t *testing.T) { // Kill journalctl if err := syscall.Kill(pid, syscall.SIGKILL); err != nil { - t.Fatalf("coluld not kill journalctl with PID %d: %s", pid, err) + t.Fatalf("coluld not kill journalctl with PID %d: %s", pid, err) } go generateJournaldLogs(t, context.Background(), syslogID, 5) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 9b8002f1176..8adbc18959d 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -33,6 +33,7 @@ import ( "os/exec" "path/filepath" "regexp" + "slices" "strconv" "strings" "sync" @@ -430,6 +431,29 @@ func (b *BeatProc) GetLogLine(s string) string { return line } +// GetLastLogLine search for the string s starting at the end +// of the logs, if it is found the whole log line is returned, otherwise +// an empty string is returned. GetLastLogLine does not keep track of +// any offset. +func (b *BeatProc) GetLastLogLine(s string) string { + logFile := b.openLogFile() + defer logFile.Close() + + found, line := b.searchStrInLogsReversed(logFile, s) + if found { + return line + } + + eventLogFile := b.openEventLogFile() + if eventLogFile == nil { + return "" + } + defer eventLogFile.Close() + _, line = b.searchStrInLogsReversed(eventLogFile, s) + + return line +} + // searchStrInLogs search for s as a substring of any line in logFile starting // from offset. // @@ -471,6 +495,44 @@ func (b *BeatProc) searchStrInLogs(logFile *os.File, s string, offset int64) (bo return false, offset, "" } +// searchStrInLogs search for s as a substring of any line in logFile starting +// from offset. +// +// It will close logFile and return the current offset. +func (b *BeatProc) searchStrInLogsReversed(logFile *os.File, s string) (bool, string) { + t := b.t + + defer func() { + if err := logFile.Close(); err != nil { + // That's not quite a test error, but it can impact + // next executions of LogContains, so treat it as an error + t.Errorf("could not close log file: %s", err) + } + }() + + r := bufio.NewReader(logFile) + lines := []string{} + for { + line, err := r.ReadString('\n') + if err != nil { + if err != io.EOF { + t.Fatalf("error reading log file '%s': %s", logFile.Name(), err) + } + break + } + lines = append(lines, line) + } + + slices.Reverse(lines) + for _, line := range lines { + if strings.Contains(line, s) { + return true, line + } + } + + return false, "" +} + // WaitForLogs waits for the specified string s to be present in the logs within // the given timeout duration and fails the test if s is not found. // msgAndArgs should be a format string and arguments that will be printed