Skip to content

Commit

Permalink
Read journal entries from all boots (#41244) (#41404)
Browse files Browse the repository at this point in the history
Some versions of journalctl will only return messages from the current
boot when --follow is passed, it will even ignore the cursor or date
arguments.

This commit reads messages from all boots by first calling journalctl
without the --follow flag, reading all entries and once it
successfully exits, then we restart journalctl with the cursor and the
--follow flag.

The parser test is upsted to use ndjson parser instead of
multiline because the multiline parser can have issues when journald
input is reading from files. There is a corner case where the
journalctl exits successfully and the reader goroutine gets an error,
this makes Next to return early, making the multiline to also return
early.

TestJournaldInput assumed journalctl was run only once, however that
has changed recently. The test is updated to accommodate for that and rename
to TestJournaldInputRunsAndRecoversFromJournalctlFailures, which
better reflects what it is actually testing.

---------

Co-authored-by: Pierre HILBERT <[email protected]>
(cherry picked from commit d21ed32)

Co-authored-by: Tiago Queiroz <[email protected]>
  • Loading branch information
mergify[bot] and belimawr authored Oct 24, 2024
1 parent a64621c commit 3fcfd9a
Show file tree
Hide file tree
Showing 15 changed files with 393 additions and 131 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142]
- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192]
- Log bad handshake details when websocket connection fails {pull}41300[41300]
- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179]
- Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244]

