diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 503ac73..a72f251 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -452,18 +452,11 @@ func (w *PointsWriter) WritePointsPrivilegedWithContext(ctx context.Context, dat if err == nil && len(shardMappings.Dropped) > 0 { err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)} - } - timeout := time.NewTimer(w.WriteTimeout) - defer timeout.Stop() for range shardMappings.Points { select { case <-w.closing: return ErrWriteFailed - case <-timeout.C: - atomic.AddInt64(&w.stats.WriteTimeout, 1) - // return timeout error to caller - return ErrTimeout case err := <-ch: if err != nil { return err @@ -519,6 +512,7 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta. go func(shardID uint64, owner meta.ShardOwner, points []models.Point) { if w.MetaClient.NodeID() == owner.NodeID { atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points))) + // Except tsdb.ErrShardNotFound no error can be handled here err := writeToShard(shardID, points) if err == tsdb.ErrShardNotFound { // Shard doesn't exist -- lets create it and try again.. @@ -527,9 +521,11 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta. // retry the write err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true) if err != nil { + w.Logger.Warn("Write failed with creating shard", zap.Uint64("node_id", owner.NodeID), zap.Uint64("shard_id", shardID), zap.Error(err)) ch <- &AsyncWriteResult{owner, err} return } + // Now that we've created the shard, try to write to it again. err = writeToShard(shardID, points) } ch <- &AsyncWriteResult{owner, err} @@ -574,15 +570,17 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta. } var wrote int - timeout := time.After(w.WriteTimeout) var writeError error + timeout := time.NewTimer(w.WriteTimeout) + defer timeout.Stop() for range shard.Owners { select { case <-w.closing: return ErrWriteFailed - case <-timeout: + case <-timeout.C: atomic.AddInt64(&w.stats.WriteTimeout, 1) // return timeout error to caller + w.Logger.Warn("Write failed with writing to shard", zap.Uint64("shard_id", shard.ID), zap.Float64("write_timeout", w.WriteTimeout.Seconds()), zap.Error(ErrTimeout)) return ErrTimeout case result := <-ch: // If the write returned an error, continue to the next response