Skip to content

Commit

Permalink
Merge pull request #55 from vshn/fix/read-line
Browse files Browse the repository at this point in the history
Fix fifo reading if named pipe remains open
  • Loading branch information
glrf authored Nov 21, 2022
2 parents 4810148 + 1386331 commit 05ff965
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 29 deletions.
40 changes: 40 additions & 0 deletions e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path/filepath"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -65,6 +66,24 @@ func TestE2E_MasterThenFault(t *testing.T) {
assert.NoErrorf(t, done(), "failed to stop master command")
}

func TestE2E_FIFO(t *testing.T) {
conf, cleanup, err := setupConfig(t.Name(), "192.168.1.1/32")
require.NoErrorf(t, err, "failed to setup test env")
defer cleanup()
pname, pipe, removePipe, err := setupFifo(t.Name())
require.NoErrorf(t, err, "failed to setup pipe")
defer removePipe()

cmd := exec.Command("./floaty", "--fifo", conf, pname)
out, stop, err := startCmd(cmd)
require.NoError(t, err)
defer stop()

_, err = pipe.Write([]byte(fmt.Sprintf("INSTANCE %q MASTER 100\n", t.Name())))
require.NoError(t, err)
expectUpdate(t, out, "192.168.1.1/32", 3)
}

func startCmd(cmd *exec.Cmd) (*bytes.Buffer, func() error, error) {
out := &bytes.Buffer{}
cmd.Stdout = out
Expand Down Expand Up @@ -165,3 +184,24 @@ vrrp_instance %s {
}
return confF, cleanup, nil
}

func setupFifo(name string) (string, io.Writer, func() error, error) {
dir, err := os.MkdirTemp("", name)
if err != nil {
return "", nil, nil, err
}
cleanup := func() error {
return os.RemoveAll(dir)
}
pname := filepath.Join(dir, "pipe")
err = syscall.Mkfifo(pname, 0666)
if err != nil {
return "", nil, cleanup, err
}

f, err := os.OpenFile(pname, os.O_RDWR|os.O_CREATE|os.O_APPEND|syscall.O_NONBLOCK, 0777)
if err != nil {
return "", nil, cleanup, err
}
return pname, f, cleanup, nil
}
50 changes: 29 additions & 21 deletions fifo.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package main

