Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Manual Checkpointing #49

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type checkpointer struct {
mutex sync.Mutex
finished bool
finalSequenceNumber string
updateSequencer chan struct{}
}

type checkpointRecord struct {
Expand Down Expand Up @@ -243,6 +244,32 @@ func (cp *checkpointer) update(sequenceNumber string) {
cp.sequenceNumber = sequenceNumber
}

// updateFunc returns a function that will update to sequenceNumber when called, but maintains ordering
func (cp *checkpointer) updateFunc(sequenceNumber string) func() {
cp.mutex.Lock()
defer cp.mutex.Unlock()
// cp.updateSequencer represents whether the previous updateFunc has been called
// If nil there is no previous so we should act like there was one already called
if cp.updateSequencer == nil {
cp.updateSequencer = make(chan struct{})
close(cp.updateSequencer)
}
// Copy the previous channel and create a new one for the link to the next updateFunc
updateSequencer := cp.updateSequencer
cp.updateSequencer = make(chan struct{})
// Return everything in a closure to ensure references are maintained properly
return func(prev chan struct{}, sequenceNumber string, next chan struct{}) func() {
var once sync.Once
return func() {
once.Do(func() {
<-prev // Wait for all prior updateFuncs to be called
cp.update(sequenceNumber) // Actually perform the update
close(next) // Allow the next updateFunc to be called
})
}
}(updateSequencer, sequenceNumber, cp.updateSequencer)
}

// finish marks the given sequence number as the final one for the shard.
// sequenceNumber is the empty string if we never read anything from the shard.
func (cp *checkpointer) finish(sequenceNumber string) {
Expand Down
14 changes: 12 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (

// Config holds all configuration values for a single Kinsumer instance
type Config struct {
stats StatReceiver
logger Logger
stats StatReceiver
logger Logger
manualCheckpointing bool

// ---------- [ Per Shard Worker ] ----------
// Time to sleep if no records are found
Expand Down Expand Up @@ -59,6 +60,15 @@ func NewConfig() Config {
}
}

// WithManualCheckpointing returns a Config with a modified manual checkpointing flag
// If set to false, records will be automatically checkpointed upon calls to Next()
// If set to true, NextWithCheckpointer() must be used and the returned checkpointer function
// must be called when the record is fully processed.
func (c Config) WithManualCheckpointing(v bool) Config {
c.manualCheckpointing = v
return c
}

// WithThrottleDelay returns a Config with a modified throttle delay
func (c Config) WithThrottleDelay(delay time.Duration) Config {
c.throttleDelay = delay
Expand Down
34 changes: 33 additions & 1 deletion kinsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,9 @@ func (k *Kinsumer) Run() error {
return
case record = <-input:
case output <- record:
record.checkpointer.update(aws.StringValue(record.record.SequenceNumber))
if !k.config.manualCheckpointing {
record.checkpointer.update(aws.StringValue(record.record.SequenceNumber))
}
record = nil
case se := <-k.shardErrors:
k.errors <- fmt.Errorf("shard error (%s) in %s: %s", se.shardID, se.action, se.err)
Expand Down Expand Up @@ -452,6 +454,10 @@ func (k *Kinsumer) Stop() {
// if err is non nil an error occurred in the system.
// if err is nil and data is nil then kinsumer has been stopped
func (k *Kinsumer) Next() (data []byte, err error) {
if k.config.manualCheckpointing {
return nil, fmt.Errorf("manual checkpointing is enabled, use NextWithCheckpointer() instead")
}

select {
case err = <-k.errors:
return nil, err
Expand All @@ -465,6 +471,32 @@ func (k *Kinsumer) Next() (data []byte, err error) {
return data, err
}

// NextWithCheckpointer is a blocking function used to get the next record from the kinesis queue, or errors that
// occurred during the processing of kinesis. It's up to the caller to stop processing by calling 'Stop()'
// checkpointer must be called when the record is fully processed. Kinsumer will ensure checkpointer calls are ordered.
// WARNING: checkpointer() can block indefinitely if not called in order.
//
// if err is non nil an error occurred in the system.
// if err is nil and data is nil then kinsumer has been stopped
func (k *Kinsumer) NextWithCheckpointer() (data []byte, checkpointer func(), err error) {
if !k.config.manualCheckpointing {
return nil, nil, fmt.Errorf("manual checkpointing is disabled, use Next() instead")
}

select {
case err = <-k.errors:
return nil, nil, err
case record, ok := <-k.output:
if ok {
k.config.stats.EventToClient(*record.record.ApproximateArrivalTimestamp, record.retrievedAt)
data = record.record.Data
checkpointer = record.checkpointer.updateFunc(aws.StringValue(record.record.SequenceNumber))
}
}

return data, checkpointer, err
}

// CreateRequiredTables will create the required dynamodb tables
// based on the applicationName
func (k *Kinsumer) CreateRequiredTables() error {
Expand Down