*Heartbeat*

Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -107,7 +107,7 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
e.t.Helper()
msg := strings.Builder{}
fmt.Fprintf(&msg, "did not find the expected %d events", count)
assert.Eventually(e.t, func() bool {
require.Eventually(e.t, func() bool {
sum := len(e.pipeline.GetAllEvents())
if sum == count {
return true
Expand Down
8 changes: 5 additions & 3 deletions filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,11 @@ func TestInputSeek(t *testing.T) {

env.waitUntilEventCount(len(testCase.expectedMessages))

for idx, event := range env.pipeline.GetAllEvents() {
if got, expected := event.Fields["message"], testCase.expectedMessages[idx]; got != expected {
t.Fatalf("expecting event message %q, got %q", expected, got)
if !t.Failed() {
for idx, event := range env.pipeline.GetAllEvents() {
if got, expected := event.Fields["message"], testCase.expectedMessages[idx]; got != expected {
t.Fatalf("expecting event message %q, got %q", expected, got)
}
}
}
})
Expand Down
36 changes: 23 additions & 13 deletions filebeat/input/journald/input_parsers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,41 @@ import (
// it only tests a single parser, but that is enough to ensure
// we're correctly using the parsers
func TestInputParsers(t *testing.T) {
inputParsersExpected := []string{"1st line\n2nd line\n3rd line", "4th line\n5th line\n6th line"}
env := newInputTestingEnvironment(t)

inp := env.mustCreateInput(mapstr.M{
"paths": []string{path.Join("testdata", "input-multiline-parser.journal")},
"include_matches.match": []string{"_SYSTEMD_USER_UNIT=log-service.service"},
"paths": []string{path.Join("testdata", "ndjson-parser.journal")},
"parsers": []mapstr.M{
{
"multiline": mapstr.M{
"type": "count",
"count_lines": 3,
"ndjson": mapstr.M{
"target": "",
},
},
},
})

ctx, cancelInput := context.WithCancel(context.Background())
t.Cleanup(cancelInput)
env.startInput(ctx, inp)
env.waitUntilEventCount(len(inputParsersExpected))
env.waitUntilEventCount(1)
event := env.pipeline.clients[0].GetEvents()[0]

foo, isString := event.Fields["foo"].(string)
if !isString {
t.Errorf("expecting field 'foo' to be string, got %T", event.Fields["foo"])
}

for idx, event := range env.pipeline.clients[0].GetEvents() {
if got, expected := event.Fields["message"], inputParsersExpected[idx]; got != expected {
t.Errorf("expecting event message %q, got %q", expected, got)
}
answer, isInt := event.Fields["answer"].(int64)
if !isInt {
t.Errorf("expecting field 'answer' to be int64, got %T", event.Fields["answer"])
}

cancelInput()
// The JSON in the test journal is: '{"foo": "bar", "answer":42}'
expectedFoo := "bar"
expectedAnswer := int64(42)
if foo != expectedFoo {
t.Errorf("expecting 'foo' from the Journal JSON to be '%s' got '%s' instead", expectedFoo, foo)
}
if answer != expectedAnswer {
t.Errorf("expecting 'answer' from the Journal JSON to be '%d' got '%d' instead", expectedAnswer, answer)
}
}
64 changes: 12 additions & 52 deletions filebeat/input/journald/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,59 +39,19 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

// How to write to journal from CLI:
// https://www.baeldung.com/linux/systemd-journal-message-terminal
func TestInputCanReadAllBoots(t *testing.T) {
env := newInputTestingEnvironment(t)
cfg := mapstr.M{
"paths": []string{path.Join("testdata", "multiple-boots.journal")},
}
inp := env.mustCreateInput(cfg)

// TestGenerateJournalEntries generates entries in the user's journal.
// It is kept commented out at the top of the file as reference and
// easy access.
//
// How to generate a journal file with only the entries you want:
// 1. Add the dependencies for this test
// go get github.com/ssgreg/journald
// 2. Uncomment and run the test:
// 3. Add the following import:
// journaldlogger "github.com/ssgreg/journald"
// 4. Get a VM, ssh into it, make sure you can access the test from it
// 5. Find the journal file, usually at /var/log/journal/<machine ID>/user-1000.journal
// 7. Clean and rotate the journal
// sudo journalctl --vacuum-time=1s
// sudo journalctl --rotate
// 8. Run this test: `go test -run=TestGenerateJournalEntries`
// 9. Copy the journal file somewhere else
// cp /var/log/journal/21282bcb80a74c08a0d14a047372256c/user-1000.journal /tmp/foo.journal
// 10. Read the journal file:
// journalctl --file=/tmp/foo.journal -n 10
// 11. Read the journal with all fields as JSON
// journalctl --file=/tmp/foo.journal -n 10 -o json
// func TestGenerateJournalEntries(t *testing.T) {
// fields := []map[string]any{
// {
// "BAR": "bar",
// },
// {
// "FOO": "foo",
// },
// {
// "BAR": "bar",
// "FOO": "foo",
// },
// {
// "FOO_BAR": "foo",
// },
// {
// "FOO_BAR": "bar",
// },
// {
// "FOO_BAR": "foo bar",
// },
// }
// for i, m := range fields {
// if err := journaldlogger.Send(fmt.Sprintf("message %d", i), journaldlogger.PriorityInfo, m); err != nil {
// t.Fatal(err)
// }
// }
// }
ctx, cancelInput := context.WithCancel(context.Background())
t.Cleanup(cancelInput)

env.startInput(ctx, inp)
env.waitUntilEventCount(6)
}

func TestInputFieldsTranslation(t *testing.T) {
// A few random keys to verify
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/journald/pkg/journalctl/jctlmock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 25 additions & 7 deletions filebeat/input/journald/pkg/journalctl/journalctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io"
"os/exec"
"strings"
"sync"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -37,6 +38,7 @@ type journalctl struct {

logger *logp.Logger
canceler input.Canceler
waitDone sync.WaitGroup
}

// Factory returns an instance of journalctl ready to use.
Expand Down Expand Up @@ -95,7 +97,7 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args
data, err := reader.ReadBytes('\n')
if err != nil {
if !errors.Is(err, io.EOF) {
logger.Errorf("cannot read from journalctl stdout: %s", err)
logger.Errorf("cannot read from journalctl stdout: '%s'", err)
}
return
}
Expand All @@ -118,10 +120,13 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args

// Whenever the journalctl process exits, the `Wait` call returns,
// if there was an error it is logged and this goroutine exits.
jctl.waitDone.Add(1)
go func() {
defer jctl.waitDone.Done()
if err := cmd.Wait(); err != nil {
jctl.logger.Errorf("journalctl exited with an error, exit code %d ", cmd.ProcessState.ExitCode())
}
jctl.logger.Debugf("journalctl exit code: %d", cmd.ProcessState.ExitCode())
}()

return &jctl, nil
Expand All @@ -130,18 +135,31 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args
// Kill Terminates the journalctl process using a SIGKILL.
func (j *journalctl) Kill() error {
j.logger.Debug("sending SIGKILL to journalctl")
err := j.cmd.Process.Kill()
return err
return j.cmd.Process.Kill()
}

func (j *journalctl) Next(cancel input.Canceler) ([]byte, error) {
// Next returns the next journal entry (as JSON). If `finished` is true, then
// journalctl finished returning all data and exited successfully, if journalctl
// exited unexpectedly, then `err` is non-nil, `finished` is false and an empty
// byte array is returned.
func (j *journalctl) Next(cancel input.Canceler) ([]byte, bool, error) {
select {
case <-cancel.Done():
return []byte{}, ErrCancelled
return []byte{}, false, ErrCancelled
case d, open := <-j.dataChan:
if !open {
return []byte{}, errors.New("no more data to read, journalctl might have exited unexpectedly")
// Wait for the process to exit, so we can read the exit code.
j.waitDone.Wait()
if j.cmd.ProcessState.ExitCode() == 0 {
return []byte{}, true, nil
}
return []byte{},
false,
fmt.Errorf(
"no more data to read, journalctl exited unexpectedly, exit code: %d",
j.cmd.ProcessState.ExitCode())
}
return d, nil

return d, false, nil
}
}
Loading

0 comments on commit 3fcfd9a

Please sign in to comment.