Skip to content

Commit

Permalink
Merge pull request #37 from vmware/spentakota_passContext
Browse files Browse the repository at this point in the history
fix: pass in ctx with cancel for renewLease
  • Loading branch information
spentakota authored Apr 7, 2023
2 parents 16c5c53 + 4482696 commit b12921d
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions clientlibrary/worker/polling-shard-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
// getRecords continuously poll one shard for data record
// Precondition: it currently has the lease on the shard.
func (sc *PollingShardConsumer) getRecords() error {
defer sc.releaseLease(sc.shard.ID)
ctx, cancelFunc := context.WithCancel(context.Background())
defer func() {
// cancel renewLease()
cancelFunc()
sc.releaseLease(sc.shard.ID)
}()

log := sc.kclConfig.Logger

Expand Down Expand Up @@ -133,10 +138,11 @@ func (sc *PollingShardConsumer) getRecords() error {
sc.callsLeft = kinesisReadTPSLimit
sc.bytesRead = 0
sc.remBytes = MaxBytes

// starting async lease renewal thread
leaseRenewalErrChan := make(chan error, 1)
go func() {
leaseRenewalErrChan <- sc.renewLease()
leaseRenewalErrChan <- sc.renewLease(ctx)
}()
for {
getRecordsStartTime := time.Now()
Expand Down Expand Up @@ -300,18 +306,29 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput)
return getResp, 0, err
}

func (sc *PollingShardConsumer) renewLease() error {
func (sc *PollingShardConsumer) renewLease(ctx context.Context) error {
renewDuration := time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond
for {
time.Sleep(time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond)
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
err := sc.checkpointer.GetLease(sc.shard, sc.consumerID)
if err != nil {
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
timer := time.NewTimer(renewDuration)
select {
case <-timer.C:
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
err := sc.checkpointer.GetLease(sc.shard, sc.consumerID)
if err != nil {
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
case <-ctx.Done():
// clean up timer resources
if !timer.Stop() {
<-timer.C
}
log.Debugf("renewLease was canceled")
return nil
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
}
}

0 comments on commit b12921d

Please sign in to comment.