Skip to content
This repository has been archived by the owner on Dec 19, 2023. It is now read-only.

Commit

Permalink
Fix race condition (marcinwyszynski#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcinwyszynski authored Nov 11, 2021
1 parent b249b45 commit 173cd57
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 14 deletions.
1 change: 1 addition & 0 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (g *groupImpl) Open(ctx context.Context, streamName string) io.ReadCloser {
func (g *groupImpl) create(ctx context.Context, streamName string) (*writerImpl, error) {
ret := &writerImpl{
client: g,
closeChan: make(chan struct{}),
ctx: ctx,
events: newEventsBuffer(),
groupName: aws.String(g.groupName),
Expand Down
2 changes: 0 additions & 2 deletions group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ func (gs *groupTestSuite) TestCreateWithExistingStream_OK() {
}

func (gs *groupTestSuite) TestCreateWithExistingStream_UnexpectedFailure() {
const sequenceToken = "sequenceToken"

gs.creatingLogStreamReturns(errors.New("bacon"))

writer, err := gs.sut.Create(gs.ctx, gs.streamName)
Expand Down
3 changes: 1 addition & 2 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cloudwatch

import (
"context"
"fmt"
"io"

"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
Expand All @@ -17,7 +16,7 @@ type RejectedLogEventsInfoError struct {
}

func (e *RejectedLogEventsInfoError) Error() string {
return fmt.Sprintf("log messages were rejected")
return "log messages were rejected"
}

// CreateOption allows setting various options on the resulting writer.
Expand Down
24 changes: 14 additions & 10 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ type writerImpl struct {

ctx context.Context

closed bool
err error
closeChan chan (struct{})
closed bool
err error

events *eventsBuffer
nowFunc func() time.Time
Expand Down Expand Up @@ -71,15 +72,15 @@ func (w *writerImpl) Write(b []byte) (int, error) {
}

// Start continuously flushing the buffered events.
func (w *writerImpl) start() error {
func (w *writerImpl) start() (err error) {
for {
// Exit if the stream is closed.
if w.closed {
return nil
}

if err := w.flushTrottled(); err != nil {
return err
select {
case <-w.closeChan:
return
case <-w.throttle.C:
if err = w.flushBatch(); err != nil {
return
}
}
}
}
Expand All @@ -90,11 +91,14 @@ func (w *writerImpl) Close() error {
defer w.throttle.Stop()

w.closed = true
close(w.closeChan)

for w.events.hasMore() {
if w.flushTrottled() != nil {
break
}
}

return w.err
}

Expand Down

0 comments on commit 173cd57

Please sign in to comment.