import (
"bufio"
"context"
"encoding/csv"
"fmt"
"io"

Expand Down Expand Up @@ -37,33 +37,20 @@ func NewFifoHandler(cfg notifyConfig, pipe io.Reader, events <-chan fsnotify.Eve
}

func (h FifoHandler) HandleFifo(ctx context.Context) error {
// Yes, this can actually be parsed as a CSV file with spaces as separators and it handles quoted string the same way a shell does.
r := csv.NewReader(h.pipe)
r.Comma = ' '

err := h.handleFifoEvents(ctx)
if err != nil {
logrus.Errorf("Failed to read from named pipe: %s", err)
}
for {
select {
case e := <-h.events:
logrus.Debugf("got event: %q", e.Op.String())
switch e.Op {
case fsnotify.Write:
lines, err := r.ReadAll()
err := h.handleFifoEvents(ctx)
if err != nil {
logrus.Errorf("Failed to read from fifo: %s", err)
continue
logrus.Errorf("Failed to read from named pipe: %s", err)
}
for _, line := range lines {
n, err := parseNotification(line)
if err != nil {
logrus.Errorf("Failed to parse fifo event from keepalived, keepalived might be incompatible with the floaty version: %s", err)
continue
}
err = h.handleNotifyEvent(ctx, n)
if err != nil {
logrus.Errorf("Failed to handle notify event: %s", err)
continue
}
}

case fsnotify.Remove, fsnotify.Rename:
return fmt.Errorf("Named pipe was removed. Quitting")
}
Expand All @@ -72,6 +59,27 @@ func (h FifoHandler) HandleFifo(ctx context.Context) error {
}
}
}

func (h FifoHandler) handleFifoEvents(ctx context.Context) error {
s := bufio.NewScanner(h.pipe)
for s.Scan() {
line := s.Text()
logrus.Debugf("Got line: %q", s.Text())
n, err := parseNotificationLine(line)
if err != nil {
logrus.Errorf("Failed to parse fifo event from keepalived, keepalived might be incompatible with the floaty version: %s", err)
continue
}
err = h.handleNotifyEvent(ctx, n)
if err != nil {
logrus.Errorf("Failed to handle notify event: %s", err)
continue
}
}
// Only returns non EOF errors
return s.Err()
}

func (h FifoHandler) handleNotifyEvent(ctx context.Context, n Notification) error {
stopRunning, ok := h.running[n.Instance]
if ok {
Expand Down
101 changes: 94 additions & 7 deletions fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"context"
"io"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestFIFO_interleaving(t *testing.T) {
nh.isEventuallyMaster(t, "foo")
nh.isEventuallyNotMaster(t, "bar")

WriteToPipe(t, pipe, eventChan, "INSTANCE \"foo\" FAULT 100\nINSTANCE \"bar\" FAULT 100\nINSTANCE \"bar\" MASTER 100")
WriteToPipe(t, pipe, eventChan, "INSTANCE \"foo\" FAULT 100\nINSTANCE \"bar\" FAULT 100\nINSTANCE \"bar\" MASTER 100\n")
nh.isEventuallyNotMaster(t, "foo")
nh.isEventuallyMaster(t, "bar")

Expand All @@ -78,9 +79,95 @@ func TestFIFO_interleaving(t *testing.T) {
nh.isEventuallyMaster(t, "foo")
}

func SetupFIFOTest(t *testing.T, fn notificationHandlerFunc) (FifoHandler, *bytes.Buffer, chan fsnotify.Event) {
var pipe bytes.Buffer
eventChan := make(chan fsnotify.Event, 3)
func TestFIFO_beforeStart(t *testing.T) {
nh := newFakeNotificationHandler()
handler, pipe, eventChan := SetupFIFOTest(t, nh.GetHandler(t))
ctx, done := context.WithCancel(context.Background())
defer done()

_, err := pipe.Write([]byte("INSTANCE \"foo\" MASTER 100\n"))
require.NoError(t, err)
_, err = pipe.Write([]byte("INSTANCE \"foo\" FAULT 100\n"))
require.NoError(t, err)
_, err = pipe.Write([]byte("INSTANCE \"foo\" MASTER 100\n"))
require.NoError(t, err)
_, err = pipe.Write([]byte("INSTANCE \"bar\" FAULT 100\n"))
require.NoError(t, err)

go func() {
assert.NoError(t, handler.HandleFifo(ctx), "Handler should not fail")
}()
time.Sleep(100 * time.Millisecond)
nh.isEventuallyMaster(t, "foo")
WriteToPipe(t, pipe, eventChan, "INSTANCE \"foo\" FAULT 100\n")
nh.isEventuallyNotMaster(t, "foo")
}

func TestFIFO_withEOF(t *testing.T) {
nh := newFakeNotificationHandler()
handler, pipe, eventChan := SetupFIFOTest(t, nh.GetHandler(t))
ctx, done := context.WithCancel(context.Background())
defer done()
go func() {
assert.NoError(t, handler.HandleFifo(ctx), "Handler should not fail")
}()
pipe.setSendEOF(true)

WriteToPipe(t, pipe, eventChan, "INSTANCE \"bar\" MASTER 100\n")
nh.isEventuallyMaster(t, "bar")

WriteToPipe(t, pipe, eventChan, "INSTANCE \"foo\" FAULT 100\nINSTANCE \"bar\" FAULT 100\nINSTANCE \"bar\" MASTER 100\n")
nh.isEventuallyMaster(t, "bar")

WriteToPipe(t, pipe, eventChan, "GROUP \"bar\" BACKUP 100\n")
WriteToPipe(t, pipe, eventChan, "G s\"bar\" BACKUP 100\n")
nh.isEventuallyMaster(t, "bar")

WriteToPipe(t, pipe, eventChan, "INSTANCE \"bar\" BACKUP 100\n")
nh.isEventuallyNotMaster(t, "bar")
}

type testBuffer struct {
p bytes.Buffer
mu sync.Mutex

sendEOF bool
}

func (r *testBuffer) Read(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
n, err := r.p.Read(p)
for err == io.EOF {
if n == 0 {
r.mu.Unlock()
// We give the writer time to actually write to the buffer
time.Sleep(time.Millisecond)
r.mu.Lock()
n, err = r.p.Read(p)
} else {
return n, nil
}
}
return n, err
}
func (r *testBuffer) Write(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
return r.p.Write(p)
}
func (r *testBuffer) setSendEOF(sendEOF bool) {
r.mu.Lock()
defer r.mu.Unlock()
r.sendEOF = sendEOF
}

func SetupFIFOTest(t *testing.T, fn notificationHandlerFunc) (FifoHandler, *testBuffer, chan fsnotify.Event) {
pipe := testBuffer{
mu: sync.Mutex{},
sendEOF: false,
}
eventChan := make(chan fsnotify.Event, 30)

handler := FifoHandler{
pipe: &pipe,
Expand All @@ -92,7 +179,7 @@ func SetupFIFOTest(t *testing.T, fn notificationHandlerFunc) (FifoHandler, *byte
return handler, &pipe, eventChan
}

func WriteToPipe(t *testing.T, pipe *bytes.Buffer, events chan fsnotify.Event, content string) {
func WriteToPipe(t *testing.T, pipe io.Writer, events chan fsnotify.Event, content string) {
_, err := pipe.Write([]byte(content))
require.NoError(t, err, "Failed to write to pipe, test is probably wrong")
events <- fsnotify.Event{
Expand Down Expand Up @@ -138,13 +225,13 @@ func (h *fakeNotificationHandler) isEventuallyMaster(t *testing.T, instance stri
defer h.mu.Unlock()
s, ok := h.running[instance]
return ok && s.master
}, time.Second, 10*time.Millisecond, "%s should be in master state", instance)
}, time.Second, 50*time.Millisecond, "%s should be in master state", instance)
}
func (h *fakeNotificationHandler) isEventuallyNotMaster(t *testing.T, instance string) {
require.Eventuallyf(t, func() bool {
h.mu.Lock()
defer h.mu.Unlock()
s, ok := h.running[instance]
return !ok || !s.master
}, time.Second, 10*time.Millisecond, "%s should be in master state", instance)
}, time.Second, 50*time.Millisecond, "%s should be in master state", instance)
}
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ func runFifo(ctx context.Context, cfg notifyConfig) error {
if err != nil {
return fmt.Errorf("Failed to setup FIFO handler: %w", err)
}
return fifoHandler.HandleFifo(ctx)
ctx, done := context.WithCancel(ctx)
go func() {
err = fifoHandler.HandleFifo(ctx)
done()
}()
<-ctx.Done()
return err
}

func runNotify(ctx context.Context, cfg notifyConfig) error {
Expand Down
17 changes: 17 additions & 0 deletions notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/csv"
"errors"
"fmt"
"strings"
Expand All @@ -23,6 +24,22 @@ const (
NotificationBackup NotificationStatus = "BACKUP"
)

func parseNotificationLine(line string) (Notification, error) {
// Yes, this can actually be parsed as a CSV file with spaces as separators and it handles quoted string the same way a shell does.
r := csv.NewReader(strings.NewReader(line))
r.Comma = ' '
notifications, err := r.ReadAll()
if err != nil {
return Notification{}, err
}

if len(notifications) != 1 {
return Notification{}, fmt.Errorf("Failed to parse notification: %q", line)
}

return parseNotification(notifications[0])
}

func parseNotification(fields []string) (Notification, error) {
line := strings.Join(fields, " ")
if len(fields) != 4 {
Expand Down

0 comments on commit 05ff965

Please sign in to comment.