From 93c70b531c15692e016e7297099cc953cbefb7ac Mon Sep 17 00:00:00 2001 From: Marcus Crestani Date: Thu, 26 Mar 2020 10:50:32 +0100 Subject: [PATCH] Add log-run mode that submits given files to Logstash and prints the results. --- README.md | 13 ++++++ logstash-filter-verifier.go | 89 ++++++++++++++++++++++++++++++++++++- logstash/eventreader.go | 20 +++++++++ logstash/process.go | 12 ++++- 4 files changed, 130 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index d14990b..6be9d2a 100644 --- a/README.md +++ b/README.md @@ -300,6 +300,19 @@ There are a few points to be made here: cases that behave differently depending on the host where they're run, we ignore that field with the `ignore` property. +## Log-run mode + +Log-run mode is useful if you want to pass example log files through your filter +and take a look at the result. Log-run mmode makes this easy by using your test +configuration so that you do not have to modify your pipeline configuration or +maintain a development pipeline configuration. + +The command-line argument `--logfile=LOGFILE` enables log-run mode and passes +`LOGFILE` to Logstash and prints the result of your pipeline to stdout. You can +provide command-line argument `--logfile LOGFILE` multiple times. If `LOGFILE` +is a directory, log-run mode finds all the log files that match +`--logfile-extension` below the given directory and processes them (default +logfile extension is `log`). ## Test case file reference diff --git a/logstash-filter-verifier.go b/logstash-filter-verifier.go index 36b0139..613e708 100644 --- a/logstash-filter-verifier.go +++ b/logstash-filter-verifier.go @@ -5,6 +5,7 @@ package main import ( "errors" "fmt" + "io" "os" "path/filepath" "runtime" @@ -76,6 +77,15 @@ var ( Flag("sockets-timeout", "Timeout (duration) for the communication with Logstash via Unix domain sockets. Has no effect unless --sockets is used."). Default("60s"). Duration() + logfilePaths = kingpin. + Flag("logfile", "Switches to log-run mode that submits given log files to Logstash and prints the results. If LOGFILE is a directory, it uses all files below that path that match logfile-extension."). + PlaceHolder("LOGFILE"). + Strings() + logfileExtension = kingpin. + Flag("logfile-extension", "Extension that identify logfiles to use. Has no effect unless --logfile is a directory. Default is `log`."). + PlaceHolder("EXTENSION"). + Default("log"). + String() // Arguments testcasePath = kingpin. @@ -88,7 +98,7 @@ var ( ExistingFilesOrDirs() ) -// findExecutable examines the passed file paths and returns the first +// examines findExecutable the passed file paths and returns the first // one that is an existing executable file. func findExecutable(paths []string) (string, error) { for _, p := range paths { @@ -138,7 +148,7 @@ func runTests(inv *logstash.Invocation, tests []testcase.TestCaseSet, diffComman return err } - result, err := p.Wait() + result, err := p.WaitAndRead() if err != nil || *logstashOutput { message := getLogstashOutputMessage(result.Output, result.Log) if err != nil { @@ -157,6 +167,71 @@ func runTests(inv *logstash.Invocation, tests []testcase.TestCaseSet, diffComman return nil } +func visit(files *[]string, extension string) filepath.WalkFunc { + return func(path string, info os.FileInfo, err error) error { + if err != nil { + log.Fatal(err) + } + if info.IsDir() { + return nil + } + if filepath.Ext(path) != "." + extension { + return nil + } + *files = append(*files, path) + return nil + } +} + +// runLogs runs Logstash with a set of configuration files against log files +func runLogs(inv *logstash.Invocation, tests []testcase.TestCaseSet, logFilePaths []string, logFileExtension string, keptEnvVars []string) error { + for _, t := range tests { + fmt.Printf("Using test setup from %s...\n", filepath.Base(t.File)) + p, err := logstash.NewProcess(inv, t.Codec, t.InputFields, keptEnvVars) + if err != nil { + return err + } + defer p.Release() + if err = p.Start(); err != nil { + return err + } + var file *os.File + var logFiles []string + var written int64 + for _, logFilePath := range logFilePaths { + err = filepath.Walk(logFilePath, visit(&logFiles, logFileExtension)) + if err != nil { + return err + } + for _, logFile := range logFiles { + fmt.Printf("Piping %s: ", logFile) + file, err = os.Open(logFile) + if err != nil { + return err + } + written, err = io.Copy(p.Input, file) + fmt.Printf("%d bytes\n", written) + if err != nil { + return err + } + } + } + if err = p.Input.Close(); err != nil { + return err + } + + result, err := p.WaitAndPrint() + if err != nil || *logstashOutput { + message := getLogstashOutputMessage(result.Output, result.Log) + if err != nil { + return fmt.Errorf("Error running Logstash: %s.%s", err, message) + } + userError("%s", message) + } + } + return nil +} + // runParallelTests runs multiple set of configuration in a single // instance of Logstash against a slice of test cases and compares // the actual events against the expected set. Returns an error if @@ -324,6 +399,16 @@ func mainEntrypoint() int { return 1 } defer inv.Release() + + if len(*logfilePaths) > 0 { + if err = runLogs(inv, tests, *logfilePaths, *logfileExtension, allKeptEnvVars); err != nil { + userError(err.Error()) + return 1 + } else { + return 0 + } + } + if *unixSockets { if runtime.GOOS == "windows" { userError("Use of Unix domain sockets for communication with Logstash is not supported on Windows.") diff --git a/logstash/eventreader.go b/logstash/eventreader.go index adacaf6..9daa8e4 100644 --- a/logstash/eventreader.go +++ b/logstash/eventreader.go @@ -36,3 +36,23 @@ func readEvents(r io.Reader) ([]Event, error) { } return events, scanner.Err() } + +// formatAndPRintEvents reads zero, one, or more Logstash events from a stream +// of newline-separated JSON strings and pretty-prints them to stdout. +func formatAndPrintEvents(r io.Reader) ([]Event, error) { + events := []Event{} + scanner := bufio.NewScanner(r) + for scanner.Scan() { + var event Event + err := json.Unmarshal([]byte(scanner.Text()), &event) + if err != nil { + return events, BadLogstashOutputError{scanner.Text()} + } + buf, err := json.MarshalIndent(event, "", " ") + if err != nil { + return events, fmt.Errorf("Failed to marshal %+v as JSON: %s", event, err) + } + fmt.Printf(string(buf) + "\n") + } + return events, scanner.Err() +} diff --git a/logstash/process.go b/logstash/process.go index 87dfb3e..6aa985c 100644 --- a/logstash/process.go +++ b/logstash/process.go @@ -105,7 +105,7 @@ func (p *Process) Start() error { // Wait blocks until the started Logstash process terminates and // returns the result of the execution. -func (p *Process) Wait() (*Result, error) { +func (p *Process) Wait_(eventFunc func(r io.Reader) ([]Event, error)) (*Result, error) { if p.child.Process == nil { return nil, errors.New("can't wait on an unborn process") } @@ -136,11 +136,19 @@ func (p *Process) Wait() (*Result, error) { } var err error - result.Events, err = readEvents(p.output) + result.Events, err = eventFunc(p.output) result.Success = err == nil return &result, err } +func (p *Process) WaitAndPrint() (*Result, error) { + return p.Wait_(formatAndPrintEvents) +} + +func (p *Process) WaitAndRead() (*Result, error) { + return p.Wait_(readEvents) +} + // Release frees all allocated resources connected to this process. func (p *Process) Release() { _ = p.output.Close()