From a5d5635cbc73a1f7e4589d1a1ad17c5cbaa378b9 Mon Sep 17 00:00:00 2001 From: Edward Muller Date: Fri, 11 Mar 2016 10:47:21 -0800 Subject: [PATCH 1/9] SetBytes isn't cumulative per run. Accumulate the number of bytes and report the average. --- log_line_reader_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/log_line_reader_test.go b/log_line_reader_test.go index fa11c41..89be8ff 100644 --- a/log_line_reader_test.go +++ b/log_line_reader_test.go @@ -57,6 +57,7 @@ func (tc TestConsumer) Consume(in <-chan LogLine) { func doBasicLogLineReaderBenchmark(b *testing.B, frontBuffSize int) { b.ResetTimer() + var tb int for i := 0; i < b.N; i++ { b.StopTimer() logs := make(chan LogLine, frontBuffSize) @@ -66,10 +67,11 @@ func doBasicLogLineReaderBenchmark(b *testing.B, frontBuffSize int) { llp := NewInputProducer(TestProducerLines) b.StartTimer() rdr.ReadLogLines(llp) - b.SetBytes(int64(llp.TotalBytes)) + tb += llp.TotalBytes close(logs) testConsumer.Wait() } + b.SetBytes(int64(tb / b.N)) } func BenchmarkLogLineReaderWithFrontBuffEqual0(b *testing.B) { From ef8648586b1ff348744e98f99ba28a381f9d0847 Mon Sep 17 00:00:00 2001 From: Edward Muller Date: Fri, 11 Mar 2016 10:48:53 -0800 Subject: [PATCH 2/9] This is cleaner and makes the intent clearer --- shuttle.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shuttle.go b/shuttle.go index 26f341a..8ba8b2e 100644 --- a/shuttle.go +++ b/shuttle.go @@ -100,7 +100,7 @@ func (s *Shuttle) LoadReader(rdr io.ReadCloser) { // CloseReaders closes all tracked readers and returns any errors returned by // Close()ing the readers func (s *Shuttle) CloseReaders() []error { - errors := make([]error, len(s.readers)) + var errors []error for _, closer := range s.readers { if err := closer.Close(); err != nil { errors = append(errors, err) From 09a0f0faf58bbd003ff5296d6c82e1746dffb6ef Mon Sep 17 00:00:00 2001 From: Edward Muller Date: Fri, 11 Mar 2016 10:49:48 -0800 Subject: [PATCH 3/9] Test cleanups * No need for a new type (TestInput), just use what is in the stdlib * New loopingBuffer, which will just loop over the buffer again and again while Reading until closed. * shuttle.Land() also closes the readers and waits. No need to do that before Land() --- shuttle_test.go | 137 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 95 insertions(+), 42 deletions(-) diff --git a/shuttle_test.go b/shuttle_test.go index bc0d970..5527343 100644 --- a/shuttle_test.go +++ b/shuttle_test.go @@ -2,6 +2,7 @@ package shuttle import ( "bytes" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -9,22 +10,10 @@ import ( "regexp" "sync" "testing" + "time" ) -func newTestConfig() Config { - // Defaults should be good for most tests - config := NewConfig() - config.NumBatchers = 1 - config.LogsURL = "http://" - return config -} - -type TestInput struct { - *bytes.Reader -} - -func NewLongerTestInput() *TestInput { - return &TestInput{bytes.NewReader([]byte(`Lebowski ipsum what in God's holy name are you blathering about? +var longerTestData = []byte(`Lebowski ipsum what in God's holy name are you blathering about? Dolor sit amet, consectetur adipiscing elit praesent ac magna justo. They're nihilists. Pellentesque ac lectus quis elit blandit fringilla a ut turpis praesent. @@ -34,23 +23,89 @@ Felis ligula, malesuada suscipit malesuada non, ultrices non. Shomer shabbos. Urna sed orci ipsum, placerat id condimentum rutrum, rhoncus. Yeah man, it really tied the room together. -Ac lorem aliquam placerat.`))} +Ac lorem aliquam placerat. +`) + +func newTestConfig() Config { + // Defaults should be good for most tests + config := NewConfig() + config.NumBatchers = 1 + config.LogsURL = "http://" + return config } -func NewTestInput() *TestInput { - return &TestInput{bytes.NewReader([]byte(`Hello World -Test Line 2 -`))} +type loopingBuffer struct { + b []byte + close chan struct{} + p int + + mu *sync.Mutex + r int +} + +func NewLoopingBuffer(b []byte) *loopingBuffer { + return &loopingBuffer{ + mu: new(sync.Mutex), + b: b, + close: make(chan struct{}), + } } -func NewTestInputWithHeaders() *TestInput { - return &TestInput{bytes.NewReader([]byte("<13>1 2013-09-25T01:16:49.371356+00:00 host token web.1 - [meta sequenceId=\"1\"] message 1\n<13>1 2013-09-25T01:16:49.402923+00:00 host token web.1 - [meta sequenceId=\"2\"] message 2\n"))} +func (b *loopingBuffer) Read(p []byte) (n int, err error) { + for n < len(p) && err == nil { + select { + case <-b.close: + return n, io.EOF + default: + } + + c := copy(p[n:], b.b[b.p:]) + n += c + b.p += c + + b.mu.Lock() + b.r += c + b.mu.Unlock() + if b.p >= len(b.b) { + b.p = 0 + return + } + } + return } -func (i *TestInput) Close() error { +func (b *loopingBuffer) Close() error { + close(b.close) return nil } +// bytesRead since last time this was called +func (b *loopingBuffer) bytesRead() int { + b.mu.Lock() + defer b.mu.Unlock() + v := b.r + b.r = 0 + return v +} + +func NewLongerTestInput() *bytes.Reader { + return bytes.NewReader(longerTestData) +} + +func NewTestInput() *bytes.Reader { + data := []byte(`Hello World +Test Line 2 +`) + return bytes.NewReader(data) +} + +func NewTestInputWithHeaders() *bytes.Reader { + data := []byte(`<13>1 2013-09-25T01:16:49.371356+00:00 host token web.1 - [meta sequenceId="1"] message 1 +<13>1 2013-09-25T01:16:49.402923+00:00 host token web.1 - [meta sequenceId="2"] message 2 +`) + return bytes.NewReader(data) +} + type noopTestHelper struct{} func (th *noopTestHelper) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -91,7 +146,7 @@ func TestIntegration(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(input) + shut.LoadReader(ioutil.NopCloser(input)) shut.Launch() shut.WaitForReadersToFinish() shut.Land() @@ -122,7 +177,7 @@ func TestInputFormatRFC5424Integration(t *testing.T) { shut := NewShuttle(config) input := NewTestInputWithHeaders() - shut.LoadReader(input) + shut.LoadReader(ioutil.NopCloser(input)) shut.Launch() shut.WaitForReadersToFinish() shut.Land() @@ -149,7 +204,7 @@ func TestDrops(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(input) + shut.LoadReader(ioutil.NopCloser(input)) shut.Launch() shut.Drops.Add(1) shut.Drops.Add(1) @@ -187,7 +242,7 @@ func TestLost(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(input) + shut.LoadReader(ioutil.NopCloser(input)) shut.Launch() shut.Lost.Add(1) @@ -228,7 +283,7 @@ func TestUserAgentHeader(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(input) + shut.LoadReader(ioutil.NopCloser(input)) shut.Launch() shut.WaitForReadersToFinish() @@ -256,7 +311,7 @@ func TestRequestId(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(input) + shut.LoadReader(ioutil.NopCloser(input)) shut.Launch() shut.WaitForReadersToFinish() shut.Land() @@ -276,20 +331,19 @@ func BenchmarkPipeline(b *testing.B) { config.LogsURL = ts.URL config.InputFormat = InputFormatRaw - b.ResetTimer() + shut := NewShuttle(config) + input := NewLoopingBuffer(longerTestData) + shut.LoadReader(input) + shut.Launch() + var tb int for i := 0; i < b.N; i++ { - b.StopTimer() - shut := NewShuttle(config) - input := NewLongerTestInput() - b.SetBytes(int64(input.Len())) - - shut.LoadReader(input) - - b.StartTimer() - shut.Launch() - shut.WaitForReadersToFinish() - shut.Land() + // This sleep is here to allow the reading goroutines to make + // progress reading. Open to better ideas. + time.Sleep(10 * time.Microsecond) + tb += input.bytesRead() } + b.SetBytes(int64(tb / b.N)) + shut.Land() } func ExampleShuttle() { @@ -298,6 +352,5 @@ func ExampleShuttle() { s := NewShuttle(config) s.LoadReader(os.Stdin) s.Launch() // Start up the batching/delivering go routines - s.WaitForReadersToFinish() - s.Land() // Spin down the batching/delivering go routines + s.Land() // Spin down the batching/delivering go routines } From 36d2c5c3a56a9ab881558f11d29b87f2a551a803 Mon Sep 17 00:00:00 2001 From: Edward Muller Date: Thu, 10 Mar 2016 09:18:58 -0800 Subject: [PATCH 4/9] Remove Batcher / Frontbuff This seems to be faster overall and should reduce memory usage. This effectively makes each Reader it's own Batcher and does away with the frontbuff. Mutexes are less overhead than sending on a channel, so benchmarks show more bytes being moved around over the same amount of time. In all there should be fewer copies of a different structs floating around as well. Plus no seperate Batcher goroutines. Running benchmarks on this branch vs the previous: ```console $ go test -cpu 1,4 -bench Pipe -benchmem . PASS BenchmarkPipeline 50 25605141 ns/op 45.76 MB/s 3263675 B/op 32618 allocs/op BenchmarkPipeline-4 2000 1311399 ns/op 30.85 MB/s 163586 B/op 1582 allocs/op $ git checkout remove_front_buff Switched to branch 'remove_front_buff' $ go test -cpu 1,4 -bench Pipe -benchmem . PASS BenchmarkPipeline 100 13675065 ns/op 110.69 MB/s 3564797 B/op 36274 allocs/op BenchmarkPipeline-4 2000 501283 ns/op 57.37 MB/s 96341 B/op 952 allocs/op ok github.com/heroku/log-shuttle 5.028s ``` --- batcher.go | 1 - batcher_test.go | 37 ------------- config.go | 6 --- log_line_reader_test.go | 37 +++++++------ reader.go | 113 ++++++++++++++++++++++++++++++++++------ shuttle.go | 69 ++++++++++-------------- shuttle_test.go | 7 ++- 7 files changed, 150 insertions(+), 120 deletions(-) delete mode 100644 batcher_test.go diff --git a/batcher.go b/batcher.go index f2405af..f51bc5e 100644 --- a/batcher.go +++ b/batcher.go @@ -25,7 +25,6 @@ type Batcher struct { // NewBatcher created an empty Batcher for the provided shuttle func NewBatcher(s *Shuttle) Batcher { return Batcher{ - inLogs: s.LogLines, drops: s.Drops, outBatches: s.Batches, timeout: s.config.WaitDuration, diff --git a/batcher_test.go b/batcher_test.go deleted file mode 100644 index e4feebc..0000000 --- a/batcher_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package shuttle - -import ( - "sync" - "testing" - "time" -) - -func ProduceLogLines(count int, c chan<- LogLine) { - ll := LogLine{ - line: TestData, - when: time.Now(), - } - for i := 0; i < count; i++ { - c <- ll - } -} - -func BenchmarkBatcher(b *testing.B) { - config := newTestConfig() - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - s := NewShuttle(config) - batcher := NewBatcher(s) - wg := new(sync.WaitGroup) - wg.Add(1) - b.StartTimer() - go func() { - defer wg.Done() - batcher.Batch() - }() - ProduceLogLines(TestProducerLines, s.LogLines) - close(s.LogLines) - wg.Wait() - } -} diff --git a/config.go b/config.go index e4222e3..6733f80 100644 --- a/config.go +++ b/config.go @@ -18,7 +18,6 @@ const ( const ( DefaultMaxLineLength = 10000 // Logplex max is 10000 bytes, so default to that DefaultInputFormat = InputFormatRaw - DefaultFrontBuff = 1000 DefaultBackBuff = 50 DefaultTimeout = 5 * time.Second DefaultWaitDuration = 250 * time.Millisecond @@ -34,7 +33,6 @@ const ( DefaultHostname = "shuttle" DefaultMsgID = "- -" DefaultLogsURL = "" - DefaultNumBatchers = 2 DefaultNumOutlets = 4 DefaultBatchSize = 500 DefaultID = "" @@ -65,9 +63,7 @@ type errData struct { type Config struct { MaxLineLength int BackBuff int - FrontBuff int BatchSize int - NumBatchers int NumOutlets int InputFormat int MaxAttempts int @@ -114,11 +110,9 @@ func NewConfig() Config { StatsInterval: time.Duration(DefaultStatsInterval), MaxAttempts: DefaultMaxAttempts, InputFormat: DefaultInputFormat, - NumBatchers: DefaultNumBatchers, NumOutlets: DefaultNumOutlets, WaitDuration: time.Duration(DefaultWaitDuration), BatchSize: DefaultBatchSize, - FrontBuff: DefaultFrontBuff, BackBuff: DefaultBackBuff, Timeout: time.Duration(DefaultTimeout), ID: DefaultID, diff --git a/log_line_reader_test.go b/log_line_reader_test.go index 89be8ff..837bdda 100644 --- a/log_line_reader_test.go +++ b/log_line_reader_test.go @@ -4,8 +4,6 @@ import ( "io" "sync" "testing" - - "github.com/rcrowley/go-metrics" ) const ( @@ -24,8 +22,7 @@ type InputProducer struct { } func NewInputProducer(c int) *InputProducer { - curr := 0 - tb := 0 + var curr, tb int return &InputProducer{Total: c, Curr: curr, TotalBytes: tb, Data: TestData} } @@ -46,7 +43,7 @@ type TestConsumer struct { *sync.WaitGroup } -func (tc TestConsumer) Consume(in <-chan LogLine) { +func (tc TestConsumer) Consume(in <-chan Batch) { tc.Add(1) go func() { defer tc.Done() @@ -55,45 +52,47 @@ func (tc TestConsumer) Consume(in <-chan LogLine) { }() } -func doBasicLogLineReaderBenchmark(b *testing.B, frontBuffSize int) { +func doBasicLogLineReaderBenchmark(b *testing.B, backBuffSize int) { b.ResetTimer() var tb int for i := 0; i < b.N; i++ { b.StopTimer() - logs := make(chan LogLine, frontBuffSize) - rdr := NewLogLineReader(logs, metrics.NewRegistry()) - testConsumer := TestConsumer{new(sync.WaitGroup)} - testConsumer.Consume(logs) + batches := make(chan Batch, backBuffSize) + s := NewShuttle(NewConfig()) llp := NewInputProducer(TestProducerLines) + rdr := NewLogLineReader(llp, s) + testConsumer := TestConsumer{new(sync.WaitGroup)} + testConsumer.Consume(batches) + b.StartTimer() - rdr.ReadLogLines(llp) + rdr.ReadLines() tb += llp.TotalBytes - close(logs) + close(batches) testConsumer.Wait() } b.SetBytes(int64(tb / b.N)) } -func BenchmarkLogLineReaderWithFrontBuffEqual0(b *testing.B) { +func BenchmarkLogLineReaderWithBackBuffEqual0(b *testing.B) { doBasicLogLineReaderBenchmark(b, 0) } -func BenchmarkLogLineReaderWithFrontBuffEqual1(b *testing.B) { +func BenchmarkLogLineReaderWithBackBuffEqual1(b *testing.B) { doBasicLogLineReaderBenchmark(b, 1) } -func BenchmarkLogLineReaderWithFrontBuffEqual100(b *testing.B) { +func BenchmarkLogLineReaderWithBackBuffEqual100(b *testing.B) { doBasicLogLineReaderBenchmark(b, 100) } -func BenchmarkLogLineReaderWithFrontBuffEqual1000(b *testing.B) { +func BenchmarkLogLineReaderWithBackBuffEqual1000(b *testing.B) { doBasicLogLineReaderBenchmark(b, 1000) } -func BenchmarkLogLineReaderWithFrontBuffEqual10000(b *testing.B) { +func BenchmarkLogLineReaderWithBackBuffEqual10000(b *testing.B) { doBasicLogLineReaderBenchmark(b, 10000) } -func BenchmarkLogLineReaderWithDefaultFrontBuff(b *testing.B) { - doBasicLogLineReaderBenchmark(b, DefaultFrontBuff) +func BenchmarkLogLineReaderWithDefaultBackBuff(b *testing.B) { + doBasicLogLineReaderBenchmark(b, DefaultBackBuff) } diff --git a/reader.go b/reader.go index 6f32e2c..703d5e4 100644 --- a/reader.go +++ b/reader.go @@ -3,6 +3,7 @@ package shuttle import ( "bufio" "io" + "sync" "time" "github.com/rcrowley/go-metrics" @@ -11,39 +12,121 @@ import ( // LogLineReader performs the reading of lines from an io.ReadCloser, encapsulating // lines into a LogLine and emitting them on outbox type LogLineReader struct { - outbox chan<- LogLine - linesRead metrics.Counter + input io.ReadCloser // The input to read from + out chan<- Batch // Where to send batches + close chan struct{} + batchSize int // size of new batches + timeOut time.Duration // batch timeout + timer *time.Timer // timer to actually enforce timeout + drops *Counter + drop bool // Should we drop or block + + linesRead metrics.Counter + linesBatchedCount metrics.Counter + linesDroppedCount metrics.Counter + batchFillTime metrics.Timer + + mu *sync.Mutex // protects access to bewlo + b Batch } // NewLogLineReader constructs a new reader with it's own Outbox. -func NewLogLineReader(o chan<- LogLine, m metrics.Registry) LogLineReader { - return LogLineReader{ - outbox: o, - linesRead: metrics.GetOrRegisterCounter("lines.read", m), +func NewLogLineReader(input io.ReadCloser, s *Shuttle) *LogLineReader { + t := time.NewTimer(time.Second) + t.Stop() // we only need a timer running when we actually have log lines in the batch + + ll := LogLineReader{ + input: input, + out: s.Batches, + close: make(chan struct{}), + batchSize: s.config.BatchSize, + timeOut: s.config.WaitDuration, + timer: t, + drops: s.Drops, + drop: s.config.Drop, + + linesRead: metrics.GetOrRegisterCounter("lines.read", s.MetricsRegistry), + linesBatchedCount: metrics.GetOrRegisterCounter("lines.batched", s.MetricsRegistry), + linesDroppedCount: metrics.GetOrRegisterCounter("lines.dropped", s.MetricsRegistry), + batchFillTime: metrics.GetOrRegisterTimer("batch.fill", s.MetricsRegistry), + + mu: new(sync.Mutex), + b: NewBatch(s.config.BatchSize), } + + go ll.expireBatches() + + return &ll } -// ReadLogLines reads lines from the Reader and returns with an error if there -// is an error -func (rdr LogLineReader) ReadLogLines(input io.Reader) error { - rdrIo := bufio.NewReader(input) +func (rdr *LogLineReader) expireBatches() { + for { + select { + case <-rdr.close: + return + + case <-rdr.timer.C: + rdr.mu.Lock() + rdr.deliverOrDropCurrent() + rdr.mu.Unlock() + } + } +} + +//Close the reader for input +func (rdr *LogLineReader) Close() error { + return rdr.input.Close() +} + +// ReadLines from the input created for. Return any errors +// blocks until the underlying reader is closed +func (rdr *LogLineReader) ReadLines() error { + rdrIo := bufio.NewReader(rdr.input) for { line, err := rdrIo.ReadBytes('\n') if len(line) > 0 { currentLogTime := time.Now() - rdr.Enqueue(LogLine{line, currentLogTime}) + rdr.linesRead.Inc(1) + rdr.mu.Lock() + if full := rdr.b.Add(LogLine{line, currentLogTime}); full { + rdr.deliverOrDropCurrent() + } + if rdr.b.MsgCount() == 1 { // First line so restart the timer + rdr.timer.Reset(rdr.timeOut) + } + rdr.mu.Unlock() } if err != nil { + rdr.mu.Lock() + rdr.deliverOrDropCurrent() + rdr.mu.Unlock() + close(rdr.close) return err } } } -// Enqueue a single log line and increment the line counters -func (rdr LogLineReader) Enqueue(ll LogLine) { - rdr.outbox <- ll - rdr.linesRead.Inc(1) +// Should only be called when rdr.mu.Lock() is held +func (rdr *LogLineReader) deliverOrDropCurrent() { + rdr.timer.Stop() + // There is the possibility of a new batch being expired while this is happening. + // so guard against queueing up an empty batch + if c := rdr.b.MsgCount(); c > 0 { + if rdr.drop { + select { + case rdr.out <- rdr.b: + rdr.linesBatchedCount.Inc(int64(c)) + default: + rdr.linesDroppedCount.Inc(int64(c)) + rdr.drops.Add(c) + } + } else { + rdr.out <- rdr.b + rdr.linesBatchedCount.Inc(int64(c)) + } + rdr.b = NewBatch(rdr.batchSize) + } } diff --git a/shuttle.go b/shuttle.go index 8ba8b2e..edd7284 100644 --- a/shuttle.go +++ b/shuttle.go @@ -1,6 +1,7 @@ package shuttle import ( + "fmt" "io" "io/ioutil" "log" @@ -17,35 +18,31 @@ var ( // Shuttle is the main entry point into the library type Shuttle struct { LogLineReader - config Config - LogLines chan LogLine - Batches chan Batch - readers []io.ReadCloser - MetricsRegistry metrics.Registry - bWaiter, oWaiter, rWaiter *sync.WaitGroup - Drops, Lost *Counter - NewFormatterFunc NewHTTPFormatterFunc - Logger *log.Logger - ErrLogger *log.Logger + config Config + Batches chan Batch + readers []*LogLineReader + MetricsRegistry metrics.Registry + oWaiter, rWaiter *sync.WaitGroup + Drops, Lost *Counter + NewFormatterFunc NewHTTPFormatterFunc + Logger *log.Logger + ErrLogger *log.Logger } // NewShuttle returns a properly constructed Shuttle with a given config func NewShuttle(config Config) *Shuttle { - ll := make(chan LogLine, config.FrontBuff) + b := make(chan Batch, config.BackBuff) mr := metrics.NewRegistry() return &Shuttle{ config: config, - LogLineReader: NewLogLineReader(ll, mr), - LogLines: ll, - Batches: make(chan Batch, config.BackBuff), + Batches: b, Drops: NewCounter(0), Lost: NewCounter(0), MetricsRegistry: mr, NewFormatterFunc: config.FormatterFunc, - readers: make([]io.ReadCloser, 0), + readers: make([]*LogLineReader, 0), oWaiter: new(sync.WaitGroup), - bWaiter: new(sync.WaitGroup), rWaiter: new(sync.WaitGroup), Logger: discardLogger, ErrLogger: discardLogger, @@ -56,7 +53,13 @@ func NewShuttle(config Config) *Shuttle { // is the reverse of shutdown. func (s *Shuttle) Launch() { s.startOutlets() - s.startBatchers() + for _, rdr := range s.readers { + s.rWaiter.Add(1) + go func(rdr *LogLineReader) { + rdr.ReadLines() + s.rWaiter.Done() + }(rdr) + } } // startOutlet launches config.NumOutlets number of outlets. When inbox is @@ -65,22 +68,9 @@ func (s *Shuttle) startOutlets() { for i := 0; i < s.config.NumOutlets; i++ { s.oWaiter.Add(1) go func() { - defer s.oWaiter.Done() outlet := NewHTTPOutlet(s) outlet.Outlet() - }() - } -} - -// startBatchers starts config.NumBatchers number of batchers. When inLogs is -// closed the batchers will finsih up and exit. -func (s *Shuttle) startBatchers() { - for i := 0; i < s.config.NumBatchers; i++ { - s.bWaiter.Add(1) - go func() { - defer s.bWaiter.Done() - batcher := NewBatcher(s) - batcher.Batch() + s.oWaiter.Done() }() } } @@ -89,12 +79,8 @@ func (s *Shuttle) startBatchers() { // log-shuttle to track the readers for you. The errors returned by ReadLogLines // are discarded. func (s *Shuttle) LoadReader(rdr io.ReadCloser) { - s.rWaiter.Add(1) - s.readers = append(s.readers, rdr) - go func() { - s.ReadLogLines(rdr) - s.rWaiter.Done() - }() + r := NewLogLineReader(rdr, s) + s.readers = append(s.readers, r) } // CloseReaders closes all tracked readers and returns any errors returned by @@ -126,9 +112,10 @@ func (s *Shuttle) DockReaders() []error { // read is batched and delivered. A panic is likely to happen if Land() is // called before any readers passed to any ReadLogLines() calls aren't closed. func (s *Shuttle) Land() { + fmt.Println("doc readers") s.DockReaders() - close(s.LogLines) // Close the log line channel, all of the batchers will stop once they are done - s.bWaiter.Wait() // Wait for them to be done - close(s.Batches) // Close the batch channel, all of the outlets will stop once they are done - s.oWaiter.Wait() // Wait for them to be done + fmt.Println("closing batches") + close(s.Batches) // Close the batch channel, all of the outlets will stop once they are done + fmt.Println("outlet waiter") + s.oWaiter.Wait() // Wait for them to be done } diff --git a/shuttle_test.go b/shuttle_test.go index 5527343..6b512a5 100644 --- a/shuttle_test.go +++ b/shuttle_test.go @@ -2,6 +2,7 @@ package shuttle import ( "bytes" + "fmt" "io" "io/ioutil" "net/http" @@ -29,7 +30,6 @@ Ac lorem aliquam placerat. func newTestConfig() Config { // Defaults should be good for most tests config := NewConfig() - config.NumBatchers = 1 config.LogsURL = "http://" return config } @@ -123,6 +123,7 @@ type testHelper struct { } func (ts *testHelper) ServeHTTP(w http.ResponseWriter, r *http.Request) { + fmt.Println("REQUEST") var err error d, err := ioutil.ReadAll(r.Body) if err != nil { @@ -322,6 +323,10 @@ func TestRequestId(t *testing.T) { } } +type lenner interface { + Len() int +} + func BenchmarkPipeline(b *testing.B) { th := new(noopTestHelper) ts := httptest.NewServer(th) From 66905d84cfe5bd0eaebe5f7ba8addd4e59c34a44 Mon Sep 17 00:00:00 2001 From: Edward Muller Date: Fri, 11 Mar 2016 16:54:51 -0800 Subject: [PATCH 5/9] cleanup debugging + fix lint --- gzip_formatter_test.go | 2 +- shuttle.go | 4 ---- shuttle_test.go | 2 -- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/gzip_formatter_test.go b/gzip_formatter_test.go index aef0bcd..4e68285 100644 --- a/gzip_formatter_test.go +++ b/gzip_formatter_test.go @@ -32,7 +32,7 @@ func TestGzipFormatter(t *testing.T) { gr := NewGzipFormatter(f) if gr.MsgCount() != 1 { - t.Fatal(gr.MsgCount) + t.Fatal(gr.MsgCount()) } // read the compressed bytes diff --git a/shuttle.go b/shuttle.go index edd7284..c3ece7f 100644 --- a/shuttle.go +++ b/shuttle.go @@ -1,7 +1,6 @@ package shuttle import ( - "fmt" "io" "io/ioutil" "log" @@ -112,10 +111,7 @@ func (s *Shuttle) DockReaders() []error { // read is batched and delivered. A panic is likely to happen if Land() is // called before any readers passed to any ReadLogLines() calls aren't closed. func (s *Shuttle) Land() { - fmt.Println("doc readers") s.DockReaders() - fmt.Println("closing batches") close(s.Batches) // Close the batch channel, all of the outlets will stop once they are done - fmt.Println("outlet waiter") s.oWaiter.Wait() // Wait for them to be done } diff --git a/shuttle_test.go b/shuttle_test.go index 6b512a5..13da9a7 100644 --- a/shuttle_test.go +++ b/shuttle_test.go @@ -2,7 +2,6 @@ package shuttle import ( "bytes" - "fmt" "io" "io/ioutil" "net/http" @@ -123,7 +122,6 @@ type testHelper struct { } func (ts *testHelper) ServeHTTP(w http.ResponseWriter, r *http.Request) { - fmt.Println("REQUEST") var err error d, err := ioutil.ReadAll(r.Body) if err != nil { From 69e4a1b9d2ca6a244c5845542ef008a72239a741 Mon Sep 17 00:00:00 2001 From: Edward Muller Date: Fri, 11 Mar 2016 17:18:33 -0800 Subject: [PATCH 6/9] Cleanup cli for changes --- cmd/log-shuttle/main.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/cmd/log-shuttle/main.go b/cmd/log-shuttle/main.go index 0b652f4..3e2698f 100644 --- a/cmd/log-shuttle/main.go +++ b/cmd/log-shuttle/main.go @@ -105,10 +105,12 @@ func parseFlags(c shuttle.Config) (shuttle.Config, error) { flag.DurationVar(&c.Timeout, "timeout", c.Timeout, "Duration to wait for a response from logs-url.") flag.IntVar(&c.MaxAttempts, "max-attempts", c.MaxAttempts, "Max number of retries.") - flag.IntVar(&c.NumBatchers, "num-batchers", c.NumBatchers, "The number of batchers to run.") + var b int + flag.IntVar(&b, "num-batchers", b, "[NO EFFECT/REMOVED] The number of batchers to run.") flag.IntVar(&c.NumOutlets, "num-outlets", c.NumOutlets, "The number of outlets to run.") flag.IntVar(&c.BatchSize, "batch-size", c.BatchSize, "Number of messages to pack into an application/logplex-1 http request.") - flag.IntVar(&c.FrontBuff, "front-buff", c.FrontBuff, "Number of messages to buffer in log-shuttle's input channel.") + var f int + flag.IntVar(&f, "front-buff", f, "[NO EFFECT/REMOVED] Number of messages to buffer in log-shuttle's input channel.") flag.IntVar(&c.BackBuff, "back-buff", c.BackBuff, "Number of batches to buffer before dropping.") flag.IntVar(&c.MaxLineLength, "max-line-length", c.MaxLineLength, "Number of bytes that the backend allows per line.") flag.IntVar(&c.KinesisShards, "kinesis-shards", c.KinesisShards, "Number of unique partition keys to use per app.") @@ -120,6 +122,14 @@ func parseFlags(c shuttle.Config) (shuttle.Config, error) { os.Exit(0) } + if f != 0 { + log.Println("Warning: Use of -front-buff is no longer supported. The flag has no effect and will be removed in the future.") + } + + if b != 0 { + log.Println("Warning: Use of -num-batchers is no longer supported. The flag has no effect and will be removed in the future.") + } + if statsAddr != "" { log.Println("Warning: Use of -stats-addr is deprecated and will be dropped in the future.") } @@ -245,13 +255,14 @@ func main() { s.ErrLogger = errLogger } + s.LoadReader(os.Stdin) + s.Launch() go LogFmtMetricsEmitter(s.MetricsRegistry, config.StatsSource, config.StatsInterval, s.Logger) - // Blocks until os.Stdin errors - s.ReadLogLines(os.Stdin) - os.Stdin.Close() + // blocks until the readers all exit + s.WaitForReadersToFinish() // Shutdown the shuttle. s.Land() From f6ed5f15f0f0054a515e76c78b3c92f636b6cd67 Mon Sep 17 00:00:00 2001 From: Edward Muller Date: Mon, 25 Apr 2016 13:41:23 -0700 Subject: [PATCH 7/9] Mutex Changes Don't need a pointer to the sync.Mutex. Zero value is fine. --- reader.go | 7 +++---- shuttle_test.go | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/reader.go b/reader.go index 703d5e4..b61a814 100644 --- a/reader.go +++ b/reader.go @@ -26,7 +26,7 @@ type LogLineReader struct { linesDroppedCount metrics.Counter batchFillTime metrics.Timer - mu *sync.Mutex // protects access to bewlo + mu sync.Mutex // protects access to below b Batch } @@ -50,8 +50,7 @@ func NewLogLineReader(input io.ReadCloser, s *Shuttle) *LogLineReader { linesDroppedCount: metrics.GetOrRegisterCounter("lines.dropped", s.MetricsRegistry), batchFillTime: metrics.GetOrRegisterTimer("batch.fill", s.MetricsRegistry), - mu: new(sync.Mutex), - b: NewBatch(s.config.BatchSize), + b: NewBatch(s.config.BatchSize), } go ll.expireBatches() @@ -109,7 +108,7 @@ func (rdr *LogLineReader) ReadLines() error { } } -// Should only be called when rdr.mu.Lock() is held +// Should only be called when rdr.mu is held func (rdr *LogLineReader) deliverOrDropCurrent() { rdr.timer.Stop() // There is the possibility of a new batch being expired while this is happening. diff --git a/shuttle_test.go b/shuttle_test.go index 13da9a7..018c16c 100644 --- a/shuttle_test.go +++ b/shuttle_test.go @@ -38,13 +38,12 @@ type loopingBuffer struct { close chan struct{} p int - mu *sync.Mutex + mu sync.Mutex r int } func NewLoopingBuffer(b []byte) *loopingBuffer { return &loopingBuffer{ - mu: new(sync.Mutex), b: b, close: make(chan struct{}), } From 36d4ffe122588567d40783b3445410fd67a7c646 Mon Sep 17 00:00:00 2001 From: Edward Muller Date: Mon, 25 Apr 2016 13:42:14 -0700 Subject: [PATCH 8/9] Just return an io.ReadCloser from the test helpers This is what's needed, no need to explictly do this inline, just return one. --- shuttle_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/shuttle_test.go b/shuttle_test.go index 018c16c..0532029 100644 --- a/shuttle_test.go +++ b/shuttle_test.go @@ -90,18 +90,18 @@ func NewLongerTestInput() *bytes.Reader { return bytes.NewReader(longerTestData) } -func NewTestInput() *bytes.Reader { +func NewTestInput() io.ReadCloser { data := []byte(`Hello World Test Line 2 `) - return bytes.NewReader(data) + return ioutil.NopCloser(bytes.NewReader(data)) } -func NewTestInputWithHeaders() *bytes.Reader { +func NewTestInputWithHeaders() io.ReadCloser { data := []byte(`<13>1 2013-09-25T01:16:49.371356+00:00 host token web.1 - [meta sequenceId="1"] message 1 <13>1 2013-09-25T01:16:49.402923+00:00 host token web.1 - [meta sequenceId="2"] message 2 `) - return bytes.NewReader(data) + return ioutil.NopCloser(bytes.NewReader(data)) } type noopTestHelper struct{} @@ -144,7 +144,7 @@ func TestIntegration(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(ioutil.NopCloser(input)) + shut.LoadReader(input) shut.Launch() shut.WaitForReadersToFinish() shut.Land() @@ -175,7 +175,7 @@ func TestInputFormatRFC5424Integration(t *testing.T) { shut := NewShuttle(config) input := NewTestInputWithHeaders() - shut.LoadReader(ioutil.NopCloser(input)) + shut.LoadReader(input) shut.Launch() shut.WaitForReadersToFinish() shut.Land() @@ -202,7 +202,7 @@ func TestDrops(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(ioutil.NopCloser(input)) + shut.LoadReader(input) shut.Launch() shut.Drops.Add(1) shut.Drops.Add(1) @@ -240,7 +240,7 @@ func TestLost(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(ioutil.NopCloser(input)) + shut.LoadReader(input) shut.Launch() shut.Lost.Add(1) @@ -281,7 +281,7 @@ func TestUserAgentHeader(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(ioutil.NopCloser(input)) + shut.LoadReader(input) shut.Launch() shut.WaitForReadersToFinish() @@ -309,7 +309,7 @@ func TestRequestId(t *testing.T) { shut := NewShuttle(config) input := NewTestInput() - shut.LoadReader(ioutil.NopCloser(input)) + shut.LoadReader(input) shut.Launch() shut.WaitForReadersToFinish() shut.Land() From a6c6c2bf4d9e87021a42068c4ed5ec9610e2648d Mon Sep 17 00:00:00 2001 From: Edward Muller Date: Tue, 26 Apr 2016 10:24:23 -0700 Subject: [PATCH 9/9] Test cleanup No need to declare curr/tb as they InputProducer's zero values are fine. Re-use the TestConsumer --- log_line_reader_test.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/log_line_reader_test.go b/log_line_reader_test.go index 837bdda..b5b61d5 100644 --- a/log_line_reader_test.go +++ b/log_line_reader_test.go @@ -22,8 +22,7 @@ type InputProducer struct { } func NewInputProducer(c int) *InputProducer { - var curr, tb int - return &InputProducer{Total: c, Curr: curr, TotalBytes: tb, Data: TestData} + return &InputProducer{Total: c, Data: TestData} } func (llp *InputProducer) Read(p []byte) (n int, err error) { @@ -40,10 +39,10 @@ func (llp InputProducer) Close() error { } type TestConsumer struct { - *sync.WaitGroup + sync.WaitGroup } -func (tc TestConsumer) Consume(in <-chan Batch) { +func (tc *TestConsumer) Consume(in <-chan Batch) { tc.Add(1) go func() { defer tc.Done() @@ -55,20 +54,20 @@ func (tc TestConsumer) Consume(in <-chan Batch) { func doBasicLogLineReaderBenchmark(b *testing.B, backBuffSize int) { b.ResetTimer() var tb int + var tc TestConsumer for i := 0; i < b.N; i++ { b.StopTimer() batches := make(chan Batch, backBuffSize) + tc.Consume(batches) s := NewShuttle(NewConfig()) llp := NewInputProducer(TestProducerLines) rdr := NewLogLineReader(llp, s) - testConsumer := TestConsumer{new(sync.WaitGroup)} - testConsumer.Consume(batches) - b.StartTimer() + rdr.ReadLines() tb += llp.TotalBytes close(batches) - testConsumer.Wait() + tc.Wait() } b.SetBytes(int64(tb / b.N)) }