Skip to content

Commit

Permalink
optimize logger output under write timeout failed
Browse files Browse the repository at this point in the history
  • Loading branch information
chengshiwen committed Sep 8, 2024
1 parent 6d1bfcc commit 6f110b0
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,18 +452,11 @@ func (w *PointsWriter) WritePointsPrivilegedWithContext(ctx context.Context, dat

if err == nil && len(shardMappings.Dropped) > 0 {
err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)}

}
timeout := time.NewTimer(w.WriteTimeout)
defer timeout.Stop()
for range shardMappings.Points {
select {
case <-w.closing:
return ErrWriteFailed
case <-timeout.C:
atomic.AddInt64(&w.stats.WriteTimeout, 1)
// return timeout error to caller
return ErrTimeout
case err := <-ch:
if err != nil {
return err
Expand Down Expand Up @@ -519,6 +512,7 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.
go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
if w.MetaClient.NodeID() == owner.NodeID {
atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))
// Except tsdb.ErrShardNotFound no error can be handled here
err := writeToShard(shardID, points)
if err == tsdb.ErrShardNotFound {
// Shard doesn't exist -- lets create it and try again..
Expand All @@ -527,9 +521,11 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.
// retry the write
err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
if err != nil {
w.Logger.Warn("Write failed with creating shard", zap.Uint64("node_id", owner.NodeID), zap.Uint64("shard_id", shardID), zap.Error(err))
ch <- &AsyncWriteResult{owner, err}
return
}
// Now that we've created the shard, try to write to it again.
err = writeToShard(shardID, points)
}
ch <- &AsyncWriteResult{owner, err}
Expand Down Expand Up @@ -574,15 +570,17 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.
}

var wrote int
timeout := time.After(w.WriteTimeout)
var writeError error
timeout := time.NewTimer(w.WriteTimeout)
defer timeout.Stop()
for range shard.Owners {
select {
case <-w.closing:
return ErrWriteFailed
case <-timeout:
case <-timeout.C:
atomic.AddInt64(&w.stats.WriteTimeout, 1)
// return timeout error to caller
w.Logger.Warn("Write failed with writing to shard", zap.Uint64("shard_id", shard.ID), zap.Float64("write_timeout", w.WriteTimeout.Seconds()), zap.Error(ErrTimeout))
return ErrTimeout
case result := <-ch:
// If the write returned an error, continue to the next response
Expand Down

0 comments on commit 6f110b0

Please sign in to comment.