From 0c578e16b0bae00c3534dd09252c291dcd9980e1 Mon Sep 17 00:00:00 2001
From: matt durham
Date: Sun, 9 Feb 2025 16:03:12 -0500
Subject: [PATCH 01/18] Add parallelism block.

---
 CHANGELOG.md | 4 +
 .../prometheus/prometheus.write.queue.md | 29 +-
 go.mod | 26 +-
 go.sum | 46 +-
 .../prometheus/write/queue/e2e_bench_test.go | 126 ----
 .../prometheus/write/queue/e2e_stats_test.go | 711 ------------------
 .../prometheus/write/queue/e2e_test.go | 427 -----------
 .../component/prometheus/write/queue/types.go | 67 +-
 .../prometheus/write/queue/types_test.go | 83 ++
 9 files changed, 220 insertions(+), 1299 deletions(-)
 delete mode 100644 internal/component/prometheus/write/queue/e2e_bench_test.go
 delete mode 100644 internal/component/prometheus/write/queue/e2e_stats_test.go
 delete mode 100644 internal/component/prometheus/write/queue/e2e_test.go
 create mode 100644 internal/component/prometheus/write/queue/types_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6be2be5511..5ec8ded1dc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,10 @@ internal API changes are not present.
 Main (unreleased)
 -----------------

+### Breaking changes
+
+- (_Experimental_) Changed `parallelism` from attribute to block to allow dynamic scaling of `prometheus.write.queue`. (@mattdurham)
+
 ### Features

 - Add `add_cloudwatch_timestamp` to `prometheus.exporter.cloudwatch` metrics. (@captncraig)
diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md
index c8f2eeb397..921a2bc41a 100644
--- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md
+++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md
@@ -53,6 +53,7 @@ The following blocks are supported inside the definition of
 persistence | [persistence][] | Configuration for persistence | no
 endpoint | [endpoint][] | Location to send metrics to. | no
 endpoint > basic_auth | [basic_auth][] | Configure basic_auth for authenticating to the endpoint. | no
+endpoint > parallelism | [parallelism][] | Configure parallelism for the endpoint. | no

The `>` symbol indicates deeper levels of nesting. For example, `endpoint >
basic_auth` refers to a `basic_auth` block defined inside an
`endpoint` block.

[endpoint]: #endpoint-block
[basic_auth]: #basic_auth-block
[persistence]: #persistence-block
+[parallelism]: #parallelism-block

### persistence block

@@ -86,13 +88,12 @@ The following arguments are supported:

Name | Type | Description | Default | Required
----------------------|---------------|-----------------------------------------------------------------|---------|----------
`url` | `string` | Full URL to send metrics to. | | yes
`bearer_token` | `secret` | Bearer token to authenticate with. | | no
`write_timeout` | `duration` | Timeout for requests made to the URL. | `"30s"` | no
`retry_backoff` | `duration` | How long to wait between retries. | `1s` | no
`max_retry_attempts` | `uint` | Maximum number of retries before dropping the batch. | `0` | no
`batch_count` | `uint` | How many series to queue in each queue. | `1000` | no
`flush_interval` | `duration` | How long to wait until sending if `batch_count` is not triggered. | `1s` | no
-`parallelism` | `uint` | How many parallel batches to write. | 10 | no
`external_labels` | `map(string)` | Labels to add to metrics sent over the network. | | no

### basic_auth block

Name | Type | Description | Default | Required
-----------|----------|----------------------|---------|---------
`password` | `secret` | Basic auth password. | | no
`username` | `string` | Basic auth username. | | no

+### parallelism block
+
+Name | Type | Description | Default | Required
+--------------------------------|------------|-------------------------------------------------------------------------------------------------------------------------------|---------|---------
+`drift_scale_up_seconds` | `int` | The drift, in seconds, between the timestamps of incoming and outgoing signals above which the desired connections increase. | `60` | no
+`drift_scale_down_seconds` | `int` | The drift, in seconds, between the timestamps of incoming and outgoing signals below which the desired connections decrease. | `30` | no
+`max_connections` | `uint` | The maximum number of desired connections. | `50` | no
+`min_connections` | `uint` | The minimum number of desired connections. | `2` | no
+`network_flush_interval` | `duration` | How long network successes and failures are kept in memory for determining the desired connections. | `1m` | no
+`desired_connections_lookback` | `duration` | How long previously calculated desired connections are kept for determining the desired connections. | `5m` | no
+`desired_check_interval` | `duration` | How often to recalculate the desired connections. | `5s` | no
+`allowed_network_error_percent` | `float` | The allowed network error rate before the desired connections scale down. | `0.50` | no
+
+The `parallelism` block determines when to scale the number of desired connections up or down, based on several inputs.
+The primary input is the drift between the timestamps of incoming and outgoing signals: if the drift rises above
+`drift_scale_up_seconds`, the desired connections increase, and if it falls below `drift_scale_down_seconds`, they decrease.
+If the drift is between these two values, the desired connections stay the same. Network successes and failures are recorded
+in memory to help determine the nature of the drift. For instance, if the drift is increasing but network failures are also
+increasing, the desired connections don't increase, since that would only add load to the endpoint. The last major part is
+preventing the desired connections from flapping: every `desired_check_interval` a desired value is calculated and appended
+to a list, and before actually changing the desired connections the system chooses the highest value recorded within
+`desired_connections_lookback`. For example, if the desired connections over the past 5 minutes have been `[2,1,1]` and the
+check calculates a desired value of 1, the value doesn't change, because 2 is still in the lookback window. On the next
+check the list is `[1,1,1]`, and the value changes to 1. In general, the system is fast to scale up and slow to scale down.
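+
+The following example shows an `endpoint` with a `parallelism` block. It is a minimal sketch rather than a tested
+configuration: the component label, endpoint name, URL, and scaling values are illustrative assumptions, not recommendations.
+
+```alloy
+prometheus.write.queue "example" {
+  endpoint "default" {
+    url = "http://localhost:9090/api/v1/write"
+
+    parallelism {
+      drift_scale_up_seconds   = 60
+      drift_scale_down_seconds = 30
+      min_connections          = 2
+      max_connections          = 20
+    }
+  }
+}
+```
+
+With these values, the component keeps between 2 and 20 connections, scaling up when the drift exceeds 60 seconds and
+scaling down when it falls below 30 seconds.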
+ ## Exported fields diff --git a/go.mod b/go.mod index acd1199831..9e60673bee 100644 --- a/go.mod +++ b/go.mod @@ -73,7 +73,7 @@ require ( github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 github.com/grafana/vmware_exporter v0.0.5-beta - github.com/grafana/walqueue v0.0.0-20241202135041-6ec70efeec94 + github.com/grafana/walqueue v0.0.0-20250207154308-e874c2251fc7 github.com/hashicorp/consul/api v1.29.5 github.com/hashicorp/go-discover v0.0.0-20230724184603-e89ebd1b2f65 github.com/hashicorp/go-multierror v1.1.1 @@ -162,7 +162,7 @@ require ( github.com/prometheus/blackbox_exporter v0.24.1-0.20230623125439-bd22efa1c900 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/client_model v0.6.1 - github.com/prometheus/common v0.60.1 + github.com/prometheus/common v0.61.0 github.com/prometheus/common/sigv4 v0.1.0 github.com/prometheus/consul_exporter v0.8.0 github.com/prometheus/memcached_exporter v0.13.0 @@ -248,18 +248,18 @@ require ( go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.29.0 + golang.org/x/crypto v0.30.0 golang.org/x/crypto/x509roots/fallback v0.0.0-20240208163226-62c9f1799c91 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 - golang.org/x/net v0.31.0 - golang.org/x/oauth2 v0.23.0 - golang.org/x/sys v0.27.0 - golang.org/x/text v0.20.0 + golang.org/x/net v0.32.0 + golang.org/x/oauth2 v0.24.0 + golang.org/x/sys v0.28.0 + golang.org/x/text v0.21.0 golang.org/x/time v0.6.0 golang.org/x/tools v0.25.0 google.golang.org/api v0.188.0 google.golang.org/grpc v1.67.1 - google.golang.org/protobuf v1.35.1 + google.golang.org/protobuf v1.35.2 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 gotest.tools v2.2.0+incompatible @@ -803,8 +803,8 @@ require ( go4.org/netipx v0.0.0-20230125063823-8449b0a6169f // indirect golang.org/x/arch v0.7.0 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/sync v0.9.0 - golang.org/x/term v0.26.0 // indirect + golang.org/x/sync v0.10.0 + golang.org/x/term v0.27.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect gonum.org/v1/gonum v0.15.1 // indirect @@ -861,6 +861,12 @@ require ( go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.7.0 // indirect ) +require ( + github.com/deneonet/benc v1.1.2 // indirect + github.com/panjf2000/ants/v2 v2.11.0 // indirect + golang.design/x/chann v0.1.2 // indirect +) + // NOTE: replace directives below must always be *temporary*. 
// // Adding a replace directive to change a module to a fork of a module will diff --git a/go.sum b/go.sum index 850c4de59d..f50687f8da 100644 --- a/go.sum +++ b/go.sum @@ -1314,6 +1314,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deneonet/benc v1.1.2 h1:JNJSnA53zVLjt4Bz1HwxG4tQg475LP+kd8rgUuV4tc4= +github.com/deneonet/benc v1.1.2/go.mod h1:HbL4lzHT0jkmlYa36bZw0a0Nhj4NsXG7bd/bXRxJYy4= github.com/denisenkom/go-mssqldb v0.0.0-20180620032804-94c9c97e8c9f/go.mod h1:xN/JuLBIz4bjkxNmByTiV1IbhfnYb6oo99phBn4Eqhc= github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM= github.com/dennwc/btrfs v0.0.0-20230312211831-a1f570bd01a1 h1:ue4Es4Xzz255hWQ7NAWzZxuXG+YOV7URzzusLLSe0zU= @@ -1901,8 +1903,8 @@ github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPF github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0/go.mod h1:7t5XR+2IA8P2qggOAHTj/GCZfoLBle3OvNSYh1VkRBU= github.com/grafana/vmware_exporter v0.0.5-beta h1:2JCqzIWJzns8FN78wPsueC9rT3e3kZo2OUoL5kGMjdM= github.com/grafana/vmware_exporter v0.0.5-beta/go.mod h1:1CecUZII0zVsVcHtNfNeTTcxK7EksqAsAn/TCCB0Mh4= -github.com/grafana/walqueue v0.0.0-20241202135041-6ec70efeec94 h1:d3Hgun3ailVbNArBIhvRIjmCBOOCO9ClKNpzqQFsMLE= -github.com/grafana/walqueue v0.0.0-20241202135041-6ec70efeec94/go.mod h1:2B+4gxoOgzgRhstKcikROUHusMXLqd5nE/UKukaQrJI= +github.com/grafana/walqueue v0.0.0-20250207154308-e874c2251fc7 h1:UwiJ9tKZPzEZIGrk9yghWQcKErsyMwchEhrE+zbh+/o= +github.com/grafana/walqueue v0.0.0-20250207154308-e874c2251fc7/go.mod h1:rnU7r397nvQCTyVbcODlD3P6DIbQlidxPDweV+4ab2M= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445 h1:FlKQKUYPZ5yDCN248M3R7x8yu2E3yEZ0H7aLomE4EoE= github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445/go.mod h1:L69/dBlPQlWkcnU76WgcppK5e4rrxzQdi6LhLnK/ytA= @@ -2775,6 +2777,8 @@ github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC github.com/packethost/packngo v0.1.1-0.20180711074735-b9cb5096f54c h1:vwpFWvAO8DeIZfFeqASzZfsxuWPno9ncAebBEP0N3uE= github.com/packethost/packngo v0.1.1-0.20180711074735-b9cb5096f54c/go.mod h1:otzZQXgoO96RTzDB/Hycg0qZcXZsWJGJRSXbmEIJ+4M= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= +github.com/panjf2000/ants/v2 v2.11.0 h1:sHrqEwTBQTQ2w6PMvbMfvBtVUuhsaYPzUmAYDLYmJPg= +github.com/panjf2000/ants/v2 v2.11.0/go.mod h1:V9HhTupTWxcaRmIglJvGwvzqXUTnIZW9uO6q4hAfApw= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -2905,8 +2909,8 @@ github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJ github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= 
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= -github.com/prometheus/common v0.60.1 h1:FUas6GcOw66yB/73KC+BOZoFJmbo/1pojoILArPAaSc= -github.com/prometheus/common v0.60.1/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/h19V5NADQw= +github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFSRQQ= +github.com/prometheus/common v0.61.0/go.mod h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s= github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4= github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI= github.com/prometheus/consul_exporter v0.8.0 h1:2z3drFic65WFoHaJRKkmnJRRlBLmmxVqT8L9LO2yxAo= @@ -3500,6 +3504,8 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go4.org/netipx v0.0.0-20230125063823-8449b0a6169f h1:ketMxHg+vWm3yccyYiq+uK8D3fRmna2Fcj+awpQp84s= go4.org/netipx v0.0.0-20230125063823-8449b0a6169f/go.mod h1:tgPU4N2u9RByaTN3NC2p9xOzyFpte4jYwsIIRF7XlSc= +golang.design/x/chann v0.1.2 h1:eHF9wjuQnpp+j4ryWhyxC/pFuYzbvMAkudA/I5ALovY= +golang.design/x/chann v0.1.2/go.mod h1:Rh5KhCAp+0qu9+FfKPymHpu8onmjl89sFwMeiw3SK14= golang.org/x/arch v0.7.0 h1:pskyeJh/3AmoQ8CPE95vxHLqp1G1GfGNXTmcl9NEKTc= golang.org/x/arch v0.7.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -3547,8 +3553,8 @@ golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= -golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= +golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/crypto/x509roots/fallback v0.0.0-20240208163226-62c9f1799c91 h1:Lyizcy9jX02jYR0ceBkL6S+jRys8Uepf7wt1vrz6Ras= golang.org/x/crypto/x509roots/fallback v0.0.0-20240208163226-62c9f1799c91/go.mod h1:kNa9WdvYnzFwC79zRpLRMJbdEFlhyM5RPFBBZp/wWH8= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -3705,8 +3711,8 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= -golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ 
-3739,8 +3745,8 @@ golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= golang.org/x/oauth2 v0.12.0/go.mod h1:A74bZ3aGXgCY0qaIC9Ahg6Lglin4AMAco8cIv9baba4= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= -golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= +golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -3760,8 +3766,8 @@ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= -golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -3902,8 +3908,8 @@ golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= -golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -3922,8 +3928,8 @@ golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= -golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU= -golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term 
v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -3946,8 +3952,8 @@ golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= -golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -4376,8 +4382,8 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/internal/component/prometheus/write/queue/e2e_bench_test.go b/internal/component/prometheus/write/queue/e2e_bench_test.go deleted file mode 100644 index 3ae7c36534..0000000000 --- a/internal/component/prometheus/write/queue/e2e_bench_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package queue - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/require" -) - -func BenchmarkE2E(b *testing.B) { - // Around 120k ops if you look at profile roughly 20k are actual implementation with the rest being benchmark - // setup. 
- type e2eTest struct { - name string - maker func(index int, app storage.Appender) - tester func(samples []prompb.TimeSeries) - } - tests := []e2eTest{ - { - // This should be ~1200 allocs an op - name: "normal", - maker: func(index int, app storage.Appender) { - ts, v, lbls := makeSeries(index) - _, _ = app.Append(0, lbls, ts, v) - }, - tester: func(samples []prompb.TimeSeries) { - b.Helper() - for _, s := range samples { - require.True(b, len(s.Samples) == 1) - } - }, - }, - } - for _, test := range tests { - b.Run(test.name, func(t *testing.B) { - runBenchmark(t, test.maker, test.tester) - }) - } -} - -func runBenchmark(t *testing.B, add func(index int, appendable storage.Appender), _ func(samples []prompb.TimeSeries)) { - t.ReportAllocs() - l := log.NewNopLogger() - done := make(chan struct{}) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - })) - expCh := make(chan Exports, 1) - c, err := newComponentBenchmark(t, l, srv.URL, expCh) - require.NoError(t, err) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - go func() { - runErr := c.Run(ctx) - require.NoError(t, runErr) - }() - // Wait for export to spin up. - exp := <-expCh - - index := 0 - app := exp.Receiver.Appender(ctx) - - for i := 0; i < t.N; i++ { - index++ - add(index, app) - } - require.NoError(t, app.Commit()) - - tm := time.NewTimer(10 * time.Second) - select { - case <-done: - case <-tm.C: - } - cancel() -} - -func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Exports) (*Queue, error) { - return NewComponent(component.Options{ - ID: "test", - Logger: l, - DataPath: t.TempDir(), - OnStateChange: func(e component.Exports) { - exp <- e.(Exports) - }, - Registerer: fakeRegistry{}, - Tracer: nil, - }, Arguments{ - TTL: 2 * time.Hour, - Persistence: Persistence{ - MaxSignalsToBatch: 100_000, - BatchInterval: 1 * time.Second, - }, - Endpoints: []EndpointConfig{{ - Name: "test", - URL: url, - Timeout: 10 * time.Second, - RetryBackoff: 1 * time.Second, - MaxRetryAttempts: 0, - BatchCount: 50, - FlushInterval: 1 * time.Second, - Parallelism: 1, - }}, - }) -} - -var _ prometheus.Registerer = (*fakeRegistry)(nil) - -type fakeRegistry struct{} - -func (f fakeRegistry) Register(collector prometheus.Collector) error { - return nil -} - -func (f fakeRegistry) MustRegister(collector ...prometheus.Collector) { -} - -func (f fakeRegistry) Unregister(collector prometheus.Collector) bool { - return true -} diff --git a/internal/component/prometheus/write/queue/e2e_stats_test.go b/internal/component/prometheus/write/queue/e2e_stats_test.go deleted file mode 100644 index b6737d5be8..0000000000 --- a/internal/component/prometheus/write/queue/e2e_stats_test.go +++ /dev/null @@ -1,711 +0,0 @@ -package queue - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/grafana/alloy/internal/util" - "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" - "github.com/stretchr/testify/require" -) - -const remoteSamples = "prometheus_remote_storage_samples_total" -const remoteHistograms = "prometheus_remote_storage_histograms_total" -const remoteMetadata = "prometheus_remote_storage_metadata_total" - -const sentBytes = "prometheus_remote_storage_sent_bytes_total" -const sentMetadataBytes = "prometheus_remote_storage_metadata_bytes_total" - -const outTimestamp = "prometheus_remote_storage_queue_highest_sent_timestamp_seconds" -const inTimestamp = 
"prometheus_remote_storage_highest_timestamp_in_seconds" - -const failedSample = "prometheus_remote_storage_samples_failed_total" -const failedHistogram = "prometheus_remote_storage_histograms_failed_total" -const failedMetadata = "prometheus_remote_storage_metadata_failed_total" - -const retriedSamples = "prometheus_remote_storage_samples_retried_total" -const retriedHistogram = "prometheus_remote_storage_histograms_retried_total" -const retriedMetadata = "prometheus_remote_storage_metadata_retried_total" - -const prometheusDuration = "prometheus_remote_storage_queue_duration_seconds" - -const serializerIncoming = "alloy_queue_series_serializer_incoming_signals" -const alloySent = "alloy_queue_series_network_sent" -const alloySerializerIncoming = "alloy_queue_series_serializer_incoming_timestamp_seconds" -const alloyNetworkDuration = "alloy_queue_series_network_duration_seconds" -const alloyFailures = "alloy_queue_series_network_failed" -const alloyRetries = "alloy_queue_series_network_retried" -const alloy429 = "alloy_queue_series_network_retried_429" - -const alloyMetadataDuration = "alloy_queue_metadata_network_duration_seconds" -const alloyMetadataSent = "alloy_queue_metadata_network_sent" -const alloyMetadataFailed = "alloy_queue_metadata_network_failed" -const alloyMetadataRetried429 = "alloy_queue_metadata_network_retried_429" -const alloyMetadataRetried = "alloy_queue_metadata_network_retried" - -const alloyNetworkTimestamp = "alloy_queue_series_network_timestamp_seconds" - -const alloyDrift = "alloy_queue_series_timestamp_drift_seconds" - -// TestMetadata is the large end to end testing for the queue based wal, specifically for metadata. -func TestMetadata(t *testing.T) { - // Check assumes you are checking for any value that is not 0. - // The test at the end will see if there are any values that were not 0. - tests := []statsTest{ - // Metadata Tests - { - name: "metadata success", - returnStatusCode: http.StatusOK, - dtype: Metadata, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: remoteMetadata, - value: 10, - }, - { - name: sentMetadataBytes, - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataSent, - value: 10, - }, - }, - }, - { - name: "metadata failure", - returnStatusCode: http.StatusBadRequest, - dtype: Metadata, - checks: []check{ - { - name: alloyMetadataFailed, - value: 10, - }, - { - name: serializerIncoming, - value: 10, - }, - { - name: failedMetadata, - value: 10, - }, - { - name: alloyMetadataDuration, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "metadata retry", - returnStatusCode: http.StatusTooManyRequests, - dtype: Metadata, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: retriedMetadata, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataRetried, - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataRetried429, - valueFunc: greaterThenZero, - }, - }, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - runE2eStats(t, test) - }) - } - -} - -// TestMetrics is the large end to end testing for the queue based wal. -func TestMetrics(t *testing.T) { - // Check assumes you are checking for any value that is not 0. - // The test at the end will see if there are any values that were not 0. 
- tests := []statsTest{ - // Sample Tests - { - name: "sample success", - returnStatusCode: http.StatusOK, - dtype: Sample, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: remoteSamples, - value: 10, - }, - { - name: alloySent, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: sentBytes, - valueFunc: greaterThenZero, - }, - { - name: outTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyNetworkTimestamp, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "sample failure", - returnStatusCode: http.StatusBadRequest, - dtype: Sample, - checks: []check{ - { - name: alloyFailures, - value: 10, - }, - { - name: serializerIncoming, - value: 10, - }, - { - name: failedSample, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyDrift, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "sample retry", - returnStatusCode: http.StatusTooManyRequests, - dtype: Sample, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: retriedSamples, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloyRetries, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloy429, - // This will be more than 10 since it retries in a loop. 
- valueFunc: greaterThenZero, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyDrift, - valueFunc: greaterThenZero, - }, - }, - }, - // histograms - { - name: "histogram success", - returnStatusCode: http.StatusOK, - dtype: Histogram, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: remoteHistograms, - value: 10, - }, - { - name: alloySent, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: sentBytes, - valueFunc: greaterThenZero, - }, - { - name: outTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyNetworkTimestamp, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "histogram failure", - returnStatusCode: http.StatusBadRequest, - dtype: Histogram, - checks: []check{ - { - name: alloyFailures, - value: 10, - }, - { - name: serializerIncoming, - value: 10, - }, - { - name: failedHistogram, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyDrift, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "histogram retry", - returnStatusCode: http.StatusTooManyRequests, - dtype: Histogram, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: retriedHistogram, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloyRetries, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloy429, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyDrift, - valueFunc: greaterThenZero, - }, - }, - }, - // TURNING OFF EXEMPLAR TESTS until underlying issue is resolved. - //exemplar, note that once it hits the appender exemplars are treated the same as series. 
- /*{ - name: "exemplar success", - returnStatusCode: http.StatusOK, - dtype: Exemplar, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: remoteSamples, - value: 10, - }, - { - name: alloySent, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: sentBytes, - valueFunc: greaterThenZero, - }, - { - name: outTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyNetworkTimestamp, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "exemplar failure", - returnStatusCode: http.StatusBadRequest, - dtype: Exemplar, - checks: []check{ - { - name: alloyFailures, - value: 10, - }, - { - name: serializerIncoming, - value: 10, - }, - { - name: failedSample, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - }, - }, - { - name: "exemplar retry", - returnStatusCode: http.StatusTooManyRequests, - dtype: Exemplar, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: retriedSamples, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloyRetries, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloy429, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - }, - },*/ - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - runE2eStats(t, test) - }) - } - -} - -func greaterThenZero(v float64) bool { - return v > 0 -} - -func isReasonableTimeStamp(v float64) bool { - if v < 0 { - return false - } - unixTime := time.Unix(int64(v), 0) - - return time.Since(unixTime) < 10*time.Second -} - -type dataType int - -const ( - Sample dataType = iota - Histogram - Exemplar - Metadata -) - -type check struct { - name string - value float64 - valueFunc func(v float64) bool -} -type statsTest struct { - name string - returnStatusCode int - // Only check for non zero values, once all checks are ran it will automatically ensure all remaining metrics are 0. - checks []check - dtype dataType -} - -func runE2eStats(t *testing.T, test statsTest) { - l := util.TestAlloyLogger(t) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(test.returnStatusCode) - })) - expCh := make(chan Exports, 1) - - reg := prometheus.NewRegistry() - c, err := newComponent(t, l, srv.URL, expCh, reg) - require.NoError(t, err) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - go func() { - runErr := c.Run(ctx) - require.NoError(t, runErr) - }() - // Wait for export to spin up. 
- exp := <-expCh - - index := 0 - - go func() { - app := exp.Receiver.Appender(ctx) - for j := 0; j < 10; j++ { - index++ - switch test.dtype { - case Sample: - ts, v, lbls := makeSeries(index) - _, errApp := app.Append(0, lbls, ts, v) - require.NoError(t, errApp) - case Histogram: - ts, lbls, h := makeHistogram(index) - _, errApp := app.AppendHistogram(0, lbls, ts, h, nil) - require.NoError(t, errApp) - case Exemplar: - ex := makeExemplar(index) - _, errApp := app.AppendExemplar(0, nil, ex) - require.NoError(t, errApp) - case Metadata: - md, lbls := makeMetadata(index) - _, errApp := app.UpdateMetadata(0, lbls, md) - require.NoError(t, errApp) - default: - require.True(t, false) - } - } - require.NoError(t, app.Commit()) - }() - time.Sleep(5 * time.Second) - require.Eventually(t, func() bool { - dtos, gatherErr := reg.Gather() - require.NoError(t, gatherErr) - // Check if we have some valid metrics. - found := 0 - for _, d := range dtos { - if getValue(d) > 0 { - found++ - } - } - // Make sure we have a few metrics. - return found > 1 - }, 10*time.Second, 1*time.Second) - - metrics := make(map[string]float64) - dtos, err := reg.Gather() - require.NoError(t, err) - // Cancel needs to be here since it will unregister the metrics. - cancel() - - // Get the value of metrics. - for _, d := range dtos { - metrics[*d.Name] = getValue(d) - } - - // Check for the metrics that matter. - for _, valChk := range test.checks { - // These check functions will return the list of metrics with the one checked for deleted. - // Ideally at the end we should only be left with metrics with a value of zero.s - if valChk.valueFunc != nil { - metrics = checkValueCondition(t, valChk.name, valChk.valueFunc, metrics) - } else { - metrics = checkValue(t, valChk.name, valChk.value, metrics) - } - } - // all other metrics should be zero. 
- for k, v := range metrics { - require.Zerof(t, v, "%s should be zero", k) - } -} - -func getValue(d *dto.MetricFamily) float64 { - switch *d.Type { - case dto.MetricType_COUNTER: - return d.Metric[0].Counter.GetValue() - case dto.MetricType_GAUGE: - return d.Metric[0].Gauge.GetValue() - case dto.MetricType_SUMMARY: - return d.Metric[0].Summary.GetSampleSum() - case dto.MetricType_UNTYPED: - return d.Metric[0].Untyped.GetValue() - case dto.MetricType_HISTOGRAM: - return d.Metric[0].Histogram.GetSampleSum() - case dto.MetricType_GAUGE_HISTOGRAM: - return d.Metric[0].Histogram.GetSampleSum() - default: - panic("unknown type " + d.Type.String()) - } -} - -func checkValue(t *testing.T, name string, value float64, metrics map[string]float64) map[string]float64 { - v, ok := metrics[name] - require.Truef(t, ok, "invalid metric name %s", name) - require.Equalf(t, value, v, "%s should be %f", name, value) - delete(metrics, name) - return metrics -} - -func checkValueCondition(t *testing.T, name string, chk func(float64) bool, metrics map[string]float64) map[string]float64 { - v, ok := metrics[name] - require.Truef(t, ok, "invalid metric name %s", name) - require.Truef(t, chk(v), "false test for metric name %s", name) - delete(metrics, name) - return metrics -} diff --git a/internal/component/prometheus/write/queue/e2e_test.go b/internal/component/prometheus/write/queue/e2e_test.go deleted file mode 100644 index 71b8516ce7..0000000000 --- a/internal/component/prometheus/write/queue/e2e_test.go +++ /dev/null @@ -1,427 +0,0 @@ -package queue - -import ( - "context" - "fmt" - "io" - "net/http" - "net/http/httptest" - "reflect" - "strings" - "sync" - "testing" - "time" - - "github.com/golang/snappy" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/runtime/logging" - "github.com/grafana/alloy/internal/util" - "github.com/grafana/walqueue/types" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" - "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" -) - -func TestE2E(t *testing.T) { - type e2eTest struct { - name string - maker func(index int, app storage.Appender) (float64, labels.Labels) - tester func(samples *safeSlice[prompb.TimeSeries]) - testMeta func(samples *safeSlice[prompb.MetricMetadata]) - } - tests := []e2eTest{ - { - name: "normal", - maker: func(index int, app storage.Appender) (float64, labels.Labels) { - ts, v, lbls := makeSeries(index) - _, errApp := app.Append(0, lbls, ts, v) - require.NoError(t, errApp) - return v, lbls - }, - tester: func(samples *safeSlice[prompb.TimeSeries]) { - t.Helper() - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - require.True(t, len(s.Samples) == 1) - require.True(t, s.Samples[0].Timestamp > 0) - require.True(t, s.Samples[0].Value > 0) - require.True(t, len(s.Labels) == 1) - require.Truef(t, s.Labels[0].Name == fmt.Sprintf("name_%d", int(s.Samples[0].Value)), "%d name %s", int(s.Samples[0].Value), s.Labels[0].Name) - require.True(t, s.Labels[0].Value == fmt.Sprintf("value_%d", int(s.Samples[0].Value))) - } - }, - }, - { - name: "metadata", - maker: func(index int, app storage.Appender) (float64, labels.Labels) { - meta, lbls := makeMetadata(index) - _, errApp := app.UpdateMetadata(0, lbls, meta) - 
require.NoError(t, errApp) - return 0, lbls - }, - testMeta: func(samples *safeSlice[prompb.MetricMetadata]) { - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - require.True(t, s.GetUnit() == "seconds") - require.True(t, s.Help == "metadata help") - require.True(t, s.Unit == "seconds") - require.True(t, s.Type == prompb.MetricMetadata_COUNTER) - require.True(t, strings.HasPrefix(s.MetricFamilyName, "name_")) - } - }, - }, - - { - name: "histogram", - maker: func(index int, app storage.Appender) (float64, labels.Labels) { - ts, lbls, h := makeHistogram(index) - _, errApp := app.AppendHistogram(0, lbls, ts, h, nil) - require.NoError(t, errApp) - return h.Sum, lbls - }, - tester: func(samples *safeSlice[prompb.TimeSeries]) { - t.Helper() - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - require.True(t, len(s.Samples) == 1) - require.True(t, s.Samples[0].Timestamp > 0) - require.True(t, s.Samples[0].Value == 0) - require.True(t, len(s.Labels) == 1) - histSame(t, hist(int(s.Histograms[0].Sum)), s.Histograms[0]) - } - }, - }, - { - name: "float histogram", - maker: func(index int, app storage.Appender) (float64, labels.Labels) { - ts, lbls, h := makeFloatHistogram(index) - _, errApp := app.AppendHistogram(0, lbls, ts, nil, h) - require.NoError(t, errApp) - return h.Sum, lbls - }, - tester: func(samples *safeSlice[prompb.TimeSeries]) { - t.Helper() - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - require.True(t, len(s.Samples) == 1) - require.True(t, s.Samples[0].Timestamp > 0) - require.True(t, s.Samples[0].Value == 0) - require.True(t, len(s.Labels) == 1) - histFloatSame(t, histFloat(int(s.Histograms[0].Sum)), s.Histograms[0]) - } - }, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - runTest(t, test.maker, test.tester, test.testMeta) - }) - } -} - -const ( - iterations = 10 - items = 100 -) - -func runTest(t *testing.T, add func(index int, appendable storage.Appender) (float64, labels.Labels), test func(samples *safeSlice[prompb.TimeSeries]), metaTest func(meta *safeSlice[prompb.MetricMetadata])) { - l := util.TestAlloyLogger(t) - done := make(chan struct{}) - var series atomic.Int32 - var meta atomic.Int32 - samples := newSafeSlice[prompb.TimeSeries]() - metaSamples := newSafeSlice[prompb.MetricMetadata]() - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - newSamples, newMetadata := handlePost(t, w, r) - series.Add(int32(len(newSamples))) - meta.Add(int32(len(newMetadata))) - samples.AddSlice(newSamples) - metaSamples.AddSlice(newMetadata) - if series.Load() == iterations*items { - done <- struct{}{} - } - if meta.Load() == iterations*items { - done <- struct{}{} - } - })) - expCh := make(chan Exports, 1) - c, err := newComponent(t, l, srv.URL, expCh, prometheus.NewRegistry()) - require.NoError(t, err) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - - go func() { - runErr := c.Run(ctx) - require.NoError(t, runErr) - }() - // Wait for export to spin up. - exp := <-expCh - - index := atomic.NewInt64(0) - results := &safeMap{ - results: make(map[float64]labels.Labels), - } - - for i := 0; i < iterations; i++ { - go func() { - app := exp.Receiver.Appender(ctx) - for j := 0; j < items; j++ { - val := index.Add(1) - v, lbl := add(int(val), app) - results.Add(v, lbl) - } - require.NoError(t, app.Commit()) - }() - } - - // This is a weird use case to handle eventually. - // With race turned on this can take a long time. 
- tm := time.NewTimer(20 * time.Second) - select { - case <-done: - case <-tm.C: - require.Truef(t, false, "failed to collect signals in the appropriate time") - } - - cancel() - - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - if len(s.Histograms) == 1 { - lbls, ok := results.Get(s.Histograms[0].Sum) - require.True(t, ok) - for i, sLbl := range s.Labels { - require.True(t, lbls[i].Name == sLbl.Name) - require.True(t, lbls[i].Value == sLbl.Value) - } - } else { - lbls, ok := results.Get(s.Samples[0].Value) - require.True(t, ok) - for i, sLbl := range s.Labels { - require.True(t, lbls[i].Name == sLbl.Name) - require.True(t, lbls[i].Value == sLbl.Value) - } - } - } - if test != nil { - test(samples) - } else { - metaTest(metaSamples) - } - require.Eventuallyf(t, func() bool { - return types.OutStandingTimeSeriesBinary.Load() == 0 - }, 20*time.Second, 1*time.Second, "there are %d time series not collected", types.OutStandingTimeSeriesBinary.Load()) -} - -func handlePost(t *testing.T, _ http.ResponseWriter, r *http.Request) ([]prompb.TimeSeries, []prompb.MetricMetadata) { - defer r.Body.Close() - data, err := io.ReadAll(r.Body) - require.NoError(t, err) - - data, err = snappy.Decode(nil, data) - require.NoError(t, err) - - var req prompb.WriteRequest - err = req.Unmarshal(data) - require.NoError(t, err) - return req.GetTimeseries(), req.Metadata -} - -func makeSeries(index int) (int64, float64, labels.Labels) { - return time.Now().UTC().UnixMilli(), float64(index), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)) -} - -func makeMetadata(index int) (metadata.Metadata, labels.Labels) { - return metadata.Metadata{ - Type: "counter", - Unit: "seconds", - Help: "metadata help", - }, labels.FromStrings("__name__", fmt.Sprintf("name_%d", index)) -} - -func makeHistogram(index int) (int64, labels.Labels, *histogram.Histogram) { - return time.Now().UTC().UnixMilli(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), hist(index) -} - -func makeExemplar(index int) exemplar.Exemplar { - return exemplar.Exemplar{ - Labels: labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), - Ts: time.Now().UnixMilli(), - HasTs: true, - Value: float64(index), - } -} - -func hist(i int) *histogram.Histogram { - return &histogram.Histogram{ - CounterResetHint: 1, - Schema: 2, - ZeroThreshold: 3, - ZeroCount: 4, - Count: 5, - Sum: float64(i), - PositiveSpans: []histogram.Span{ - { - Offset: 1, - Length: 2, - }, - }, - NegativeSpans: []histogram.Span{ - { - Offset: 3, - Length: 4, - }, - }, - PositiveBuckets: []int64{1, 2, 3}, - NegativeBuckets: []int64{1, 2, 3}, - } -} - -func histSame(t *testing.T, h *histogram.Histogram, pb prompb.Histogram) { - require.True(t, h.Sum == pb.Sum) - require.True(t, h.ZeroCount == pb.ZeroCount.(*prompb.Histogram_ZeroCountInt).ZeroCountInt) - require.True(t, h.Schema == pb.Schema) - require.True(t, h.Count == pb.Count.(*prompb.Histogram_CountInt).CountInt) - require.True(t, h.ZeroThreshold == pb.ZeroThreshold) - require.True(t, int32(h.CounterResetHint) == int32(pb.ResetHint)) - require.True(t, reflect.DeepEqual(h.PositiveBuckets, pb.PositiveDeltas)) - require.True(t, reflect.DeepEqual(h.NegativeBuckets, pb.NegativeDeltas)) - histSpanSame(t, h.PositiveSpans, pb.PositiveSpans) - histSpanSame(t, h.NegativeSpans, pb.NegativeSpans) -} - -func histSpanSame(t *testing.T, h []histogram.Span, pb []prompb.BucketSpan) { - require.True(t, len(h) == len(pb)) - for i := range h { - require.True(t, 
h[i].Length == pb[i].Length) - require.True(t, h[i].Offset == pb[i].Offset) - } -} - -func makeFloatHistogram(index int) (int64, labels.Labels, *histogram.FloatHistogram) { - return time.Now().UTC().UnixMilli(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), histFloat(index) -} - -func histFloat(i int) *histogram.FloatHistogram { - return &histogram.FloatHistogram{ - CounterResetHint: 1, - Schema: 2, - ZeroThreshold: 3, - ZeroCount: 4, - Count: 5, - Sum: float64(i), - PositiveSpans: []histogram.Span{ - { - Offset: 1, - Length: 2, - }, - }, - NegativeSpans: []histogram.Span{ - { - Offset: 3, - Length: 4, - }, - }, - PositiveBuckets: []float64{1.1, 2.2, 3.3}, - NegativeBuckets: []float64{1.2, 2.3, 3.4}, - } -} - -func histFloatSame(t *testing.T, h *histogram.FloatHistogram, pb prompb.Histogram) { - require.True(t, h.Sum == pb.Sum) - require.True(t, h.ZeroCount == pb.ZeroCount.(*prompb.Histogram_ZeroCountFloat).ZeroCountFloat) - require.True(t, h.Schema == pb.Schema) - require.True(t, h.Count == pb.Count.(*prompb.Histogram_CountFloat).CountFloat) - require.True(t, h.ZeroThreshold == pb.ZeroThreshold) - require.True(t, int32(h.CounterResetHint) == int32(pb.ResetHint)) - require.True(t, reflect.DeepEqual(h.PositiveBuckets, pb.PositiveCounts)) - require.True(t, reflect.DeepEqual(h.NegativeBuckets, pb.NegativeCounts)) - histSpanSame(t, h.PositiveSpans, pb.PositiveSpans) - histSpanSame(t, h.NegativeSpans, pb.NegativeSpans) -} - -func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports, reg prometheus.Registerer) (*Queue, error) { - return NewComponent(component.Options{ - ID: "test", - Logger: l, - DataPath: t.TempDir(), - OnStateChange: func(e component.Exports) { - exp <- e.(Exports) - }, - Registerer: reg, - Tracer: nil, - }, Arguments{ - TTL: 2 * time.Hour, - Persistence: Persistence{ - MaxSignalsToBatch: 10, - BatchInterval: 1 * time.Second, - }, - Endpoints: []EndpointConfig{{ - Name: "test", - URL: url, - Timeout: 20 * time.Second, - RetryBackoff: 5 * time.Second, - MaxRetryAttempts: 1, - BatchCount: 5, - FlushInterval: 1 * time.Second, - Parallelism: 1, - }}, - }) -} - -func newSafeSlice[T any]() *safeSlice[T] { - return &safeSlice[T]{slice: make([]T, 0)} -} - -type safeSlice[T any] struct { - slice []T - mut sync.Mutex -} - -func (s *safeSlice[T]) Add(v T) { - s.mut.Lock() - defer s.mut.Unlock() - s.slice = append(s.slice, v) -} - -func (s *safeSlice[T]) AddSlice(v []T) { - s.mut.Lock() - defer s.mut.Unlock() - s.slice = append(s.slice, v...) 
-} - -func (s *safeSlice[T]) Len() int { - s.mut.Lock() - defer s.mut.Unlock() - return len(s.slice) -} - -func (s *safeSlice[T]) Get(i int) T { - s.mut.Lock() - defer s.mut.Unlock() - return s.slice[i] -} - -type safeMap struct { - mut sync.Mutex - results map[float64]labels.Labels -} - -func (s *safeMap) Add(v float64, ls labels.Labels) { - s.mut.Lock() - defer s.mut.Unlock() - s.results[v] = ls -} - -func (s *safeMap) Get(v float64) (labels.Labels, bool) { - s.mut.Lock() - defer s.mut.Unlock() - res, ok := s.results[v] - return res, ok -} diff --git a/internal/component/prometheus/write/queue/types.go b/internal/component/prometheus/write/queue/types.go index ffffe287ea..a6c762b06b 100644 --- a/internal/component/prometheus/write/queue/types.go +++ b/internal/component/prometheus/write/queue/types.go @@ -50,7 +50,16 @@ func defaultEndpointConfig() EndpointConfig { MaxRetryAttempts: 0, BatchCount: 1_000, FlushInterval: 1 * time.Second, - Parallelism: 4, + Parallelism: ParralelismConfig{ + DriftScaleUpSeconds: 60, + DriftScaleDownSeconds: 30, + MaxConnections: 50, + MinConnections: 2, + NetworkFlushInterval: 1 * time.Minute, + DesiredConnectionsLookback: 5 * time.Minute, + DesiredCheckInterval: 5 * time.Second, + AllowedNetworkErrorPercent: 0.50, + }, } } @@ -66,6 +75,23 @@ func (r *Arguments) Validate() error { if conn.FlushInterval < 1*time.Second { return fmt.Errorf("flush_interval must be greater or equal to 1s, the internal timers resolution is 1s") } + if conn.Parallelism.MaxConnections < conn.Parallelism.MinConnections { + return fmt.Errorf("max_connections less than min_connections") + } + if conn.Parallelism.MinConnections == 0 { + return fmt.Errorf("min_connections must be greater than 0") + } + if conn.Parallelism.DriftScaleUpSeconds <= conn.Parallelism.DriftScaleDownSeconds { + return fmt.Errorf("drift_scale_up_seconds less than or equal drift_scale_down_seconds") + } + // Any lower than 1 second and you spend a fair amount of time churning on the draining and + // refilling the write buffers. + if conn.Parallelism.DesiredCheckInterval < 1*time.Second { + return fmt.Errorf("desired_check_interval must be greater than or equal to 1 second") + } + if conn.Parallelism.AllowedNetworkErrorPercent < 0 || conn.Parallelism.AllowedNetworkErrorPercent > 1 { + return fmt.Errorf("allowed_network_error_percent must be between 0.00 and 1.00") + } } return nil @@ -87,10 +113,36 @@ type EndpointConfig struct { // How long to wait before sending regardless of batch count. FlushInterval time.Duration `alloy:"flush_interval,attr,optional"` // How many concurrent queues to have. - Parallelism uint `alloy:"parallelism,attr,optional"` + Parallelism ParralelismConfig `alloy:"parallelism,attr,block"` ExternalLabels map[string]string `alloy:"external_labels,attr,optional"` } +type ParralelismConfig struct { + // DriftScaleUpSeconds is the maximum amount of seconds that is allowed for the Newest Timestamp Serializer - Newest Timestamp Sent via Network before the connections scales up. + // Using non unix timestamp numbers. If Newest TS In Serializer sees 100s and Newest TS Out Network sees 20s then we have a drift of 80s. If AllowDriftSeconds is 60s that would + // trigger a scaling up event. + DriftScaleUpSeconds int64 `alloy:"drift_scale_up_seconds,attr,optional"` + // DriftScaleDownSeconds is the amount if we go below that we can scale down. Using the above if In is 100s and Out is 70s and DriftScaleDownSeconds is 30 then we wont scale + // down even though we are below the 60s. 
This is to keep the number of connections from flapping. In practice we should consider 30s DriftScaleDownSeconds and 60s DriftScaleUpSeconds to be a sweet spot + // for general usage. + DriftScaleDownSeconds int64 `alloy:"drift_scale_down_seconds,attr,optional"` + // MaxConnections is the maximum number of concurrent connections to use. + MaxConnections uint `alloy:"max_connections,attr,optional"` + // MinConnections is the minimum number of concurrent connections to use. + MinConnections uint `alloy:"min_connections,attr,optional"` + // NetworkFlushInterval is how long to keep network successes and errors in memory for calculations. + NetworkFlushInterval time.Duration `alloy:"network_flush_interval,attr,optional"` + // DesiredConnectionsLookback is how far to lookback for previous desired values. This is to prevent flapping. + // In a situation where in the past 5 minutes you have desired [1,2,1,1] and desired is 1 it will + // choose 2 since that was the greatest. This determines how fast you can scale down. + DesiredConnectionsLookback time.Duration `alloy:"desired_connections_lookback,attr,optional"` + // DesiredCheckInterval is how long to check for desired values. + DesiredCheckInterval time.Duration `alloy:"desired_check_interval,attr,optional"` + // AllowedNetworkErrorPercent is the percentage of failed network requests that are allowable. This will + // trigger a decrease in connections if exceeded. + AllowedNetworkErrorPercent float64 `alloy:"allowed_network_error_percent,attr,optional"` +} + var UserAgent = fmt.Sprintf("Alloy/%s", version.Version) func (cc EndpointConfig) ToNativeType() types.ConnectionConfig { @@ -104,7 +156,16 @@ func (cc EndpointConfig) ToNativeType() types.ConnectionConfig { BatchCount: cc.BatchCount, FlushInterval: cc.FlushInterval, ExternalLabels: cc.ExternalLabels, - Connections: cc.Parallelism, + Parralelism: types.ParralelismConfig{ + AllowedDriftSeconds: cc.Parallelism.DriftScaleUpSeconds, + MinimumScaleDownDriftSeconds: cc.Parallelism.DriftScaleDownSeconds, + MaxConnections: cc.Parallelism.MaxConnections, + MinConnections: cc.Parallelism.MinConnections, + ResetInterval: cc.Parallelism.NetworkFlushInterval, + Lookback: cc.Parallelism.DesiredConnectionsLookback, + CheckInterval: cc.Parallelism.DesiredCheckInterval, + AllowedNetworkErrorPercent: cc.Parallelism.AllowedNetworkErrorPercent, + }, } if cc.BasicAuth != nil { tcc.BasicAuth = &types.BasicAuth{ diff --git a/internal/component/prometheus/write/queue/types_test.go b/internal/component/prometheus/write/queue/types_test.go new file mode 100644 index 0000000000..0c9feaef7d --- /dev/null +++ b/internal/component/prometheus/write/queue/types_test.go @@ -0,0 +1,83 @@ +package queue + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestParralelismConfig_Validate(t *testing.T) { + testCases := []struct { + name string + config func(cfg ParralelismConfig) ParralelismConfig + expectedErrMsg string + }{ + { + name: "default config is valid", + config: func(cfg ParralelismConfig) ParralelismConfig { + return cfg + }, + }, + { + name: "positive drift scale up seconds is invalid", + config: func(cfg ParralelismConfig) ParralelismConfig { + cfg.DriftScaleUpSeconds = 10 + cfg.DriftScaleDownSeconds = 10 + return cfg + }, + expectedErrMsg: "drift_scale_up_seconds less than or equal drift_scale_down_seconds", + }, + { + name: "max less than min", + config: func(cfg ParralelismConfig) ParralelismConfig { + cfg.MaxConnections = 1 + cfg.MinConnections = 2 + return cfg + }, + 
expectedErrMsg: "max_connections less than min_connections", + }, + { + name: "to low desired check", + config: func(cfg ParralelismConfig) ParralelismConfig { + cfg.DesiredCheckInterval = (1 * time.Second) - (50 * time.Millisecond) + return cfg + }, + expectedErrMsg: "desired_check_interval must be greater than or equal to 1 second", + }, + { + name: "invalid network error percentage low", + config: func(cfg ParralelismConfig) ParralelismConfig { + cfg.AllowedNetworkErrorPercent = -0.01 + return cfg + }, + expectedErrMsg: "allowed_network_error_percent must be between 0.00 and 1.00", + }, + { + name: "invalid network error percentage high", + config: func(cfg ParralelismConfig) ParralelismConfig { + cfg.AllowedNetworkErrorPercent = 1.01 + return cfg + }, + expectedErrMsg: "allowed_network_error_percent must be between 0.00 and 1.00", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg := defaultEndpointConfig() + cfg.Parallelism = tc.config(cfg.Parallelism) + args := &Arguments{ + Endpoints: []EndpointConfig{cfg}, + } + err := args.Validate() + + if tc.expectedErrMsg == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedErrMsg) + } + }) + } +} From ee265acf9f1c25e80387b8c2deef7c50abfd8a0b Mon Sep 17 00:00:00 2001 From: matt durham Date: Sun, 9 Feb 2025 16:24:02 -0500 Subject: [PATCH 02/18] fix attr --- internal/component/prometheus/write/queue/types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/prometheus/write/queue/types.go b/internal/component/prometheus/write/queue/types.go index 17a2cfe094..d6fd08905c 100644 --- a/internal/component/prometheus/write/queue/types.go +++ b/internal/component/prometheus/write/queue/types.go @@ -113,7 +113,7 @@ type EndpointConfig struct { // How long to wait before sending regardless of batch count. FlushInterval time.Duration `alloy:"flush_interval,attr,optional"` // How many concurrent queues to have. - Parallelism ParralelismConfig `alloy:"parallelism,attr,block"` + Parallelism ParralelismConfig `alloy:"parallelism,block,optional"` ExternalLabels map[string]string `alloy:"external_labels,attr,optional"` TLSConfig *TLSConfig `alloy:"tls_config,block,optional"` RoundRobin bool `alloy:"enable_round_robin,attr,optional"` From ee2a014864e1ef2433cc95353e935426a6954f5a Mon Sep 17 00:00:00 2001 From: matt durham Date: Mon, 10 Feb 2025 13:53:28 -0500 Subject: [PATCH 03/18] Update to include context cancelled logic. 
--- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 07e82fd1f1..ef025a0620 100644 --- a/go.mod +++ b/go.mod @@ -73,7 +73,7 @@ require ( github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 github.com/grafana/vmware_exporter v0.0.5-beta - github.com/grafana/walqueue v0.0.0-20250207154308-e874c2251fc7 + github.com/grafana/walqueue v0.0.0-20250210172748-0dc5bd134b25 github.com/hashicorp/consul/api v1.30.0 github.com/hashicorp/go-discover v0.0.0-20230724184603-e89ebd1b2f65 github.com/hashicorp/go-multierror v1.1.1 diff --git a/go.sum b/go.sum index 01a197c315..ab77e526af 100644 --- a/go.sum +++ b/go.sum @@ -1914,8 +1914,8 @@ github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPF github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0/go.mod h1:7t5XR+2IA8P2qggOAHTj/GCZfoLBle3OvNSYh1VkRBU= github.com/grafana/vmware_exporter v0.0.5-beta h1:2JCqzIWJzns8FN78wPsueC9rT3e3kZo2OUoL5kGMjdM= github.com/grafana/vmware_exporter v0.0.5-beta/go.mod h1:1CecUZII0zVsVcHtNfNeTTcxK7EksqAsAn/TCCB0Mh4= -github.com/grafana/walqueue v0.0.0-20250207154308-e874c2251fc7 h1:UwiJ9tKZPzEZIGrk9yghWQcKErsyMwchEhrE+zbh+/o= -github.com/grafana/walqueue v0.0.0-20250207154308-e874c2251fc7/go.mod h1:rnU7r397nvQCTyVbcODlD3P6DIbQlidxPDweV+4ab2M= +github.com/grafana/walqueue v0.0.0-20250210172748-0dc5bd134b25 h1:p8plAhxyEWfDXRuYCfeZgrGS1GtJU5GnnOxsOBLgi9Q= +github.com/grafana/walqueue v0.0.0-20250210172748-0dc5bd134b25/go.mod h1:rnU7r397nvQCTyVbcODlD3P6DIbQlidxPDweV+4ab2M= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445 h1:FlKQKUYPZ5yDCN248M3R7x8yu2E3yEZ0H7aLomE4EoE= github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445/go.mod h1:L69/dBlPQlWkcnU76WgcppK5e4rrxzQdi6LhLnK/ytA= From 8698cface8a1b0c9961db7d3ddbd7198eac79479 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Tue, 11 Feb 2025 10:09:21 -0500 Subject: [PATCH 04/18] Update CHANGELOG.md Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d351b4dc2..d8736c91f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ Main (unreleased) ### Breaking changes -- (_Experimental_) Changed `parallelism` from attribute to block to allow dynamic scaling of `prometheus.write.queue`. (@mattdurham) +- (_Experimental_) In `prometheus.write.queue` changed `parallelism` from attribute to a block to allow for dynamic scaling. 
(@mattdurham) ### Features From b99312fc9d1fda861b10a0515ee5ed1d6404108a Mon Sep 17 00:00:00 2001 From: mattdurham Date: Tue, 11 Feb 2025 10:10:30 -0500 Subject: [PATCH 05/18] Update docs/sources/reference/components/prometheus/prometheus.write.queue.md Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> --- .../reference/components/prometheus/prometheus.write.queue.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index 6ad6d1d6e0..2b30303027 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -54,7 +54,7 @@ The following blocks are supported inside the definition of endpoint | [endpoint][] | Location to send metrics to. | no endpoint > basic_auth | [basic_auth][] | Configure basic_auth for authenticating to the endpoint. | no endpoint > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no - endpoint > parallelism | [parallelism][] | Configure parralelism for the endpoint. | no + endpoint > parallelism | [parallelism][] | Configure parallelism for the endpoint. | no The `>` symbol indicates deeper levels of nesting. For example, `endpoint > basic_auth` refers to a `basic_auth` block defined inside an From 67cb37efa3784c3708dfd98cf0903219b0494380 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Tue, 11 Feb 2025 10:10:39 -0500 Subject: [PATCH 06/18] Update docs/sources/reference/components/prometheus/prometheus.write.queue.md Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> --- .../reference/components/prometheus/prometheus.write.queue.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index 2b30303027..577adacb62 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -64,7 +64,7 @@ basic_auth` refers to a `basic_auth` block defined inside an [basic_auth]: #basic_auth-block [persistence]: #persistence-block [tls_config]: #tls_config-block -[parallelism]: #parralelism +[parallelism]: #parallelism ### persistence block From 89a3c1d0638d94bbe5eb068da185b82bf4835cd4 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Tue, 11 Feb 2025 10:11:06 -0500 Subject: [PATCH 07/18] Update docs/sources/reference/components/prometheus/prometheus.write.queue.md Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> --- .../reference/components/prometheus/prometheus.write.queue.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index 577adacb62..8408189f36 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -127,7 +127,7 @@ Name | Type | Description `min_connections` | `uint` | The minimum number of desired connections. | `2` | no `network_flush_interval` | `duration` | The length of time that network successes and failures are kept for determining desired connections. 
| `1m` | no `desired_connections_lookback` | `duration` | The length of time that previous desired connections are kept for determining desired connections. | `5m` | no -`desired_check_interval` | `duration` | The length of time that between checking for desired connections. | `5s` | no +`desired_check_interval` | `duration` | The length of time between checking for desired connections. | `5s` | no `allowed_network_error_percent` | `float` | The allowed error rate before scaling down. | `0.50` | no Parralelism determines when to scale up or down the number of desired connections. This is accomplished by a variety of inputs. From 4719f080069918499d86ed81645cba7247baed49 Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 11 Feb 2025 10:51:10 -0500 Subject: [PATCH 08/18] Update to include better naming from walqueue. --- .../prometheus/prometheus.write.queue.md | 24 ++++---- go.mod | 2 +- go.sum | 4 +- .../component/prometheus/write/queue/types.go | 59 +++++++------------ 4 files changed, 37 insertions(+), 52 deletions(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index 8408189f36..63915f4d0e 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -119,18 +119,18 @@ Name | Type | Description ### parallelism block -Name | Type | Description | Default | Required -----------------|------------|------------------------------------------------------------------------------------------------------------------------------------|---------|--------- -`drift_scale_up_seconds` | `int` | The maximum amount of time between the timestamps of incoming signals and outgoing signals before increasing desired connections. | `60` | no -`drift_scale_down_seconds` | `int` | The minimum amount of time between the timestamps of incoming signals and outgoing signals before descreasing desired connections. | `30` | no -`max_connections` | `uint` | The maximum number of desired connections. | `50` | no -`min_connections` | `uint` | The minimum number of desired connections. | `2` | no -`network_flush_interval` | `duration` | The length of time that network successes and failures are kept for determining desired connections. | `1m` | no -`desired_connections_lookback` | `duration` | The length of time that previous desired connections are kept for determining desired connections. | `5m` | no -`desired_check_interval` | `duration` | The length of time between checking for desired connections. | `5s` | no -`allowed_network_error_percent` | `float` | The allowed error rate before scaling down. | `0.50` | no - -Parralelism determines when to scale up or down the number of desired connections. This is accomplished by a variety of inputs. +| Name | Type | Description | Default | Required | +|---------------------------------|------------|------------------------------------------------------------------------------------------------------------------------------------|---------|----------| +| `drift_scale_up` | `duration` | The maximum amount of time between the timestamps of incoming signals and outgoing signals before increasing desired connections. | `60` | no | +| `drift_scale_down` | `duration` | The minimum amount of time between the timestamps of incoming signals and outgoing signals before descreasing desired connections. 
| `30` | no | +| `max_connections` | `uint` | The maximum number of desired connections. | `50` | no | +| `min_connections` | `uint` | The minimum number of desired connections. | `2` | no | +| `network_flush_interval` | `duration` | The length of time that network successes and failures are kept for determining desired connections. | `1m` | no | +| `desired_connections_lookback` | `duration` | The length of time that previous desired connections are kept for determining desired connections. | `5m` | no | +| `desired_check_interval` | `duration` | The length of time between checking for desired connections. | `5s` | no | +| `allowed_network_error_percent` | `float` | The allowed error rate before scaling down. For example `0.50` allows 50% error rate. | `0.50` | no | + +Parallelism determines when to scale up or down the number of desired connections. This is accomplished by a variety of inputs. By determining the drift between the incoming and outgoing timestamps that will determine whether to increase or decrease the desired connections. This is represented by `drift_scale_up_seconds` and `drift_scale_down_seconds`, if the drift is between these two values then the value will stay the same. Network success and failures are recorded and kept in memory, this helps determine diff --git a/go.mod b/go.mod index ef025a0620..e4f05844fc 100644 --- a/go.mod +++ b/go.mod @@ -73,7 +73,7 @@ require ( github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 github.com/grafana/vmware_exporter v0.0.5-beta - github.com/grafana/walqueue v0.0.0-20250210172748-0dc5bd134b25 + github.com/grafana/walqueue v0.0.0-20250211154548-6435b3242458 github.com/hashicorp/consul/api v1.30.0 github.com/hashicorp/go-discover v0.0.0-20230724184603-e89ebd1b2f65 github.com/hashicorp/go-multierror v1.1.1 diff --git a/go.sum b/go.sum index ab77e526af..6dae217c57 100644 --- a/go.sum +++ b/go.sum @@ -1914,8 +1914,8 @@ github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPF github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0/go.mod h1:7t5XR+2IA8P2qggOAHTj/GCZfoLBle3OvNSYh1VkRBU= github.com/grafana/vmware_exporter v0.0.5-beta h1:2JCqzIWJzns8FN78wPsueC9rT3e3kZo2OUoL5kGMjdM= github.com/grafana/vmware_exporter v0.0.5-beta/go.mod h1:1CecUZII0zVsVcHtNfNeTTcxK7EksqAsAn/TCCB0Mh4= -github.com/grafana/walqueue v0.0.0-20250210172748-0dc5bd134b25 h1:p8plAhxyEWfDXRuYCfeZgrGS1GtJU5GnnOxsOBLgi9Q= -github.com/grafana/walqueue v0.0.0-20250210172748-0dc5bd134b25/go.mod h1:rnU7r397nvQCTyVbcODlD3P6DIbQlidxPDweV+4ab2M= +github.com/grafana/walqueue v0.0.0-20250211154548-6435b3242458 h1:KUa/teNk/VYBDSSjsgdLI37gnFXTz1+h513CmNWJeU4= +github.com/grafana/walqueue v0.0.0-20250211154548-6435b3242458/go.mod h1:rnU7r397nvQCTyVbcODlD3P6DIbQlidxPDweV+4ab2M= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445 h1:FlKQKUYPZ5yDCN248M3R7x8yu2E3yEZ0H7aLomE4EoE= github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445/go.mod h1:L69/dBlPQlWkcnU76WgcppK5e4rrxzQdi6LhLnK/ytA= diff --git a/internal/component/prometheus/write/queue/types.go b/internal/component/prometheus/write/queue/types.go index d6fd08905c..ec906c8db4 100644 --- a/internal/component/prometheus/write/queue/types.go +++ b/internal/component/prometheus/write/queue/types.go @@ -50,9 +50,9 @@ func defaultEndpointConfig() EndpointConfig { MaxRetryAttempts: 0, BatchCount: 1_000, 
FlushInterval: 1 * time.Second, - Parallelism: ParralelismConfig{ - DriftScaleUpSeconds: 60, - DriftScaleDownSeconds: 30, + Parallelism: ParallelismConfig{ + DriftScaleUp: 60 * time.Second, + DriftScaleDown: 30 * time.Second, MaxConnections: 50, MinConnections: 2, NetworkFlushInterval: 1 * time.Minute, @@ -81,7 +81,7 @@ func (r *Arguments) Validate() error { if conn.Parallelism.MinConnections == 0 { return fmt.Errorf("min_connections must be greater than 0") } - if conn.Parallelism.DriftScaleUpSeconds <= conn.Parallelism.DriftScaleDownSeconds { + if conn.Parallelism.DriftScaleUp <= conn.Parallelism.DriftScaleDown { return fmt.Errorf("drift_scale_up_seconds less than or equal drift_scale_down_seconds") } // Any lower than 1 second and you spend a fair amount of time churning on the draining and @@ -113,7 +113,7 @@ type EndpointConfig struct { // How long to wait before sending regardless of batch count. FlushInterval time.Duration `alloy:"flush_interval,attr,optional"` // How many concurrent queues to have. - Parallelism ParralelismConfig `alloy:"parallelism,block,optional"` + Parallelism ParallelismConfig `alloy:"parallelism,block,optional"` ExternalLabels map[string]string `alloy:"external_labels,attr,optional"` TLSConfig *TLSConfig `alloy:"tls_config,block,optional"` RoundRobin bool `alloy:"enable_round_robin,attr,optional"` @@ -126,30 +126,15 @@ type TLSConfig struct { InsecureSkipVerify bool `alloy:"insecure_skip_verify,attr,optional"` } -type ParralelismConfig struct { - // DriftScaleUpSeconds is the maximum amount of seconds that is allowed for the Newest Timestamp Serializer - Newest Timestamp Sent via Network before the connections scales up. - // Using non unix timestamp numbers. If Newest TS In Serializer sees 100s and Newest TS Out Network sees 20s then we have a drift of 80s. If AllowDriftSeconds is 60s that would - // trigger a scaling up event. - DriftScaleUpSeconds int64 `alloy:"drift_scale_up_seconds,attr,optional"` - // DriftScaleDownSeconds is the amount if we go below that we can scale down. Using the above if In is 100s and Out is 70s and DriftScaleDownSeconds is 30 then we wont scale - // down even though we are below the 60s. This is to keep the number of connections from flapping. In practice we should consider 30s DriftScaleDownSeconds and 60s DriftScaleUpSeconds to be a sweet spot - // for general usage. - DriftScaleDownSeconds int64 `alloy:"drift_scale_down_seconds,attr,optional"` - // MaxConnections is the maximum number of concurrent connections to use. - MaxConnections uint `alloy:"max_connections,attr,optional"` - // MinConnections is the minimum number of concurrent connections to use. - MinConnections uint `alloy:"min_connections,attr,optional"` - // NetworkFlushInterval is how long to keep network successes and errors in memory for calculations. - NetworkFlushInterval time.Duration `alloy:"network_flush_interval,attr,optional"` - // DesiredConnectionsLookback is how far to lookback for previous desired values. This is to prevent flapping. - // In a situation where in the past 5 minutes you have desired [1,2,1,1] and desired is 1 it will - // choose 2 since that was the greatest. This determines how fast you can scale down. 
+type ParallelismConfig struct { + DriftScaleUp time.Duration `alloy:"drift_scale_up,attr,optional"` + DriftScaleDown time.Duration `alloy:"drift_scale_down,attr,optional"` + MaxConnections uint `alloy:"max_connections,attr,optional"` + MinConnections uint `alloy:"min_connections,attr,optional"` + NetworkFlushInterval time.Duration `alloy:"network_flush_interval,attr,optional"` DesiredConnectionsLookback time.Duration `alloy:"desired_connections_lookback,attr,optional"` - // DesiredCheckInterval is how long to check for desired values. - DesiredCheckInterval time.Duration `alloy:"desired_check_interval,attr,optional"` - // AllowedNetworkErrorPercent is the percentage of failed network requests that are allowable. This will - // trigger a decrease in connections if exceeded. - AllowedNetworkErrorPercent float64 `alloy:"allowed_network_error_percent,attr,optional"` + DesiredCheckInterval time.Duration `alloy:"desired_check_interval,attr,optional"` + AllowedNetworkErrorPercent float64 `alloy:"allowed_network_error_percent,attr,optional"` } var UserAgent = fmt.Sprintf("Alloy/%s", version.Version) @@ -166,15 +151,15 @@ func (cc EndpointConfig) ToNativeType() types.ConnectionConfig { FlushInterval: cc.FlushInterval, ExternalLabels: cc.ExternalLabels, UseRoundRobin: cc.RoundRobin, - Parralelism: types.ParralelismConfig{ - AllowedDriftSeconds: cc.Parallelism.DriftScaleUpSeconds, - MinimumScaleDownDriftSeconds: cc.Parallelism.DriftScaleDownSeconds, - MaxConnections: cc.Parallelism.MaxConnections, - MinConnections: cc.Parallelism.MinConnections, - ResetInterval: cc.Parallelism.NetworkFlushInterval, - Lookback: cc.Parallelism.DesiredConnectionsLookback, - CheckInterval: cc.Parallelism.DesiredCheckInterval, - AllowedNetworkErrorPercent: cc.Parallelism.AllowedNetworkErrorPercent, + Parallelism: types.ParallelismConfig{ + AllowedDrift: cc.Parallelism.DriftScaleUp, + MinimumScaleDownDrift: cc.Parallelism.DriftScaleDown, + MaxConnections: cc.Parallelism.MaxConnections, + MinConnections: cc.Parallelism.MinConnections, + ResetInterval: cc.Parallelism.NetworkFlushInterval, + Lookback: cc.Parallelism.DesiredConnectionsLookback, + CheckInterval: cc.Parallelism.DesiredCheckInterval, + AllowedNetworkErrorFraction: cc.Parallelism.AllowedNetworkErrorPercent, }, } if cc.BasicAuth != nil { From 165b1188aad02918e4f5e8e34a90871b6fbab5fc Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 11 Feb 2025 10:53:05 -0500 Subject: [PATCH 09/18] Reword verbiage. --- .../prometheus/prometheus.write.queue.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index 63915f4d0e..eb09538971 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -130,14 +130,18 @@ Name | Type | Description | `desired_check_interval` | `duration` | The length of time between checking for desired connections. | `5s` | no | | `allowed_network_error_percent` | `float` | The allowed error rate before scaling down. For example `0.50` allows 50% error rate. | `0.50` | no | -Parallelism determines when to scale up or down the number of desired connections. This is accomplished by a variety of inputs. +Parallelism determines when to scale up or down the number of desired connections. 
This is accomplished by a variety of inputs: + By determining the drift between the incoming and outgoing timestamps that will determine whether to increase or decrease the desired connections. This is represented by `drift_scale_up_seconds` and `drift_scale_down_seconds`, if the drift is between these -two values then the value will stay the same. Network success and failures are recorded and kept in memory, this helps determine +two values then the value will stay the same. + +Network success and failures are recorded and kept in memory, this helps determine the nature of the drift. For instance if the drift is increasing but the network failures are increasing we should not increase -desired connections since that would only increase load on the endpoint. The last major part is to prevent flapping of desired connections. -This is accomplished with the `desired_check_interval`, each time a desired connection is calculated it is added to a list, before actually changing the -desired connection the system will choose the highest value in the lookback. Example; for the past 5 minutes desired connections have been: [2,1,1] the check runs +desired connections since that would only increase load on the endpoint. + +Flapping prevention accomplished with `desired_check_interval`, each time a desired connection is calculated it is added to a list, before actually changing the +desired connection the system will choose the highest value in the lookback buffer. Example; for the past 5 minutes desired connections have been: [2,1,1] the check runs and determines that the desired connections are 1, but will not change the value since the value 2 is still in the lookback. On the next check we have [1,1,1], now it will change to 1. In general the system is fast to increase and slow to decrease. From f962f7a97994d46f83f74b06c4b36c86e0be715d Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 11 Feb 2025 11:04:34 -0500 Subject: [PATCH 10/18] Fix naming in test. 
--- .../prometheus/write/queue/types_test.go | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/internal/component/prometheus/write/queue/types_test.go b/internal/component/prometheus/write/queue/types_test.go index 78b733d65e..9ef6d1ee4a 100644 --- a/internal/component/prometheus/write/queue/types_test.go +++ b/internal/component/prometheus/write/queue/types_test.go @@ -27,32 +27,30 @@ func TestParsingTLSConfig(t *testing.T) { require.NoError(t, err) } - - func TestParralelismConfig_Validate(t *testing.T) { testCases := []struct { name string - config func(cfg ParralelismConfig) ParralelismConfig + config func(cfg ParallelismConfig) ParallelismConfig expectedErrMsg string }{ { name: "default config is valid", - config: func(cfg ParralelismConfig) ParralelismConfig { + config: func(cfg ParallelismConfig) ParallelismConfig { return cfg }, }, { name: "positive drift scale up seconds is invalid", - config: func(cfg ParralelismConfig) ParralelismConfig { - cfg.DriftScaleUpSeconds = 10 - cfg.DriftScaleDownSeconds = 10 + config: func(cfg ParallelismConfig) ParallelismConfig { + cfg.DriftScaleUp = 10 * time.Second + cfg.DriftScaleDown = 10 * time.Second return cfg }, expectedErrMsg: "drift_scale_up_seconds less than or equal drift_scale_down_seconds", }, { name: "max less than min", - config: func(cfg ParralelismConfig) ParralelismConfig { + config: func(cfg ParallelismConfig) ParallelismConfig { cfg.MaxConnections = 1 cfg.MinConnections = 2 return cfg @@ -61,7 +59,7 @@ func TestParralelismConfig_Validate(t *testing.T) { }, { name: "to low desired check", - config: func(cfg ParralelismConfig) ParralelismConfig { + config: func(cfg ParallelismConfig) ParallelismConfig { cfg.DesiredCheckInterval = (1 * time.Second) - (50 * time.Millisecond) return cfg }, @@ -69,7 +67,7 @@ func TestParralelismConfig_Validate(t *testing.T) { }, { name: "invalid network error percentage low", - config: func(cfg ParralelismConfig) ParralelismConfig { + config: func(cfg ParallelismConfig) ParallelismConfig { cfg.AllowedNetworkErrorPercent = -0.01 return cfg }, @@ -77,7 +75,7 @@ func TestParralelismConfig_Validate(t *testing.T) { }, { name: "invalid network error percentage high", - config: func(cfg ParralelismConfig) ParralelismConfig { + config: func(cfg ParallelismConfig) ParallelismConfig { cfg.AllowedNetworkErrorPercent = 1.01 return cfg }, @@ -103,4 +101,3 @@ func TestParralelismConfig_Validate(t *testing.T) { }) } } - From 04eba86f4def882a85dddb7d41c644539d944af9 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Tue, 11 Feb 2025 14:47:26 -0500 Subject: [PATCH 11/18] Update docs/sources/reference/components/prometheus/prometheus.write.queue.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../reference/components/prometheus/prometheus.write.queue.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index eb09538971..29627f8c17 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -122,7 +122,7 @@ Name | Type | Description | Name | Type | Description | Default | Required | |---------------------------------|------------|------------------------------------------------------------------------------------------------------------------------------------|---------|----------| | 
`drift_scale_up` | `duration` | The maximum amount of time between the timestamps of incoming signals and outgoing signals before increasing desired connections. | `60` | no | -| `drift_scale_down` | `duration` | The minimum amount of time between the timestamps of incoming signals and outgoing signals before descreasing desired connections. | `30` | no | +| `drift_scale_down` | `duration` | The minimum amount of time between the timestamps of incoming signals and outgoing signals before decreasing desired connections. | `30` | no | | `max_connections` | `uint` | The maximum number of desired connections. | `50` | no | | `min_connections` | `uint` | The minimum number of desired connections. | `2` | no | | `network_flush_interval` | `duration` | The length of time that network successes and failures are kept for determining desired connections. | `1m` | no | From 012ae173cfd68bb115ba609517727415dbf96c92 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Tue, 11 Feb 2025 14:47:35 -0500 Subject: [PATCH 12/18] Update docs/sources/reference/components/prometheus/prometheus.write.queue.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../reference/components/prometheus/prometheus.write.queue.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index 29627f8c17..c14f9a86d0 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -128,7 +128,7 @@ Name | Type | Description | `network_flush_interval` | `duration` | The length of time that network successes and failures are kept for determining desired connections. | `1m` | no | | `desired_connections_lookback` | `duration` | The length of time that previous desired connections are kept for determining desired connections. | `5m` | no | | `desired_check_interval` | `duration` | The length of time between checking for desired connections. | `5s` | no | -| `allowed_network_error_percent` | `float` | The allowed error rate before scaling down. For example `0.50` allows 50% error rate. | `0.50` | no | +| `allowed_network_error_percent` | `float` | The allowed error rate before scaling down. For example, `0.50` allows 50% error rate. | `0.50` | no | Parallelism determines when to scale up or down the number of desired connections. This is accomplished by a variety of inputs: From 92f9eea6d48b1e0578e4f65594c1dc06cf4c9278 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Tue, 11 Feb 2025 14:48:52 -0500 Subject: [PATCH 13/18] Update docs/sources/reference/components/prometheus/prometheus.write.queue.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../prometheus/prometheus.write.queue.md | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index c14f9a86d0..1d4e2eeef0 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -130,20 +130,22 @@ Name | Type | Description | `desired_check_interval` | `duration` | The length of time between checking for desired connections. 
| `5s` | no |
 | `allowed_network_error_percent` | `float` | The allowed error rate before scaling down. For example, `0.50` allows 50% error rate. | `0.50` | no |
 
-Parallelism determines when to scale up or down the number of desired connections. This is accomplished by a variety of inputs:
-
-By determining the drift between the incoming and outgoing timestamps that will determine whether to increase or decrease the
-desired connections. This is represented by `drift_scale_up_seconds` and `drift_scale_down_seconds`, if the drift is between these
-two values then the value will stay the same.
-
-Network success and failures are recorded and kept in memory, this helps determine
-the nature of the drift. For instance if the drift is increasing but the network failures are increasing we should not increase
-desired connections since that would only increase load on the endpoint.
-
-Flapping prevention accomplished with `desired_check_interval`, each time a desired connection is calculated it is added to a list, before actually changing the
-desired connection the system will choose the highest value in the lookback buffer. Example; for the past 5 minutes desired connections have been: [2,1,1] the check runs
-and determines that the desired connections are 1, but will not change the value since the value 2 is still in the lookback. On the next check we have [1,1,1],
-now it will change to 1. In general the system is fast to increase and slow to decrease.
+Parallelism determines when to scale up or down the number of desired connections.
+
+The drift between the incoming and outgoing timestamps determines whether to increase or decrease the desired connections.
+The value stays the same if the drift is between `drift_scale_up` and `drift_scale_down`.
+
+Network successes and failures are recorded and kept in memory.
+This data helps determine the nature of the drift.
+For example, if the drift is increasing and the network failures are increasing, the desired connections should not increase because that would increase the load on the endpoint.
+
+The `desired_check_interval` prevents connection flapping.
+Each time a desired connection is calculated, the connection is added to a list.
+Before changing the desired connection, the system will choose the highest value in the lookback buffer.
+For example, for the past 5 minutes, desired connections have been: [2,1,1].
+The check determines that the desired connections are 1, and the number of desired connections will not change because the value `2` is still in the lookback buffer.
+On the next check, the desired connections are [1,1,1].
+Now, it will change to 1. In general, the system is fast to increase and slow to decrease.
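
The following Go sketch shows one way the inputs described above can combine into a single decision. It is illustrative only, not the walqueue implementation, and every name in it is invented:

```go
// Invented sketch of the drift-based scaling decision described above.
package main

import "fmt"

// parallelism mirrors the knobs documented in the parallelism block.
type parallelism struct {
	driftScaleUp         int64   // drift (seconds) at or above which to scale up
	driftScaleDown       int64   // drift (seconds) at or below which to scale down
	minConns, maxConns   uint    // bounds on desired connections
	allowedErrorFraction float64 // e.g. 0.50 tolerates a 50% failure rate
}

// desired computes the next desired connection count from the newest
// incoming/outgoing timestamps and the recent network error rate.
func (p parallelism) desired(current uint, newestIn, newestOut int64, errFraction float64) uint {
	drift := newestIn - newestOut
	switch {
	case drift >= p.driftScaleUp && errFraction <= p.allowedErrorFraction:
		if current < p.maxConns {
			current++ // falling behind and the endpoint is healthy: scale up
		}
	case drift <= p.driftScaleDown:
		if current > p.minConns {
			current-- // comfortably caught up: scale down
		}
	}
	return current
}

// lookbackMax prevents flapping: the value actually applied is the highest
// desired value recorded within the lookback window (must be non-empty).
func lookbackMax(history []uint) uint {
	highest := history[0]
	for _, v := range history[1:] {
		if v > highest {
			highest = v
		}
	}
	return highest
}

func main() {
	p := parallelism{driftScaleUp: 60, driftScaleDown: 30, minConns: 2, maxConns: 50, allowedErrorFraction: 0.50}
	fmt.Println(p.desired(4, 1_700_000_200, 1_700_000_100, 0.10)) // 5: 100s of drift with a healthy endpoint scales up
	fmt.Println(lookbackMax([]uint{2, 1, 1}))                     // 2: the 2 is still in the lookback window
}
```

Taking the maximum over the lookback window is what makes the system slow to decrease: one high desired value keeps connections up until it ages out of the window.
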
## Exported fields From d09c0eb41751839976a2bc4e56532dad70127ab2 Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 11 Feb 2025 14:49:10 -0500 Subject: [PATCH 14/18] pr feedbackh --- .../prometheus/prometheus.write.queue.md | 20 +++++------ .../component/prometheus/write/queue/types.go | 36 +++++++++---------- .../prometheus/write/queue/types_test.go | 4 +-- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index eb09538971..b56b956690 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -119,16 +119,16 @@ Name | Type | Description ### parallelism block -| Name | Type | Description | Default | Required | -|---------------------------------|------------|------------------------------------------------------------------------------------------------------------------------------------|---------|----------| -| `drift_scale_up` | `duration` | The maximum amount of time between the timestamps of incoming signals and outgoing signals before increasing desired connections. | `60` | no | -| `drift_scale_down` | `duration` | The minimum amount of time between the timestamps of incoming signals and outgoing signals before descreasing desired connections. | `30` | no | -| `max_connections` | `uint` | The maximum number of desired connections. | `50` | no | -| `min_connections` | `uint` | The minimum number of desired connections. | `2` | no | -| `network_flush_interval` | `duration` | The length of time that network successes and failures are kept for determining desired connections. | `1m` | no | -| `desired_connections_lookback` | `duration` | The length of time that previous desired connections are kept for determining desired connections. | `5m` | no | -| `desired_check_interval` | `duration` | The length of time between checking for desired connections. | `5s` | no | -| `allowed_network_error_percent` | `float` | The allowed error rate before scaling down. For example `0.50` allows 50% error rate. | `0.50` | no | +| Name | Type | Description | Default | Required | +|----------------------------------|------------|------------------------------------------------------------------------------------------------------------------------------------|---------|----------| +| `drift_scale_up` | `duration` | The maximum amount of time between the timestamps of incoming signals and outgoing signals before increasing desired connections. | `60` | no | +| `drift_scale_down` | `duration` | The minimum amount of time between the timestamps of incoming signals and outgoing signals before descreasing desired connections. | `30` | no | +| `max_connections` | `uint` | The maximum number of desired connections. | `50` | no | +| `min_connections` | `uint` | The minimum number of desired connections. | `2` | no | +| `network_flush_interval` | `duration` | The length of time that network successes and failures are kept for determining desired connections. | `1m` | no | +| `desired_connections_lookback` | `duration` | The length of time that previous desired connections are kept for determining desired connections. | `5m` | no | +| `desired_check_interval` | `duration` | The length of time between checking for desired connections. | `5s` | no | +| `allowed_network_error_fraction` | `float` | The allowed error rate before scaling down. 
For example `0.50` allows 50% error rate. | `0.50` | no | Parallelism determines when to scale up or down the number of desired connections. This is accomplished by a variety of inputs: diff --git a/internal/component/prometheus/write/queue/types.go b/internal/component/prometheus/write/queue/types.go index ec906c8db4..020011a8c4 100644 --- a/internal/component/prometheus/write/queue/types.go +++ b/internal/component/prometheus/write/queue/types.go @@ -51,14 +51,14 @@ func defaultEndpointConfig() EndpointConfig { BatchCount: 1_000, FlushInterval: 1 * time.Second, Parallelism: ParallelismConfig{ - DriftScaleUp: 60 * time.Second, - DriftScaleDown: 30 * time.Second, - MaxConnections: 50, - MinConnections: 2, - NetworkFlushInterval: 1 * time.Minute, - DesiredConnectionsLookback: 5 * time.Minute, - DesiredCheckInterval: 5 * time.Second, - AllowedNetworkErrorPercent: 0.50, + DriftScaleUp: 60 * time.Second, + DriftScaleDown: 30 * time.Second, + MaxConnections: 50, + MinConnections: 2, + NetworkFlushInterval: 1 * time.Minute, + DesiredConnectionsLookback: 5 * time.Minute, + DesiredCheckInterval: 5 * time.Second, + AllowedNetworkErrorFraction: 0.50, }, } } @@ -89,7 +89,7 @@ func (r *Arguments) Validate() error { if conn.Parallelism.DesiredCheckInterval < 1*time.Second { return fmt.Errorf("desired_check_interval must be greater than or equal to 1 second") } - if conn.Parallelism.AllowedNetworkErrorPercent < 0 || conn.Parallelism.AllowedNetworkErrorPercent > 1 { + if conn.Parallelism.AllowedNetworkErrorFraction < 0 || conn.Parallelism.AllowedNetworkErrorFraction > 1 { return fmt.Errorf("allowed_network_error_percent must be between 0.00 and 1.00") } } @@ -127,14 +127,14 @@ type TLSConfig struct { } type ParallelismConfig struct { - DriftScaleUp time.Duration `alloy:"drift_scale_up,attr,optional"` - DriftScaleDown time.Duration `alloy:"drift_scale_down,attr,optional"` - MaxConnections uint `alloy:"max_connections,attr,optional"` - MinConnections uint `alloy:"min_connections,attr,optional"` - NetworkFlushInterval time.Duration `alloy:"network_flush_interval,attr,optional"` - DesiredConnectionsLookback time.Duration `alloy:"desired_connections_lookback,attr,optional"` - DesiredCheckInterval time.Duration `alloy:"desired_check_interval,attr,optional"` - AllowedNetworkErrorPercent float64 `alloy:"allowed_network_error_percent,attr,optional"` + DriftScaleUp time.Duration `alloy:"drift_scale_up,attr,optional"` + DriftScaleDown time.Duration `alloy:"drift_scale_down,attr,optional"` + MaxConnections uint `alloy:"max_connections,attr,optional"` + MinConnections uint `alloy:"min_connections,attr,optional"` + NetworkFlushInterval time.Duration `alloy:"network_flush_interval,attr,optional"` + DesiredConnectionsLookback time.Duration `alloy:"desired_connections_lookback,attr,optional"` + DesiredCheckInterval time.Duration `alloy:"desired_check_interval,attr,optional"` + AllowedNetworkErrorFraction float64 `alloy:"allowed_network_error_fraction,attr,optional"` } var UserAgent = fmt.Sprintf("Alloy/%s", version.Version) @@ -159,7 +159,7 @@ func (cc EndpointConfig) ToNativeType() types.ConnectionConfig { ResetInterval: cc.Parallelism.NetworkFlushInterval, Lookback: cc.Parallelism.DesiredConnectionsLookback, CheckInterval: cc.Parallelism.DesiredCheckInterval, - AllowedNetworkErrorFraction: cc.Parallelism.AllowedNetworkErrorPercent, + AllowedNetworkErrorFraction: cc.Parallelism.AllowedNetworkErrorFraction, }, } if cc.BasicAuth != nil { diff --git a/internal/component/prometheus/write/queue/types_test.go 
b/internal/component/prometheus/write/queue/types_test.go index 9ef6d1ee4a..c5fbd17088 100644 --- a/internal/component/prometheus/write/queue/types_test.go +++ b/internal/component/prometheus/write/queue/types_test.go @@ -68,7 +68,7 @@ func TestParralelismConfig_Validate(t *testing.T) { { name: "invalid network error percentage low", config: func(cfg ParallelismConfig) ParallelismConfig { - cfg.AllowedNetworkErrorPercent = -0.01 + cfg.AllowedNetworkErrorFraction = -0.01 return cfg }, expectedErrMsg: "allowed_network_error_percent must be between 0.00 and 1.00", @@ -76,7 +76,7 @@ func TestParralelismConfig_Validate(t *testing.T) { { name: "invalid network error percentage high", config: func(cfg ParallelismConfig) ParallelismConfig { - cfg.AllowedNetworkErrorPercent = 1.01 + cfg.AllowedNetworkErrorFraction = 1.01 return cfg }, expectedErrMsg: "allowed_network_error_percent must be between 0.00 and 1.00", From 3cf281aa16644905479f91939e0af9924d007bb6 Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 11 Feb 2025 14:52:36 -0500 Subject: [PATCH 15/18] Merge suggestions and committed. --- .../reference/components/prometheus/prometheus.write.queue.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index edc40ef17a..dfc03e51f4 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -141,11 +141,11 @@ For example, if the drift is increasing and the network failures are increasing, The `desired_check_interval` prevents connection flapping. Each time a desired connection is calculated, the connection is added to a list. -Before changing the desired connection, the system will choose the highest value in the lookback buffer. +Before changing the desired connection, `prometheus.write.queue` will choose the highest value in the lookback buffer. For example, for the past 5 minutes, desired connections have been: [2,1,1]. The check determines that the desired connections are 1, and the number of desired connections will not change because the value `2` is still in the lookback buffer. On the next check, the desired connections are [1,1,1]. -Now, it will change to 1. In general, the system is fast to increase and slow to decrease. +Since the `2` value has expired the desired connections will change to 1. In general, the system is fast to increase and slow to decrease. ## Exported fields From a8b1a3022d6f29918e9bd01a4313f7493a28a354 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Wed, 12 Feb 2025 10:56:50 -0500 Subject: [PATCH 16/18] Update docs/sources/reference/components/prometheus/prometheus.write.queue.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../components/prometheus/prometheus.write.queue.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index dfc03e51f4..50d8aafa4b 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -142,10 +142,11 @@ For example, if the drift is increasing and the network failures are increasing, The `desired_check_interval` prevents connection flapping. 
Each time a desired connection is calculated, the connection is added to a list. Before changing the desired connection, `prometheus.write.queue` will choose the highest value in the lookback buffer. -For example, for the past 5 minutes, desired connections have been: [2,1,1]. +For example, for the past 5 minutes, the desired connections have been: [2,1,1]. The check determines that the desired connections are 1, and the number of desired connections will not change because the value `2` is still in the lookback buffer. On the next check, the desired connections are [1,1,1]. -Since the `2` value has expired the desired connections will change to 1. In general, the system is fast to increase and slow to decrease. +Since the `2` value has expired, the desired connections will change to 1. +In general, the system is fast to increase and slow to decrease the desired connections. ## Exported fields From 30661dac40ecc55ecee8e9b70c8454a29ecf69a3 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Thu, 13 Feb 2025 09:22:27 -0500 Subject: [PATCH 17/18] Update docs/sources/reference/components/prometheus/prometheus.write.queue.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../reference/components/prometheus/prometheus.write.queue.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index 50d8aafa4b..89fbb58fea 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -140,7 +140,7 @@ This data helps determine the nature of the drift. For example, if the drift is increasing and the network failures are increasing, the desired connections should not increase because that would increase the load on the endpoint. The `desired_check_interval` prevents connection flapping. -Each time a desired connection is calculated, the connection is added to a list. +Each time a desired connection is calculated, the connection is added to a lookback buffer. Before changing the desired connection, `prometheus.write.queue` will choose the highest value in the lookback buffer. For example, for the past 5 minutes, the desired connections have been: [2,1,1]. The check determines that the desired connections are 1, and the number of desired connections will not change because the value `2` is still in the lookback buffer. From b4eb0d8909b667aabd5f01f7ba81695b16bb1168 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Thu, 13 Feb 2025 10:50:44 -0500 Subject: [PATCH 18/18] Update docs/sources/reference/components/prometheus/prometheus.write.queue.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../reference/components/prometheus/prometheus.write.queue.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md index 89fbb58fea..746428c50e 100644 --- a/docs/sources/reference/components/prometheus/prometheus.write.queue.md +++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md @@ -141,7 +141,7 @@ For example, if the drift is increasing and the network failures are increasing, The `desired_check_interval` prevents connection flapping. 
Each time a desired connection is calculated, the connection is added to a lookback buffer. -Before changing the desired connection, `prometheus.write.queue` will choose the highest value in the lookback buffer. +Before increasing or decreasing the desired connections, `prometheus.write.queue` chooses the highest value in the lookback buffer. For example, for the past 5 minutes, the desired connections have been: [2,1,1]. The check determines that the desired connections are 1, and the number of desired connections will not change because the value `2` is still in the lookback buffer. On the next check, the desired connections are [1,1,1].
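
As a closing illustration of the validation rules this series adds, here is a test-style sketch. It assumes it lives in package `queue` next to `types_test.go` above; the `Example` wrapper is hypothetical, but `defaultEndpointConfig`, `Arguments`, and `Validate` are the identifiers from the diff:

```go
package queue

import "fmt"

// The defaults set MinConnections to 2, so forcing MaxConnections below
// that should surface the corresponding validation error.
func ExampleArguments_Validate() {
	cfg := defaultEndpointConfig()
	cfg.Parallelism.MaxConnections = 1 // below the default MinConnections of 2

	args := &Arguments{Endpoints: []EndpointConfig{cfg}}
	if err := args.Validate(); err != nil {
		fmt.Println(err)
	}
	// Output: max_connections less than min_connections
}
```
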