From 0eb9e5bf91afc71c185c125673002a56b2a51ff3 Mon Sep 17 00:00:00 2001 From: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> Date: Fri, 6 Dec 2024 20:31:06 -0300 Subject: [PATCH] Remove race condition when accessing remote bulker map (#4171) Use the remoteOutputMutex whenever accessing the bulkerMap. Change GetBulkerMap to return a copy of the map so that remote output health will not conflict with adding/removing a bulker from the map. (cherry picked from commit 924ea0711ce6f75305c41949bd7279ed24648e7a) --- ...6-Remove-race-in-remote-bulker-access.yaml | 32 ++++++++++ internal/pkg/bulk/bulk_remote_output_test.go | 59 ++++++++++++++++++- internal/pkg/bulk/engine.go | 15 ++++- 3 files changed, 104 insertions(+), 2 deletions(-) create mode 100644 changelog/fragments/1733181546-Remove-race-in-remote-bulker-access.yaml diff --git a/changelog/fragments/1733181546-Remove-race-in-remote-bulker-access.yaml b/changelog/fragments/1733181546-Remove-race-in-remote-bulker-access.yaml new file mode 100644 index 000000000..d8cf85bea --- /dev/null +++ b/changelog/fragments/1733181546-Remove-race-in-remote-bulker-access.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Remove race in remote bulker access + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: fleet-server + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/fleet-server/issues/4170 diff --git a/internal/pkg/bulk/bulk_remote_output_test.go b/internal/pkg/bulk/bulk_remote_output_test.go index b61f54d9e..2a6c34a0d 100644 --- a/internal/pkg/bulk/bulk_remote_output_test.go +++ b/internal/pkg/bulk/bulk_remote_output_test.go @@ -12,6 +12,8 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/assert" + + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) { @@ -78,9 +80,10 @@ func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { + log := testlog.SetLogger(t) bulker := NewBulker(nil, nil) bulker.remoteOutputConfigMap["remote1"] = tc.cfg - hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(zerolog.Nop(), "remote1", tc.newCfg) + hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(log, "remote1", tc.newCfg) assert.Equal(t, tc.changed, hasChanged) assert.Equal(t, tc.newCfg, bulker.remoteOutputConfigMap["remote1"]) }) @@ -148,3 +151,57 @@ func Test_CreateAndGetBulkerChanged(t *testing.T) { assert.Nil(t, err) assert.Equal(t, true, cancelFnCalled) } + +func Benchmark_CreateAndGetBulker(b *testing.B) { + b.Skip("Crashes on remote runner") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + log := zerolog.Nop() + outputMap := map[string]map[string]any{ + "remote1": map[string]any{ + "type": "remote_elasticsearch", + "hosts": []interface{}{"https://remote-es:443"}, + "service_token": "token1", + }, + } + b.Run("new remote bulker", func(b *testing.B) { + bulker := NewBulker(nil, nil) + b.ReportAllocs() + for range b.N { + b.StopTimer() + bulker.bulkerMap = make(map[string]Bulk) + bulker.remoteOutputConfigMap = make(map[string]map[string]any) + b.StartTimer() + + bulker.CreateAndGetBulker(ctx, log, "remote1", outputMap) + } + }) + b.Run("existing remote bulker", func(b *testing.B) { + bulker := NewBulker(nil, nil) + outputBulker := NewBulker(nil, nil) + bulker.bulkerMap["remote1"] = outputBulker + bulker.remoteOutputConfigMap["remote1"] = outputMap["remote1"] + b.ResetTimer() + b.ReportAllocs() + for range b.N { + bulker.CreateAndGetBulker(ctx, log, "remote1", outputMap) + } + }) + b.Run("changed remote bulker", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + b.StopTimer() + bulker := NewBulker(nil, nil) + outputBulker := NewBulker(nil, nil) + bulker.bulkerMap["remote1"] = outputBulker + bulker.remoteOutputConfigMap["remote1"] = map[string]any{ + "type": "remote_elasticsearch", + "hosts": []interface{}{"https://remote-es:443"}, + "service_token": "wrong token", + } + b.StartTimer() + + bulker.CreateAndGetBulker(ctx, log, "remote1", outputMap) + } + }) +} diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 7f28d4451..e5b5061ed 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -128,11 +128,20 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker } func (b *Bulker) GetBulker(outputName string) Bulk { + b.remoteOutputMutex.RLock() + defer b.remoteOutputMutex.RUnlock() return b.bulkerMap[outputName] } +// GetBulkerMap returns a copy of the remote output bulkers func (b *Bulker) GetBulkerMap() map[string]Bulk { - return b.bulkerMap + mp := make(map[string]Bulk) + b.remoteOutputMutex.RLock() + for k, v := range b.bulkerMap { + mp[k] = v + } + b.remoteOutputMutex.RUnlock() + return mp } func (b *Bulker) CancelFn() context.CancelFunc { @@ -153,7 +162,9 @@ func (b *Bulker) updateBulkerMap(outputName string, newBulker *Bulker) { // if changed, stop the existing bulker and create a new one func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) { hasConfigChanged := b.hasChangedAndUpdateRemoteOutputConfig(zlog, outputName, outputMap[outputName]) + b.remoteOutputMutex.RLock() bulker := b.bulkerMap[outputName] + b.remoteOutputMutex.RUnlock() if bulker != nil && !hasConfigChanged { return bulker, false, nil } @@ -426,6 +437,8 @@ func (b *Bulker) Run(ctx context.Context) error { // cancelling context of each remote bulker when Run exits defer func() { + b.remoteOutputMutex.RLock() + defer b.remoteOutputMutex.RUnlock() for _, bulker := range b.bulkerMap { bulker.CancelFn()() }