Skip to content

Commit

Permalink
Add log-run mode that submits given files to Logstash and prints the …
Browse files Browse the repository at this point in the history
…results.
  • Loading branch information
cresh committed Mar 26, 2020
1 parent 22cff4b commit 93c70b5
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 4 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,19 @@ There are a few points to be made here:
cases that behave differently depending on the host where they're
run, we ignore that field with the `ignore` property.

## Log-run mode

Log-run mode is useful if you want to pass example log files through your filter
and take a look at the result. Log-run mmode makes this easy by using your test
configuration so that you do not have to modify your pipeline configuration or
maintain a development pipeline configuration.

The command-line argument `--logfile=LOGFILE` enables log-run mode and passes
`LOGFILE` to Logstash and prints the result of your pipeline to stdout. You can
provide command-line argument `--logfile LOGFILE` multiple times. If `LOGFILE`
is a directory, log-run mode finds all the log files that match
`--logfile-extension` below the given directory and processes them (default
logfile extension is `log`).

## Test case file reference

Expand Down
89 changes: 87 additions & 2 deletions logstash-filter-verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main
import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -76,6 +77,15 @@ var (
Flag("sockets-timeout", "Timeout (duration) for the communication with Logstash via Unix domain sockets. Has no effect unless --sockets is used.").
Default("60s").
Duration()
logfilePaths = kingpin.
Flag("logfile", "Switches to log-run mode that submits given log files to Logstash and prints the results. If LOGFILE is a directory, it uses all files below that path that match logfile-extension.").
PlaceHolder("LOGFILE").
Strings()
logfileExtension = kingpin.
Flag("logfile-extension", "Extension that identify logfiles to use. Has no effect unless --logfile is a directory. Default is `log`.").
PlaceHolder("EXTENSION").
Default("log").
String()

// Arguments
testcasePath = kingpin.
Expand All @@ -88,7 +98,7 @@ var (
ExistingFilesOrDirs()
)

// findExecutable examines the passed file paths and returns the first
// examines findExecutable the passed file paths and returns the first
// one that is an existing executable file.
func findExecutable(paths []string) (string, error) {
for _, p := range paths {
Expand Down Expand Up @@ -138,7 +148,7 @@ func runTests(inv *logstash.Invocation, tests []testcase.TestCaseSet, diffComman
return err
}

result, err := p.Wait()
result, err := p.WaitAndRead()
if err != nil || *logstashOutput {
message := getLogstashOutputMessage(result.Output, result.Log)
if err != nil {
Expand All @@ -157,6 +167,71 @@ func runTests(inv *logstash.Invocation, tests []testcase.TestCaseSet, diffComman
return nil
}

func visit(files *[]string, extension string) filepath.WalkFunc {
return func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Fatal(err)
}
if info.IsDir() {
return nil
}
if filepath.Ext(path) != "." + extension {
return nil
}
*files = append(*files, path)
return nil
}
}

// runLogs runs Logstash with a set of configuration files against log files
func runLogs(inv *logstash.Invocation, tests []testcase.TestCaseSet, logFilePaths []string, logFileExtension string, keptEnvVars []string) error {
for _, t := range tests {
fmt.Printf("Using test setup from %s...\n", filepath.Base(t.File))
p, err := logstash.NewProcess(inv, t.Codec, t.InputFields, keptEnvVars)
if err != nil {
return err
}
defer p.Release()
if err = p.Start(); err != nil {
return err
}
var file *os.File
var logFiles []string
var written int64
for _, logFilePath := range logFilePaths {
err = filepath.Walk(logFilePath, visit(&logFiles, logFileExtension))
if err != nil {
return err
}
for _, logFile := range logFiles {
fmt.Printf("Piping %s: ", logFile)
file, err = os.Open(logFile)
if err != nil {
return err
}
written, err = io.Copy(p.Input, file)
fmt.Printf("%d bytes\n", written)
if err != nil {
return err
}
}
}
if err = p.Input.Close(); err != nil {
return err
}

result, err := p.WaitAndPrint()
if err != nil || *logstashOutput {
message := getLogstashOutputMessage(result.Output, result.Log)
if err != nil {
return fmt.Errorf("Error running Logstash: %s.%s", err, message)
}
userError("%s", message)
}
}
return nil
}

// runParallelTests runs multiple set of configuration in a single
// instance of Logstash against a slice of test cases and compares
// the actual events against the expected set. Returns an error if
Expand Down Expand Up @@ -324,6 +399,16 @@ func mainEntrypoint() int {
return 1
}
defer inv.Release()

if len(*logfilePaths) > 0 {
if err = runLogs(inv, tests, *logfilePaths, *logfileExtension, allKeptEnvVars); err != nil {
userError(err.Error())
return 1
} else {
return 0
}
}

if *unixSockets {
if runtime.GOOS == "windows" {
userError("Use of Unix domain sockets for communication with Logstash is not supported on Windows.")
Expand Down
20 changes: 20 additions & 0 deletions logstash/eventreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,23 @@ func readEvents(r io.Reader) ([]Event, error) {
}
return events, scanner.Err()
}

// formatAndPRintEvents reads zero, one, or more Logstash events from a stream
// of newline-separated JSON strings and pretty-prints them to stdout.
func formatAndPrintEvents(r io.Reader) ([]Event, error) {
events := []Event{}
scanner := bufio.NewScanner(r)
for scanner.Scan() {
var event Event
err := json.Unmarshal([]byte(scanner.Text()), &event)
if err != nil {
return events, BadLogstashOutputError{scanner.Text()}
}
buf, err := json.MarshalIndent(event, "", " ")
if err != nil {
return events, fmt.Errorf("Failed to marshal %+v as JSON: %s", event, err)
}
fmt.Printf(string(buf) + "\n")
}
return events, scanner.Err()
}
12 changes: 10 additions & 2 deletions logstash/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (p *Process) Start() error {

// Wait blocks until the started Logstash process terminates and
// returns the result of the execution.
func (p *Process) Wait() (*Result, error) {
func (p *Process) Wait_(eventFunc func(r io.Reader) ([]Event, error)) (*Result, error) {
if p.child.Process == nil {
return nil, errors.New("can't wait on an unborn process")
}
Expand Down Expand Up @@ -136,11 +136,19 @@ func (p *Process) Wait() (*Result, error) {
}

var err error
result.Events, err = readEvents(p.output)
result.Events, err = eventFunc(p.output)
result.Success = err == nil
return &result, err
}

func (p *Process) WaitAndPrint() (*Result, error) {
return p.Wait_(formatAndPrintEvents)
}

func (p *Process) WaitAndRead() (*Result, error) {
return p.Wait_(readEvents)
}

// Release frees all allocated resources connected to this process.
func (p *Process) Release() {
_ = p.output.Close()
Expand Down

0 comments on commit 93c70b5

Please sign in to comment.