Skip to content

Commit

Permalink
[exporter/prometheusremotewrite] Fix: Validate context is canceled du…
Browse files Browse the repository at this point in the history
…ring retries (open-telemetry#30308)

**Description:** Validates that the context is canceled in order to
avoid unnecessary retries in the prometheus remote write exporter


**Link to tracking Issue:** No tracking issue.

**Testing:** Unit tests were added.

---------

Signed-off-by: Raphael Silva <[email protected]>
  • Loading branch information
rapphil authored and cparkins committed Jan 10, 2024
1 parent 4561135 commit d6f2dfa
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 57 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix_context_retries_prw.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Check if the context was canceled by a timeout in the component level to avoid unnecessary retries.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30308]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 9 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ

// executeFunc can be used for backoff and non backoff scenarios.
executeFunc := func() error {
// check there was no timeout in the component level to avoid retries
// to continue to run after a timeout
select {
case <-ctx.Done():
return backoff.Permanent(ctx.Err())
default:
// continue
}

// Create the HTTP POST request to send to the endpoint
req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
Expand Down
131 changes: 74 additions & 57 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,69 +998,86 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
assert.Equal(t, gotFromWAL, gotFromUpload)
}

func TestRetryOn5xx(t *testing.T) {
// Create a mock HTTP server with a counter to simulate a 5xx error on the first attempt and a 2xx success on the second attempt
attempts := 0
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if attempts < 4 {
attempts++
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
}))
defer mockServer.Close()

endpointURL, err := url.Parse(mockServer.URL)
require.NoError(t, err)

// Create the prwExporter
exporter := &prwExporter{
endpointURL: endpointURL,
client: http.DefaultClient,
retrySettings: configretry.BackOffConfig{
Enabled: true,
},
}

ctx := context.Background()

// Execute the write request and verify that the exporter returns a non-permanent error on the first attempt.
err = exporter.execute(ctx, &prompb.WriteRequest{})
assert.NoError(t, err)
assert.Equal(t, 4, attempts)
func canceledContext() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
}

func TestNoRetryOn4xx(t *testing.T) {
// Create a mock HTTP server with a counter to simulate a 4xx error
attempts := 0
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if attempts < 1 {
attempts++
http.Error(w, "Bad Request", http.StatusBadRequest)
} else {
w.WriteHeader(http.StatusOK)
}
}))
defer mockServer.Close()
func assertPermanentConsumerError(t assert.TestingT, err error, _ ...any) bool {
return assert.True(t, consumererror.IsPermanent(err), "error should be consumererror.Permanent")
}

endpointURL, err := url.Parse(mockServer.URL)
require.NoError(t, err)
func TestRetries(t *testing.T) {

// Create the prwExporter
exporter := &prwExporter{
endpointURL: endpointURL,
client: http.DefaultClient,
retrySettings: configretry.BackOffConfig{
Enabled: true,
tts := []struct {
name string
serverErrorCount int // number of times server should return error
expectedAttempts int
httpStatus int
assertError assert.ErrorAssertionFunc
assertErrorType assert.ErrorAssertionFunc
ctx context.Context
}{
{
"test 5xx should retry",
3,
4,
http.StatusInternalServerError,
assert.NoError,
assert.NoError,
context.Background(),
},
{
"test 4xx should not retry",
4,
1,
http.StatusBadRequest,
assert.Error,
assertPermanentConsumerError,
context.Background(),
},
{
"test timeout context should not execute",
4,
0,
http.StatusInternalServerError,
assert.Error,
assertPermanentConsumerError,
canceledContext(),
},
}

ctx := context.Background()
for _, tt := range tts {
t.Run(tt.name, func(t *testing.T) {
totalAttempts := 0
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if totalAttempts < tt.serverErrorCount {
http.Error(w, http.StatusText(tt.httpStatus), tt.httpStatus)
} else {
w.WriteHeader(http.StatusOK)
}
totalAttempts++
},
))
defer mockServer.Close()

endpointURL, err := url.Parse(mockServer.URL)
require.NoError(t, err)

// Create the prwExporter
exporter := &prwExporter{
endpointURL: endpointURL,
client: http.DefaultClient,
retrySettings: configretry.BackOffConfig{
Enabled: true,
},
}

// Execute the write request and verify that the exporter returns an error due to the 4xx response.
err = exporter.execute(ctx, &prompb.WriteRequest{})
assert.Error(t, err)
assert.True(t, consumererror.IsPermanent(err))
assert.Equal(t, 1, attempts)
err = exporter.execute(tt.ctx, &prompb.WriteRequest{})
tt.assertError(t, err)
tt.assertErrorType(t, err)
assert.Equal(t, tt.expectedAttempts, totalAttempts)
})
}
}

0 comments on commit d6f2dfa

Please sign in to comment.