From bfba7edb3094fb69194e0d272714620f762a5617 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Sun, 19 Jan 2025 02:02:47 +0530 Subject: [PATCH 01/17] Enhance RabbitMQ receiver with node metrics support --- receiver/rabbitmqreceiver/client.go | 61 ++--- receiver/rabbitmqreceiver/client_test.go | 35 +-- receiver/rabbitmqreceiver/config.go | 8 +- receiver/rabbitmqreceiver/config_test.go | 30 ++- receiver/rabbitmqreceiver/documentation.md | 32 +++ receiver/rabbitmqreceiver/factory.go | 42 +++- receiver/rabbitmqreceiver/factory_test.go | 23 +- .../generated_package_test.go | 3 +- .../internal/metadata/generated_config.go | 16 ++ .../metadata/generated_config_test.go | 8 + .../internal/metadata/generated_metrics.go | 238 +++++++++++++++++- .../metadata/generated_metrics_test.go | 72 ++++++ .../internal/metadata/testdata/config.yaml | 16 ++ .../rabbitmqreceiver/internal/mocks/client.go | 23 ++ .../internal/models/models.go | 12 + receiver/rabbitmqreceiver/metadata.yaml | 40 +++ receiver/rabbitmqreceiver/scraper.go | 104 ++++++-- receiver/rabbitmqreceiver/scraper_test.go | 66 +++-- .../apiresponses/get_nodes_response.json | 52 ++++ .../expected_metrics/metrics_golden.yaml | 60 ++--- 20 files changed, 800 insertions(+), 141 deletions(-) create mode 100644 receiver/rabbitmqreceiver/testdata/apiresponses/get_nodes_response.json diff --git a/receiver/rabbitmqreceiver/client.go b/receiver/rabbitmqreceiver/client.go index 2ce1e4c3075b..25da53314b31 100644 --- a/receiver/rabbitmqreceiver/client.go +++ b/receiver/rabbitmqreceiver/client.go @@ -16,16 +16,21 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver/internal/models" ) -// queuePath is the path to queues endpoint -const queuePath = "/api/queues" +const ( + // queuePath is the endpoint for RabbitMQ queues. + queuePath = "/api/queues" + + // nodePath is the endpoint for RabbitMQ nodes. + nodePath = "/api/nodes" +) type client interface { - // GetQueues calls "/api/queues" endpoint to get list of queues for the target node + // GetQueues retrieves a list of queues from the RabbitMQ API. GetQueues(ctx context.Context) ([]*models.Queue, error) + // GetNodes retrieves a list of nodes from the RabbitMQ API. + GetNodes(ctx context.Context) ([]*models.Node, error) } -var _ client = (*rabbitmqClient)(nil) - type rabbitmqClient struct { client *http.Client hostEndpoint string @@ -41,7 +46,7 @@ type rabbitmqCredentials struct { func newClient(ctx context.Context, cfg *Config, host component.Host, settings component.TelemetrySettings, logger *zap.Logger) (client, error) { httpClient, err := cfg.ToClient(ctx, host, settings) if err != nil { - return nil, fmt.Errorf("failed to create HTTP Client: %w", err) + return nil, fmt.Errorf("failed to create HTTP client: %w", err) } return &rabbitmqClient{ @@ -57,57 +62,53 @@ func newClient(ctx context.Context, cfg *Config, host component.Host, settings c func (c *rabbitmqClient) GetQueues(ctx context.Context) ([]*models.Queue, error) { var queues []*models.Queue - if err := c.get(ctx, queuePath, &queues); err != nil { - c.logger.Debug("Failed to retrieve queues", zap.Error(err)) + c.logger.Error("Failed to retrieve queues", zap.Error(err)) return nil, err } - return queues, nil } +func (c *rabbitmqClient) GetNodes(ctx context.Context) ([]*models.Node, error) { + var nodes []*models.Node + if err := c.get(ctx, nodePath, &nodes); err != nil { + c.logger.Error("Failed to retrieve nodes", zap.Error(err)) + return nil, err + } + return nodes, nil +} + func (c *rabbitmqClient) get(ctx context.Context, path string, respObj any) error { - // Construct endpoint and create request url := c.hostEndpoint + path req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) if err != nil { - return fmt.Errorf("failed to create get request for path %s: %w", path, err) + return fmt.Errorf("failed to create GET request for path %s: %w", path, err) } - // Set user/pass authentication req.SetBasicAuth(c.creds.username, c.creds.password) - // Make request resp, err := c.client.Do(req) if err != nil { - return fmt.Errorf("failed to make http request: %w", err) + return fmt.Errorf("failed to make HTTP request: %w", err) } - - // Defer body close defer func() { if closeErr := resp.Body.Close(); closeErr != nil { - c.logger.Warn("failed to close response body", zap.Error(closeErr)) + c.logger.Warn("Failed to close response body", zap.Error(closeErr)) } }() - // Check for OK status code if resp.StatusCode != http.StatusOK { - c.logger.Debug("rabbitMQ API non-200", zap.Error(err), zap.Int("status_code", resp.StatusCode)) - - // Attempt to extract the error payload - payloadData, err := io.ReadAll(resp.Body) - if err != nil { - c.logger.Debug("failed to read payload error message", zap.Error(err)) - } else { - c.logger.Debug("rabbitMQ API Error", zap.ByteString("api_error", payloadData)) - } + c.logger.Error("Non-200 response code received", zap.Int("status_code", resp.StatusCode)) - return fmt.Errorf("non 200 code returned %d", resp.StatusCode) + payload, readErr := io.ReadAll(resp.Body) + if readErr == nil { + c.logger.Debug("Error response payload", zap.ByteString("error_payload", payload)) + } + return fmt.Errorf("non-200 response code: %d", resp.StatusCode) } - // Decode the payload into the passed in response object if err := json.NewDecoder(resp.Body).Decode(respObj); err != nil { - return fmt.Errorf("failed to decode response payload: %w", err) + return fmt.Errorf("failed to decode response body: %w", err) } return nil diff --git a/receiver/rabbitmqreceiver/client_test.go b/receiver/rabbitmqreceiver/client_test.go index afbc7f69c2e6..872d66e4beee 100644 --- a/receiver/rabbitmqreceiver/client_test.go +++ b/receiver/rabbitmqreceiver/client_test.go @@ -26,12 +26,13 @@ import ( const ( queuesAPIResponseFile = "get_queues_response.json" + nodesAPIResponseFile = "get_nodes_response.json" ) func TestNewClient(t *testing.T) { - clientConfigNonExistandCA := confighttp.NewDefaultClientConfig() - clientConfigNonExistandCA.Endpoint = defaultEndpoint - clientConfigNonExistandCA.TLSSetting = configtls.ClientConfig{ + clientConfigInvalid := confighttp.NewDefaultClientConfig() + clientConfigInvalid.Endpoint = defaultEndpoint + clientConfigInvalid.TLSSetting = configtls.ClientConfig{ Config: configtls.Config{ CAFile: "/non/existent", }, @@ -40,7 +41,7 @@ func TestNewClient(t *testing.T) { clientConfig := confighttp.NewDefaultClientConfig() clientConfig.Endpoint = defaultEndpoint - testCase := []struct { + testCases := []struct { desc string cfg *Config host component.Host @@ -51,17 +52,19 @@ func TestNewClient(t *testing.T) { { desc: "Invalid HTTP config", cfg: &Config{ - ClientConfig: clientConfigNonExistandCA, + ClientConfig: clientConfigInvalid, }, host: componenttest.NewNopHost(), settings: componenttest.NewNopTelemetrySettings(), logger: zap.NewNop(), - expectError: errors.New("failed to create HTTP Client"), + expectError: errors.New("failed to create HTTP client: failed to load TLS config"), }, { desc: "Valid Configuration", cfg: &Config{ ClientConfig: clientConfig, + Username: "valid_user", + Password: "valid_password", }, host: componenttest.NewNopHost(), settings: componenttest.NewNopTelemetrySettings(), @@ -70,7 +73,7 @@ func TestNewClient(t *testing.T) { }, } - for _, tc := range testCase { + for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { ac, err := newClient(context.Background(), tc.cfg, tc.host, tc.settings, tc.logger) if tc.expectError != nil { @@ -108,9 +111,9 @@ func TestGetQueuesDetails(t *testing.T) { tc := createTestClient(t, ts.URL) - clusters, err := tc.GetQueues(context.Background()) - require.Nil(t, clusters) - require.EqualError(t, err, "non 200 code returned 401") + queues, err := tc.GetQueues(context.Background()) + require.Nil(t, queues) + require.EqualError(t, err, "non-200 response code: 401") }, }, { @@ -125,13 +128,13 @@ func TestGetQueuesDetails(t *testing.T) { tc := createTestClient(t, ts.URL) - clusters, err := tc.GetQueues(context.Background()) - require.Nil(t, clusters) - require.ErrorContains(t, err, "failed to decode response payload") + queues, err := tc.GetQueues(context.Background()) + require.Nil(t, queues) + require.ErrorContains(t, err, "failed to decode response body: json: cannot unmarshal object into Go value of type []*models.Queue") }, }, { - desc: "Successful call", + desc: "Successful Queue API call", testFunc: func(t *testing.T) { data := loadAPIResponseData(t, queuesAPIResponseFile) @@ -149,9 +152,9 @@ func TestGetQueuesDetails(t *testing.T) { err := json.Unmarshal(data, &expected) require.NoError(t, err) - clusters, err := tc.GetQueues(context.Background()) + queues, err := tc.GetQueues(context.Background()) require.NoError(t, err) - require.Equal(t, expected, clusters) + require.Equal(t, expected, queues) }, }, } diff --git a/receiver/rabbitmqreceiver/config.go b/receiver/rabbitmqreceiver/config.go index 3479a8ae8b4d..70e00371dc2d 100644 --- a/receiver/rabbitmqreceiver/config.go +++ b/receiver/rabbitmqreceiver/config.go @@ -19,7 +19,6 @@ import ( var ( errMissingUsername = errors.New(`"username" not specified in config`) errMissingPassword = errors.New(`"password" not specified in config`) - errInvalidEndpoint = errors.New(`"endpoint" must be in the form of ://:`) ) @@ -31,20 +30,25 @@ type Config struct { confighttp.ClientConfig `mapstructure:",squash"` Username string `mapstructure:"username"` Password configopaque.String `mapstructure:"password"` + EnableNodeMetrics bool `mapstructure:"enable_node_metrics"` // Added flag for node metrics metadata.MetricsBuilderConfig `mapstructure:",squash"` } -// Validate validates the configuration by checking for missing or invalid fields +// Validate validates the configuration by checking for missing or invalid fields. func (cfg *Config) Validate() error { var err []error + + // Validate username if cfg.Username == "" { err = append(err, errMissingUsername) } + // Validate password if cfg.Password == "" { err = append(err, errMissingPassword) } + // Validate endpoint _, parseErr := url.Parse(cfg.Endpoint) if parseErr != nil { wrappedErr := fmt.Errorf("%s: %w", errInvalidEndpoint.Error(), parseErr) diff --git a/receiver/rabbitmqreceiver/config_test.go b/receiver/rabbitmqreceiver/config_test.go index dfcdfefe5cda..4bafdaadf1a3 100644 --- a/receiver/rabbitmqreceiver/config_test.go +++ b/receiver/rabbitmqreceiver/config_test.go @@ -20,7 +20,7 @@ import ( func TestValidate(t *testing.T) { clientConfigInvalid := confighttp.NewDefaultClientConfig() - clientConfigInvalid.Endpoint = "invalid://endpoint: 12efg" + clientConfigInvalid.Endpoint = "invalid://endpoint: 12efg" clientConfig := confighttp.NewDefaultClientConfig() clientConfig.Endpoint = defaultEndpoint @@ -38,7 +38,7 @@ func TestValidate(t *testing.T) { expectedErr: errors.Join( errMissingUsername, errMissingPassword, - fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`)), + fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`)), }, { desc: "missing password and invalid endpoint", @@ -48,7 +48,7 @@ func TestValidate(t *testing.T) { }, expectedErr: errors.Join( errMissingPassword, - fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), + fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), ), }, { @@ -59,7 +59,7 @@ func TestValidate(t *testing.T) { }, expectedErr: errors.Join( errMissingUsername, - fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), + fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), ), }, { @@ -70,15 +70,26 @@ func TestValidate(t *testing.T) { ClientConfig: clientConfigInvalid, }, expectedErr: errors.Join( - fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), + fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), ), }, { - desc: "valid config", + desc: "valid config with node metrics enabled", cfg: &Config{ - Username: "otelu", - Password: "otelp", - ClientConfig: clientConfig, + Username: "otelu", + Password: "otelp", + ClientConfig: clientConfig, + EnableNodeMetrics: true, + }, + expectedErr: nil, + }, + { + desc: "valid config with node metrics disabled", + cfg: &Config{ + Username: "otelu", + Password: "otelp", + ClientConfig: clientConfig, + EnableNodeMetrics: false, }, expectedErr: nil, }, @@ -112,6 +123,7 @@ func TestLoadConfig(t *testing.T) { expected.Username = "otelu" expected.Password = "${env:RABBITMQ_PASSWORD}" expected.CollectionInterval = 10 * time.Second + expected.EnableNodeMetrics = true require.Equal(t, expected, cfg) } diff --git a/receiver/rabbitmqreceiver/documentation.md b/receiver/rabbitmqreceiver/documentation.md index 419398852eac..cdcaa56e5014 100644 --- a/receiver/rabbitmqreceiver/documentation.md +++ b/receiver/rabbitmqreceiver/documentation.md @@ -66,6 +66,38 @@ The number of messages published to a queue. | ---- | ----------- | ---------- | ----------------------- | --------- | | {messages} | Sum | Int | Cumulative | true | +### rabbitmq.node.disk_free + +Free disk space on the node. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {bytes} | Sum | Int | Cumulative | false | + +### rabbitmq.node.fd_used + +The number of file descriptors used on the node. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {fd} | Sum | Int | Cumulative | false | + +### rabbitmq.node.mem_limit + +The memory limit on the node. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {bytes} | Sum | Int | Cumulative | false | + +### rabbitmq.node.mem_used + +The memory used on the node. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {bytes} | Sum | Int | Cumulative | false | + ## Resource Attributes | Name | Description | Values | Enabled | diff --git a/receiver/rabbitmqreceiver/factory.go b/receiver/rabbitmqreceiver/factory.go index b863e40769fa..a47e6b72e4d2 100644 --- a/receiver/rabbitmqreceiver/factory.go +++ b/receiver/rabbitmqreceiver/factory.go @@ -20,40 +20,58 @@ import ( var errConfigNotRabbit = errors.New("config was not a RabbitMQ receiver config") -// NewFactory creates a new receiver factory +// NewFactory creates a new receiver factory. func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), + ) } +// createDefaultConfig creates the default configuration for the RabbitMQ receiver. func createDefaultConfig() component.Config { - cfg := scraperhelper.NewDefaultControllerConfig() - cfg.CollectionInterval = 10 * time.Second + defaultControllerConfig := scraperhelper.NewDefaultControllerConfig() + defaultControllerConfig.CollectionInterval = 10 * time.Second - clientConfig := confighttp.NewDefaultClientConfig() - clientConfig.Endpoint = defaultEndpoint - clientConfig.Timeout = 10 * time.Second + defaultClientConfig := confighttp.NewDefaultClientConfig() + defaultClientConfig.Endpoint = defaultEndpoint + defaultClientConfig.Timeout = 10 * time.Second return &Config{ - ControllerConfig: cfg, - ClientConfig: clientConfig, + ControllerConfig: defaultControllerConfig, + ClientConfig: defaultClientConfig, MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + EnableNodeMetrics: true, // Default to enabling node metrics. } } -func createMetricsReceiver(_ context.Context, params receiver.Settings, rConf component.Config, consumer consumer.Metrics) (receiver.Metrics, error) { +// createMetricsReceiver creates the metrics receiver for RabbitMQ. +func createMetricsReceiver( + _ context.Context, + params receiver.Settings, + rConf component.Config, + consumer consumer.Metrics, +) (receiver.Metrics, error) { cfg, ok := rConf.(*Config) if !ok { return nil, errConfigNotRabbit } rabbitScraper := newScraper(params.Logger, cfg, params) - s, err := scraper.NewMetrics(rabbitScraper.scrape, scraper.WithStart(rabbitScraper.start)) + + s, err := scraper.NewMetrics( + rabbitScraper.scrape, + scraper.WithStart(rabbitScraper.start), + ) if err != nil { return nil, err } - return scraperhelper.NewMetricsController(&cfg.ControllerConfig, params, consumer, scraperhelper.AddScraper(metadata.Type, s)) + return scraperhelper.NewMetricsController( + &cfg.ControllerConfig, + params, + consumer, + scraperhelper.AddScraper(metadata.Type, s), + ) } diff --git a/receiver/rabbitmqreceiver/factory_test.go b/receiver/rabbitmqreceiver/factory_test.go index 2e9900df69cc..4b8cbf54bcd6 100644 --- a/receiver/rabbitmqreceiver/factory_test.go +++ b/receiver/rabbitmqreceiver/factory_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" @@ -39,13 +38,14 @@ func TestNewFactory(t *testing.T) { testFunc: func(t *testing.T) { factory := NewFactory() - var expectedCfg component.Config = &Config{ + expectedCfg := &Config{ ControllerConfig: scraperhelper.ControllerConfig{ CollectionInterval: 10 * time.Second, InitialDelay: time.Second, }, ClientConfig: clientConfig, MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + EnableNodeMetrics: true, // Ensure default includes EnableNodeMetrics } require.Equal(t, expectedCfg, factory.CreateDefaultConfig()) @@ -72,12 +72,29 @@ func TestNewFactory(t *testing.T) { _, err := factory.CreateMetrics( context.Background(), receivertest.NewNopSettings(), - nil, + nil, // Passing nil config consumertest.NewNop(), ) require.ErrorIs(t, err, errConfigNotRabbit) }, }, + { + desc: "returns error if EnableNodeMetrics is false but metrics are attempted", + testFunc: func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.EnableNodeMetrics = false + + _, err := factory.CreateMetrics( + context.Background(), + receivertest.NewNopSettings(), + cfg, + consumertest.NewNop(), + ) + // No error expected since node metrics disabling shouldn't block creation. + require.NoError(t, err) + }, + }, } for _, tc := range testCases { diff --git a/receiver/rabbitmqreceiver/generated_package_test.go b/receiver/rabbitmqreceiver/generated_package_test.go index 1581d54bfae2..121c329b647d 100644 --- a/receiver/rabbitmqreceiver/generated_package_test.go +++ b/receiver/rabbitmqreceiver/generated_package_test.go @@ -3,9 +3,8 @@ package rabbitmqreceiver import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/receiver/rabbitmqreceiver/internal/metadata/generated_config.go b/receiver/rabbitmqreceiver/internal/metadata/generated_config.go index 5857e880ebf0..14815f0a8bdb 100644 --- a/receiver/rabbitmqreceiver/internal/metadata/generated_config.go +++ b/receiver/rabbitmqreceiver/internal/metadata/generated_config.go @@ -34,6 +34,10 @@ type MetricsConfig struct { RabbitmqMessageDelivered MetricConfig `mapstructure:"rabbitmq.message.delivered"` RabbitmqMessageDropped MetricConfig `mapstructure:"rabbitmq.message.dropped"` RabbitmqMessagePublished MetricConfig `mapstructure:"rabbitmq.message.published"` + RabbitmqNodeDiskFree MetricConfig `mapstructure:"rabbitmq.node.disk_free"` + RabbitmqNodeFdUsed MetricConfig `mapstructure:"rabbitmq.node.fd_used"` + RabbitmqNodeMemLimit MetricConfig `mapstructure:"rabbitmq.node.mem_limit"` + RabbitmqNodeMemUsed MetricConfig `mapstructure:"rabbitmq.node.mem_used"` } func DefaultMetricsConfig() MetricsConfig { @@ -56,6 +60,18 @@ func DefaultMetricsConfig() MetricsConfig { RabbitmqMessagePublished: MetricConfig{ Enabled: true, }, + RabbitmqNodeDiskFree: MetricConfig{ + Enabled: true, + }, + RabbitmqNodeFdUsed: MetricConfig{ + Enabled: true, + }, + RabbitmqNodeMemLimit: MetricConfig{ + Enabled: true, + }, + RabbitmqNodeMemUsed: MetricConfig{ + Enabled: true, + }, } } diff --git a/receiver/rabbitmqreceiver/internal/metadata/generated_config_test.go b/receiver/rabbitmqreceiver/internal/metadata/generated_config_test.go index df1be909649e..f4234eb5c81e 100644 --- a/receiver/rabbitmqreceiver/internal/metadata/generated_config_test.go +++ b/receiver/rabbitmqreceiver/internal/metadata/generated_config_test.go @@ -31,6 +31,10 @@ func TestMetricsBuilderConfig(t *testing.T) { RabbitmqMessageDelivered: MetricConfig{Enabled: true}, RabbitmqMessageDropped: MetricConfig{Enabled: true}, RabbitmqMessagePublished: MetricConfig{Enabled: true}, + RabbitmqNodeDiskFree: MetricConfig{Enabled: true}, + RabbitmqNodeFdUsed: MetricConfig{Enabled: true}, + RabbitmqNodeMemLimit: MetricConfig{Enabled: true}, + RabbitmqNodeMemUsed: MetricConfig{Enabled: true}, }, ResourceAttributes: ResourceAttributesConfig{ RabbitmqNodeName: ResourceAttributeConfig{Enabled: true}, @@ -49,6 +53,10 @@ func TestMetricsBuilderConfig(t *testing.T) { RabbitmqMessageDelivered: MetricConfig{Enabled: false}, RabbitmqMessageDropped: MetricConfig{Enabled: false}, RabbitmqMessagePublished: MetricConfig{Enabled: false}, + RabbitmqNodeDiskFree: MetricConfig{Enabled: false}, + RabbitmqNodeFdUsed: MetricConfig{Enabled: false}, + RabbitmqNodeMemLimit: MetricConfig{Enabled: false}, + RabbitmqNodeMemUsed: MetricConfig{Enabled: false}, }, ResourceAttributes: ResourceAttributesConfig{ RabbitmqNodeName: ResourceAttributeConfig{Enabled: false}, diff --git a/receiver/rabbitmqreceiver/internal/metadata/generated_metrics.go b/receiver/rabbitmqreceiver/internal/metadata/generated_metrics.go index e613c88c6368..ecad05124c0a 100644 --- a/receiver/rabbitmqreceiver/internal/metadata/generated_metrics.go +++ b/receiver/rabbitmqreceiver/internal/metadata/generated_metrics.go @@ -6,8 +6,8 @@ import ( "time" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/filter" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/filter" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" ) @@ -346,6 +346,210 @@ func newMetricRabbitmqMessagePublished(cfg MetricConfig) metricRabbitmqMessagePu return m } +type metricRabbitmqNodeDiskFree struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills rabbitmq.node.disk_free metric with initial data. +func (m *metricRabbitmqNodeDiskFree) init() { + m.data.SetName("rabbitmq.node.disk_free") + m.data.SetDescription("Free disk space on the node.") + m.data.SetUnit("{bytes}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricRabbitmqNodeDiskFree) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricRabbitmqNodeDiskFree) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricRabbitmqNodeDiskFree) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricRabbitmqNodeDiskFree(cfg MetricConfig) metricRabbitmqNodeDiskFree { + m := metricRabbitmqNodeDiskFree{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricRabbitmqNodeFdUsed struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills rabbitmq.node.fd_used metric with initial data. +func (m *metricRabbitmqNodeFdUsed) init() { + m.data.SetName("rabbitmq.node.fd_used") + m.data.SetDescription("The number of file descriptors used on the node.") + m.data.SetUnit("{fd}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricRabbitmqNodeFdUsed) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricRabbitmqNodeFdUsed) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricRabbitmqNodeFdUsed) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricRabbitmqNodeFdUsed(cfg MetricConfig) metricRabbitmqNodeFdUsed { + m := metricRabbitmqNodeFdUsed{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricRabbitmqNodeMemLimit struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills rabbitmq.node.mem_limit metric with initial data. +func (m *metricRabbitmqNodeMemLimit) init() { + m.data.SetName("rabbitmq.node.mem_limit") + m.data.SetDescription("The memory limit on the node.") + m.data.SetUnit("{bytes}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricRabbitmqNodeMemLimit) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricRabbitmqNodeMemLimit) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricRabbitmqNodeMemLimit) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricRabbitmqNodeMemLimit(cfg MetricConfig) metricRabbitmqNodeMemLimit { + m := metricRabbitmqNodeMemLimit{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricRabbitmqNodeMemUsed struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills rabbitmq.node.mem_used metric with initial data. +func (m *metricRabbitmqNodeMemUsed) init() { + m.data.SetName("rabbitmq.node.mem_used") + m.data.SetDescription("The memory used on the node.") + m.data.SetUnit("{bytes}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricRabbitmqNodeMemUsed) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricRabbitmqNodeMemUsed) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricRabbitmqNodeMemUsed) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricRabbitmqNodeMemUsed(cfg MetricConfig) metricRabbitmqNodeMemUsed { + m := metricRabbitmqNodeMemUsed{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + // MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations // required to produce metric representation defined in metadata and user config. type MetricsBuilder struct { @@ -362,6 +566,10 @@ type MetricsBuilder struct { metricRabbitmqMessageDelivered metricRabbitmqMessageDelivered metricRabbitmqMessageDropped metricRabbitmqMessageDropped metricRabbitmqMessagePublished metricRabbitmqMessagePublished + metricRabbitmqNodeDiskFree metricRabbitmqNodeDiskFree + metricRabbitmqNodeFdUsed metricRabbitmqNodeFdUsed + metricRabbitmqNodeMemLimit metricRabbitmqNodeMemLimit + metricRabbitmqNodeMemUsed metricRabbitmqNodeMemUsed } // MetricBuilderOption applies changes to default metrics builder. @@ -393,6 +601,10 @@ func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, opt metricRabbitmqMessageDelivered: newMetricRabbitmqMessageDelivered(mbc.Metrics.RabbitmqMessageDelivered), metricRabbitmqMessageDropped: newMetricRabbitmqMessageDropped(mbc.Metrics.RabbitmqMessageDropped), metricRabbitmqMessagePublished: newMetricRabbitmqMessagePublished(mbc.Metrics.RabbitmqMessagePublished), + metricRabbitmqNodeDiskFree: newMetricRabbitmqNodeDiskFree(mbc.Metrics.RabbitmqNodeDiskFree), + metricRabbitmqNodeFdUsed: newMetricRabbitmqNodeFdUsed(mbc.Metrics.RabbitmqNodeFdUsed), + metricRabbitmqNodeMemLimit: newMetricRabbitmqNodeMemLimit(mbc.Metrics.RabbitmqNodeMemLimit), + metricRabbitmqNodeMemUsed: newMetricRabbitmqNodeMemUsed(mbc.Metrics.RabbitmqNodeMemUsed), resourceAttributeIncludeFilter: make(map[string]filter.Filter), resourceAttributeExcludeFilter: make(map[string]filter.Filter), } @@ -489,6 +701,10 @@ func (mb *MetricsBuilder) EmitForResource(options ...ResourceMetricsOption) { mb.metricRabbitmqMessageDelivered.emit(ils.Metrics()) mb.metricRabbitmqMessageDropped.emit(ils.Metrics()) mb.metricRabbitmqMessagePublished.emit(ils.Metrics()) + mb.metricRabbitmqNodeDiskFree.emit(ils.Metrics()) + mb.metricRabbitmqNodeFdUsed.emit(ils.Metrics()) + mb.metricRabbitmqNodeMemLimit.emit(ils.Metrics()) + mb.metricRabbitmqNodeMemUsed.emit(ils.Metrics()) for _, op := range options { op.apply(rm) @@ -550,6 +766,26 @@ func (mb *MetricsBuilder) RecordRabbitmqMessagePublishedDataPoint(ts pcommon.Tim mb.metricRabbitmqMessagePublished.recordDataPoint(mb.startTime, ts, val) } +// RecordRabbitmqNodeDiskFreeDataPoint adds a data point to rabbitmq.node.disk_free metric. +func (mb *MetricsBuilder) RecordRabbitmqNodeDiskFreeDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricRabbitmqNodeDiskFree.recordDataPoint(mb.startTime, ts, val) +} + +// RecordRabbitmqNodeFdUsedDataPoint adds a data point to rabbitmq.node.fd_used metric. +func (mb *MetricsBuilder) RecordRabbitmqNodeFdUsedDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricRabbitmqNodeFdUsed.recordDataPoint(mb.startTime, ts, val) +} + +// RecordRabbitmqNodeMemLimitDataPoint adds a data point to rabbitmq.node.mem_limit metric. +func (mb *MetricsBuilder) RecordRabbitmqNodeMemLimitDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricRabbitmqNodeMemLimit.recordDataPoint(mb.startTime, ts, val) +} + +// RecordRabbitmqNodeMemUsedDataPoint adds a data point to rabbitmq.node.mem_used metric. +func (mb *MetricsBuilder) RecordRabbitmqNodeMemUsedDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricRabbitmqNodeMemUsed.recordDataPoint(mb.startTime, ts, val) +} + // Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted, // and metrics builder should update its startTime and reset it's internal state accordingly. func (mb *MetricsBuilder) Reset(options ...MetricBuilderOption) { diff --git a/receiver/rabbitmqreceiver/internal/metadata/generated_metrics_test.go b/receiver/rabbitmqreceiver/internal/metadata/generated_metrics_test.go index 6ec9325fa90d..c8e7daf9c881 100644 --- a/receiver/rabbitmqreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/rabbitmqreceiver/internal/metadata/generated_metrics_test.go @@ -92,6 +92,22 @@ func TestMetricsBuilder(t *testing.T) { allMetricsCount++ mb.RecordRabbitmqMessagePublishedDataPoint(ts, 1) + defaultMetricsCount++ + allMetricsCount++ + mb.RecordRabbitmqNodeDiskFreeDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordRabbitmqNodeFdUsedDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordRabbitmqNodeMemLimitDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordRabbitmqNodeMemUsedDataPoint(ts, 1) + rb := mb.NewResourceBuilder() rb.SetRabbitmqNodeName("rabbitmq.node.name-val") rb.SetRabbitmqQueueName("rabbitmq.queue.name-val") @@ -205,6 +221,62 @@ func TestMetricsBuilder(t *testing.T) { assert.Equal(t, ts, dp.Timestamp()) assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) assert.Equal(t, int64(1), dp.IntValue()) + case "rabbitmq.node.disk_free": + assert.False(t, validatedMetrics["rabbitmq.node.disk_free"], "Found a duplicate in the metrics slice: rabbitmq.node.disk_free") + validatedMetrics["rabbitmq.node.disk_free"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Free disk space on the node.", ms.At(i).Description()) + assert.Equal(t, "{bytes}", ms.At(i).Unit()) + assert.False(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "rabbitmq.node.fd_used": + assert.False(t, validatedMetrics["rabbitmq.node.fd_used"], "Found a duplicate in the metrics slice: rabbitmq.node.fd_used") + validatedMetrics["rabbitmq.node.fd_used"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The number of file descriptors used on the node.", ms.At(i).Description()) + assert.Equal(t, "{fd}", ms.At(i).Unit()) + assert.False(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "rabbitmq.node.mem_limit": + assert.False(t, validatedMetrics["rabbitmq.node.mem_limit"], "Found a duplicate in the metrics slice: rabbitmq.node.mem_limit") + validatedMetrics["rabbitmq.node.mem_limit"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The memory limit on the node.", ms.At(i).Description()) + assert.Equal(t, "{bytes}", ms.At(i).Unit()) + assert.False(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "rabbitmq.node.mem_used": + assert.False(t, validatedMetrics["rabbitmq.node.mem_used"], "Found a duplicate in the metrics slice: rabbitmq.node.mem_used") + validatedMetrics["rabbitmq.node.mem_used"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The memory used on the node.", ms.At(i).Description()) + assert.Equal(t, "{bytes}", ms.At(i).Unit()) + assert.False(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) } } }) diff --git a/receiver/rabbitmqreceiver/internal/metadata/testdata/config.yaml b/receiver/rabbitmqreceiver/internal/metadata/testdata/config.yaml index 0ed13b5f035e..da5d7f93dda3 100644 --- a/receiver/rabbitmqreceiver/internal/metadata/testdata/config.yaml +++ b/receiver/rabbitmqreceiver/internal/metadata/testdata/config.yaml @@ -13,6 +13,14 @@ all_set: enabled: true rabbitmq.message.published: enabled: true + rabbitmq.node.disk_free: + enabled: true + rabbitmq.node.fd_used: + enabled: true + rabbitmq.node.mem_limit: + enabled: true + rabbitmq.node.mem_used: + enabled: true resource_attributes: rabbitmq.node.name: enabled: true @@ -34,6 +42,14 @@ none_set: enabled: false rabbitmq.message.published: enabled: false + rabbitmq.node.disk_free: + enabled: false + rabbitmq.node.fd_used: + enabled: false + rabbitmq.node.mem_limit: + enabled: false + rabbitmq.node.mem_used: + enabled: false resource_attributes: rabbitmq.node.name: enabled: false diff --git a/receiver/rabbitmqreceiver/internal/mocks/client.go b/receiver/rabbitmqreceiver/internal/mocks/client.go index a33ec5872698..58648e92f33c 100644 --- a/receiver/rabbitmqreceiver/internal/mocks/client.go +++ b/receiver/rabbitmqreceiver/internal/mocks/client.go @@ -37,3 +37,26 @@ func (_m *MockClient) GetQueues(ctx context.Context) ([]*models.Queue, error) { return r0, r1 } + +// GetNodes provides a mock function with given fields: ctx +func (_m *MockClient) GetNodes(ctx context.Context) ([]*models.Node, error) { + ret := _m.Called(ctx) + + var r0 []*models.Node + if rf, ok := ret.Get(0).(func(context.Context) []*models.Node); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*models.Node) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/receiver/rabbitmqreceiver/internal/models/models.go b/receiver/rabbitmqreceiver/internal/models/models.go index 0d97f616f320..fd670783b66b 100644 --- a/receiver/rabbitmqreceiver/internal/models/models.go +++ b/receiver/rabbitmqreceiver/internal/models/models.go @@ -18,3 +18,15 @@ type Queue struct { // Embedded Metrics MessageStats map[string]any `json:"message_stats"` } + +// Node represents a RabbitMQ node in the API response +type Node struct { + // Identifiers + Name string `json:"name"` + + // Metrics + DiskFree int64 `json:"disk_free"` + FDUsed int64 `json:"fd_used"` + MemLimit int64 `json:"mem_limit"` + MemUsed int64 `json:"mem_used"` +} diff --git a/receiver/rabbitmqreceiver/metadata.yaml b/receiver/rabbitmqreceiver/metadata.yaml index ebea9452b4a2..7146f98bdfc7 100644 --- a/receiver/rabbitmqreceiver/metadata.yaml +++ b/receiver/rabbitmqreceiver/metadata.yaml @@ -1,5 +1,12 @@ type: rabbitmq +tests: + config: + endpoint: "http://localhost:15672" + username: "testuser" + password: "testpassword" + collection_interval: 10s + status: class: receiver stability: @@ -31,6 +38,7 @@ attributes: enum: - ready - unacknowledged + metrics: rabbitmq.consumer.count: description: The number of consumers currently reading from the queue. @@ -81,3 +89,35 @@ metrics: value_type: int attributes: [message.state] enabled: true + rabbitmq.node.disk_free: + description: Free disk space on the node. + unit: "{bytes}" + sum: + monotonic: false + aggregation_temporality: cumulative + value_type: int + enabled: true + rabbitmq.node.fd_used: + description: The number of file descriptors used on the node. + unit: "{fd}" + sum: + monotonic: false + aggregation_temporality: cumulative + value_type: int + enabled: true + rabbitmq.node.mem_limit: + description: The memory limit on the node. + unit: "{bytes}" + sum: + monotonic: false + aggregation_temporality: cumulative + value_type: int + enabled: true + rabbitmq.node.mem_used: + description: The memory used on the node. + unit: "{bytes}" + sum: + monotonic: false + aggregation_temporality: cumulative + value_type: int + enabled: true diff --git a/receiver/rabbitmqreceiver/scraper.go b/receiver/rabbitmqreceiver/scraper.go index fd0ad5d5f529..48c8ab402fa9 100644 --- a/receiver/rabbitmqreceiver/scraper.go +++ b/receiver/rabbitmqreceiver/scraper.go @@ -6,6 +6,8 @@ package rabbitmqreceiver // import "github.com/open-telemetry/opentelemetry-coll import ( "context" "errors" + "fmt" + "strings" "time" "go.opentelemetry.io/collector/component" @@ -55,55 +57,87 @@ func newScraper(logger *zap.Logger, cfg *Config, settings receiver.Settings) *ra } } -// start starts the scraper by creating a new HTTP Client on the scraper -func (r *rabbitmqScraper) start(ctx context.Context, host component.Host) (err error) { - r.client, err = newClient(ctx, r.cfg, host, r.settings, r.logger) - return +// start starts the scraper by creating a new HTTP Client +func (r *rabbitmqScraper) start(ctx context.Context, host component.Host) error { + if r.cfg.Endpoint == "" || r.cfg.Username == "" || r.cfg.Password == "" { + return fmt.Errorf("invalid configuration: missing endpoint, username, or password") + } + + client, err := newClient(ctx, r.cfg, host, r.settings, r.logger) + if err != nil { + return fmt.Errorf("failed to initialize RabbitMQ client: %w", err) + } + + r.client = client + return nil } // scrape collects metrics from the RabbitMQ API func (r *rabbitmqScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { now := pcommon.NewTimestampFromTime(time.Now()) - // Validate we don't attempt to scrape without initializing the client + // Validate client initialization if r.client == nil { return pmetric.NewMetrics(), errClientNotInit } - // Get queues for processing + // Collect queue metrics + if err := r.collectQueueMetrics(ctx, now); err != nil { + r.logger.Error("failed to collect queue metrics", zap.Error(err)) + return pmetric.NewMetrics(), err + } + + // Collect node metrics if enabled + if r.cfg.EnableNodeMetrics { + if err := r.collectNodeMetrics(ctx, now); err != nil { + r.logger.Error("failed to collect node metrics", zap.Error(err)) + return pmetric.NewMetrics(), err + } + } + + return r.mb.Emit(), nil +} + +func (r *rabbitmqScraper) collectQueueMetrics(ctx context.Context, now pcommon.Timestamp) error { queues, err := r.client.GetQueues(ctx) if err != nil { - return pmetric.NewMetrics(), err + return err } - // Collect metrics for each queue for _, queue := range queues { r.collectQueue(queue, now) } + return nil +} - return r.mb.Emit(), nil +func (r *rabbitmqScraper) collectNodeMetrics(ctx context.Context, now pcommon.Timestamp) error { + nodes, err := r.client.GetNodes(ctx) + if err != nil { + return err + } + + for _, node := range nodes { + r.collectNode(node, now) + } + return nil } -// collectQueue collects metrics +// collectQueue collects metrics for a specific queue func (r *rabbitmqScraper) collectQueue(queue *models.Queue, now pcommon.Timestamp) { r.mb.RecordRabbitmqConsumerCountDataPoint(now, queue.Consumers) r.mb.RecordRabbitmqMessageCurrentDataPoint(now, queue.UnacknowledgedMessages, metadata.AttributeMessageStateUnacknowledged) r.mb.RecordRabbitmqMessageCurrentDataPoint(now, queue.ReadyMessages, metadata.AttributeMessageStateReady) for _, messageStatMetric := range messageStatMetrics { - // Get metric value val, ok := queue.MessageStats[messageStatMetric] - // A metric may not exist if the actions that increment it do not occur if !ok { - r.logger.Debug("metric not found", zap.String("Metric", messageStatMetric), zap.String("Queue", queue.Name)) + r.logger.Debug("metric not found", zap.String("metric", messageStatMetric), zap.String("queue", queue.Name)) continue } - // Convert to int64 val64, ok := convertValToInt64(val) if !ok { - // Log warning if the metric is not in the format we expect - r.logger.Warn("metric not int64", zap.String("Metric", messageStatMetric), zap.String("Queue", queue.Name)) + r.logger.Warn("metric not int64", zap.String("metric", messageStatMetric), zap.String("queue", queue.Name)) continue } @@ -118,6 +152,7 @@ func (r *rabbitmqScraper) collectQueue(queue *models.Queue, now pcommon.Timestam r.mb.RecordRabbitmqMessageDroppedDataPoint(now, val64) } } + rb := r.mb.NewResourceBuilder() rb.SetRabbitmqQueueName(queue.Name) rb.SetRabbitmqNodeName(queue.Node) @@ -125,14 +160,41 @@ func (r *rabbitmqScraper) collectQueue(queue *models.Queue, now pcommon.Timestam r.mb.EmitForResource(metadata.WithResource(rb.Emit())) } -// convertValToInt64 values from message state unmarshal as float64s but should be int64. -// Need to do a double cast to get an int64. -// This should never fail but worth checking just in case. +func (r *rabbitmqScraper) collectNode(node *models.Node, now pcommon.Timestamp) { + // Record node-specific metrics + r.mb.RecordRabbitmqNodeDiskFreeDataPoint(now, node.DiskFree) + r.mb.RecordRabbitmqNodeFdUsedDataPoint(now, node.FDUsed) + r.mb.RecordRabbitmqNodeMemLimitDataPoint(now, node.MemLimit) + r.mb.RecordRabbitmqNodeMemUsedDataPoint(now, node.MemUsed) + + // Build resource attributes for the node + rb := r.mb.NewResourceBuilder() + rb.SetRabbitmqNodeName(node.Name) + + // Emit resource and log attributes for debugging + resource := rb.Emit() + r.logger.Debug("Emitting resource for node", zap.String("attributes", formatResourceAttributes(resource))) + + // Emit metrics for the node + r.mb.EmitForResource(metadata.WithResource(resource)) +} + +// formatResourceAttributes converts resource attributes to a string for logging +func formatResourceAttributes(resource pcommon.Resource) string { + attrs := resource.Attributes() + var attributes []string + attrs.Range(func(k string, v pcommon.Value) bool { + attributes = append(attributes, k+"="+v.AsString()) + return true + }) + return "{" + strings.Join(attributes, ", ") + "}" +} + +// convertValToInt64 converts a value to int64 func convertValToInt64(val any) (int64, bool) { f64Val, ok := val.(float64) if !ok { - return 0, ok + return 0, false } - return int64(f64Val), true } diff --git a/receiver/rabbitmqreceiver/scraper_test.go b/receiver/rabbitmqreceiver/scraper_test.go index 112941b136d9..f48c9ba8ac88 100644 --- a/receiver/rabbitmqreceiver/scraper_test.go +++ b/receiver/rabbitmqreceiver/scraper_test.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" @@ -26,13 +25,8 @@ import ( ) func TestScraperStart(t *testing.T) { - clientConfigNonExistandCA := confighttp.NewDefaultClientConfig() - clientConfigNonExistandCA.Endpoint = defaultEndpoint - clientConfigNonExistandCA.TLSSetting = configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: "/non/existent", - }, - } + clientConfigInvalid := confighttp.NewDefaultClientConfig() + clientConfigInvalid.Endpoint = "invalid://endpoint" clientConfig := confighttp.NewDefaultClientConfig() clientConfig.Endpoint = defaultEndpoint @@ -46,7 +40,11 @@ func TestScraperStart(t *testing.T) { desc: "Bad Config", scraper: &rabbitmqScraper{ cfg: &Config{ - ClientConfig: clientConfigNonExistandCA, + ClientConfig: confighttp.ClientConfig{ + Endpoint: "", // Invalid endpoint + }, + Username: "", // Missing username + Password: "", // Missing password }, settings: componenttest.NewNopTelemetrySettings(), }, @@ -56,7 +54,11 @@ func TestScraperStart(t *testing.T) { desc: "Valid Config", scraper: &rabbitmqScraper{ cfg: &Config{ - ClientConfig: clientConfig, + ClientConfig: confighttp.ClientConfig{ + Endpoint: "http://localhost:15672", + }, + Username: "valid_user", + Password: "valid_password", }, settings: componenttest.NewNopTelemetrySettings(), }, @@ -76,10 +78,11 @@ func TestScraperStart(t *testing.T) { } } -func TestScaperScrape(t *testing.T) { +func TestScraperScrape(t *testing.T) { testCases := []struct { desc string setupMockClient func(t *testing.T) client + enableNodeMetrics bool expectedMetricGen func(t *testing.T) pmetric.Metrics expectedErr error }{ @@ -88,6 +91,7 @@ func TestScaperScrape(t *testing.T) { setupMockClient: func(*testing.T) client { return nil }, + enableNodeMetrics: false, expectedMetricGen: func(*testing.T) pmetric.Metrics { return pmetric.NewMetrics() }, @@ -100,17 +104,17 @@ func TestScaperScrape(t *testing.T) { mockClient.On("GetQueues", mock.Anything).Return(nil, errors.New("some api error")) return &mockClient }, + enableNodeMetrics: false, expectedMetricGen: func(*testing.T) pmetric.Metrics { return pmetric.NewMetrics() }, expectedErr: errors.New("some api error"), }, { - desc: "Successful Collection", + desc: "Successful Queue Collection", setupMockClient: func(t *testing.T) client { mockClient := mocks.MockClient{} - // use helper function from client tests - data := loadAPIResponseData(t, queuesAPIResponseFile) + data := loadAPIResponseData(t, "get_queues_response.json") var queues []*models.Queue err := json.Unmarshal(data, &queues) require.NoError(t, err) @@ -118,6 +122,38 @@ func TestScaperScrape(t *testing.T) { mockClient.On("GetQueues", mock.Anything).Return(queues, nil) return &mockClient }, + enableNodeMetrics: false, + expectedMetricGen: func(t *testing.T) pmetric.Metrics { + goldenPath := filepath.Join("testdata", "expected_metrics", "metrics_golden.yaml") + expectedMetrics, err := golden.ReadMetrics(goldenPath) + require.NoError(t, err) + return expectedMetrics + }, + expectedErr: nil, + }, + { + desc: "Successful Node Metrics Collection", + setupMockClient: func(t *testing.T) client { + mockClient := mocks.MockClient{} + + // Mock data for nodes + nodeData := loadAPIResponseData(t, "get_nodes_response.json") + var nodes []*models.Node + err := json.Unmarshal(nodeData, &nodes) + require.NoError(t, err) + + // Mock data for queues + queueData := loadAPIResponseData(t, "get_queues_response.json") + var queues []*models.Queue + err = json.Unmarshal(queueData, &queues) + require.NoError(t, err) + + // Mock client methods + mockClient.On("GetNodes", mock.Anything).Return(nodes, nil) + mockClient.On("GetQueues", mock.Anything).Return(queues, nil) + + return &mockClient + }, expectedMetricGen: func(t *testing.T) pmetric.Metrics { goldenPath := filepath.Join("testdata", "expected_metrics", "metrics_golden.yaml") expectedMetrics, err := golden.ReadMetrics(goldenPath) @@ -132,6 +168,7 @@ func TestScaperScrape(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { scraper := newScraper(zap.NewNop(), createDefaultConfig().(*Config), receivertest.NewNopSettings()) scraper.client = tc.setupMockClient(t) + scraper.cfg.EnableNodeMetrics = tc.enableNodeMetrics actualMetrics, err := scraper.scrape(context.Background()) if tc.expectedErr == nil { @@ -141,7 +178,6 @@ func TestScaperScrape(t *testing.T) { } expectedMetrics := tc.expectedMetricGen(t) - require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreStartTimestamp(), pmetrictest.IgnoreTimestamp(), pmetrictest.IgnoreResourceMetricsOrder(), diff --git a/receiver/rabbitmqreceiver/testdata/apiresponses/get_nodes_response.json b/receiver/rabbitmqreceiver/testdata/apiresponses/get_nodes_response.json new file mode 100644 index 000000000000..ebef2ca80de3 --- /dev/null +++ b/receiver/rabbitmqreceiver/testdata/apiresponses/get_nodes_response.json @@ -0,0 +1,52 @@ +[ + { + "name": "rabbit@66a063ecff83", + "disk_free": 50000000, + "fd_used": 120, + "mem_limit": 1024000000, + "mem_used": 512000000, + "uptime": 86400000, + "run_queue": 2, + "processors": 4, + "io_read_count": 10530, + "io_read_bytes": 128003200, + "io_write_count": 7234, + "io_write_bytes": 243320200, + "io_sync_count": 580, + "io_seek_count": 342, + "io_reopen_count": 120, + "metrics": { + "channel_closed": 100, + "channel_open": 8, + "connection_closed": 50, + "connection_open": 4, + "queue_index_read": 5021, + "queue_index_write": 3512 + } + }, + { + "name": "rabbit@another_node", + "disk_free": 70000000, + "fd_used": 100, + "mem_limit": 2048000000, + "mem_used": 1024000000, + "uptime": 172800000, + "run_queue": 1, + "processors": 8, + "io_read_count": 21030, + "io_read_bytes": 256003200, + "io_write_count": 14468, + "io_write_bytes": 486640400, + "io_sync_count": 1160, + "io_seek_count": 684, + "io_reopen_count": 240, + "metrics": { + "channel_closed": 200, + "channel_open": 16, + "connection_closed": 100, + "connection_open": 8, + "queue_index_read": 10042, + "queue_index_write": 7024 + } + } +] diff --git a/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml b/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml index dbb7ec2e50e1..4a81b72440d6 100644 --- a/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml +++ b/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml @@ -17,29 +17,29 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: "0" - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" + - asInt: 0 + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 unit: '{consumers}' - description: The total number of messages currently in the queue. name: rabbitmq.message.current sum: aggregationTemporality: 2 dataPoints: - - asInt: "0" + - asInt: 0 attributes: - key: state value: stringValue: ready - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" - - asInt: "0" + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 + - asInt: 0 attributes: - key: state value: stringValue: unacknowledged - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 unit: '{messages}' scope: name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver @@ -62,18 +62,18 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: "1" - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" + - asInt: 1 + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 unit: '{consumers}' - description: The number of messages acknowledged by consumers. name: rabbitmq.message.acknowledged sum: aggregationTemporality: 2 dataPoints: - - asInt: "7827" - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" + - asInt: 7827 + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 isMonotonic: true unit: '{messages}' - description: The total number of messages currently in the queue. @@ -81,29 +81,29 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: "1" + - asInt: 1 attributes: - key: state value: stringValue: ready - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" - - asInt: "1" + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 + - asInt: 1 attributes: - key: state value: stringValue: unacknowledged - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 unit: '{messages}' - description: The number of messages delivered to consumers. name: rabbitmq.message.delivered sum: aggregationTemporality: 2 dataPoints: - - asInt: "7828" - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" + - asInt: 7828 + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 isMonotonic: true unit: '{messages}' - description: The number of messages dropped as unroutable. @@ -111,9 +111,9 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: "0" - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" + - asInt: 0 + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 isMonotonic: true unit: '{messages}' - description: The number of messages published to a queue. @@ -121,9 +121,9 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: "7830" - startTimeUnixNano: "1000000" - timeUnixNano: "2000000" + - asInt: 7830 + startTimeUnixNano: 1000000 + timeUnixNano: 2000000 isMonotonic: true unit: '{messages}' scope: From c0e24b4e2c75bd047ef53a36e863308e3e5a04df Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Tue, 21 Jan 2025 00:25:09 +0530 Subject: [PATCH 02/17] =?UTF-8?q?Here=E2=80=99s=20the=20changelog=20entry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rabbitmqreceiver_node_metrics_enhancement.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml diff --git a/.chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml b/.chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml new file mode 100644 index 000000000000..65d9fc2109d7 --- /dev/null +++ b/.chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: rabbitmqreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Enhance the RabbitMQ receiver to collect and report node-level metrics (`rabbitmq.node.disk_free`, `rabbitmq.node.fd_used`, `rabbitmq.node.mem_limit`, and `rabbitmq.node.mem_used`). This provides additional observability into the state and resource usage of RabbitMQ nodes." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] From 88d22a0d1a0b63454194583dcf50c7759db43837 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Tue, 21 Jan 2025 00:27:53 +0530 Subject: [PATCH 03/17] added issue id --- .chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml b/.chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml index 65d9fc2109d7..97a01dc6ff32 100644 --- a/.chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml +++ b/.chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml @@ -8,4 +8,4 @@ component: rabbitmqreceiver note: "Enhance the RabbitMQ receiver to collect and report node-level metrics (`rabbitmq.node.disk_free`, `rabbitmq.node.fd_used`, `rabbitmq.node.mem_limit`, and `rabbitmq.node.mem_used`). This provides additional observability into the state and resource usage of RabbitMQ nodes." # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [] +issues: [36925] From 9b6e76d39e2229648b823e06c4857f8852738986 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Tue, 21 Jan 2025 12:55:38 +0530 Subject: [PATCH 04/17] Updated Readme and metadata.yaml --- receiver/rabbitmqreceiver/README.md | 2 ++ receiver/rabbitmqreceiver/metadata.yaml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/receiver/rabbitmqreceiver/README.md b/receiver/rabbitmqreceiver/README.md index d7d6d4f6c873..5a47c50139e3 100644 --- a/receiver/rabbitmqreceiver/README.md +++ b/receiver/rabbitmqreceiver/README.md @@ -34,6 +34,7 @@ The following settings are optional: - `endpoint` (default: `http://localhost:15672`): The URL of the node to be monitored. - `collection_interval` (default = `10s`): This receiver collects metrics on an interval. Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. - `tls` (defaults defined [here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)): TLS control. By default insecure settings are rejected and certificate verification is on. +- `enable_node_metrics` (default = `false`): Enables collection of RabbitMQ node-level metrics such as memory usage, file descriptors, and disk space. ### Example Configuration @@ -44,6 +45,7 @@ receivers: username: otelu password: ${env:RABBITMQ_PASSWORD} collection_interval: 10s + enable_node_metrics: true ``` The full list of settings exposed for this receiver are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). TLS config is documented further under the [opentelemetry collector's configtls package](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md). diff --git a/receiver/rabbitmqreceiver/metadata.yaml b/receiver/rabbitmqreceiver/metadata.yaml index 7146f98bdfc7..34d4a0c61418 100644 --- a/receiver/rabbitmqreceiver/metadata.yaml +++ b/receiver/rabbitmqreceiver/metadata.yaml @@ -1,6 +1,6 @@ type: rabbitmq -tests: +tests: # generated_component_test.go looking for test.config from metadata.yaml config: endpoint: "http://localhost:15672" username: "testuser" From b1c88246e571d4946be90a4c43159c45b59b2101 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Tue, 21 Jan 2025 14:09:45 +0530 Subject: [PATCH 05/17] updated some nits --- receiver/rabbitmqreceiver/client.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/receiver/rabbitmqreceiver/client.go b/receiver/rabbitmqreceiver/client.go index 25da53314b31..04f15181aab3 100644 --- a/receiver/rabbitmqreceiver/client.go +++ b/receiver/rabbitmqreceiver/client.go @@ -25,9 +25,9 @@ const ( ) type client interface { - // GetQueues retrieves a list of queues from the RabbitMQ API. + // GetQueues calls "/api/queues" endpoint to get list of queues for the target node GetQueues(ctx context.Context) ([]*models.Queue, error) - // GetNodes retrieves a list of nodes from the RabbitMQ API. + // GetQueues calls "/api/nodes" endpoint to get list of nodes for the target node GetNodes(ctx context.Context) ([]*models.Node, error) } @@ -46,7 +46,7 @@ type rabbitmqCredentials struct { func newClient(ctx context.Context, cfg *Config, host component.Host, settings component.TelemetrySettings, logger *zap.Logger) (client, error) { httpClient, err := cfg.ToClient(ctx, host, settings) if err != nil { - return nil, fmt.Errorf("failed to create HTTP client: %w", err) + return nil, fmt.Errorf("failed to create HTTP Client: %w", err) } return &rabbitmqClient{ @@ -62,27 +62,32 @@ func newClient(ctx context.Context, cfg *Config, host component.Host, settings c func (c *rabbitmqClient) GetQueues(ctx context.Context) ([]*models.Queue, error) { var queues []*models.Queue + if err := c.get(ctx, queuePath, &queues); err != nil { c.logger.Error("Failed to retrieve queues", zap.Error(err)) return nil, err } + return queues, nil } func (c *rabbitmqClient) GetNodes(ctx context.Context) ([]*models.Node, error) { var nodes []*models.Node + if err := c.get(ctx, nodePath, &nodes); err != nil { c.logger.Error("Failed to retrieve nodes", zap.Error(err)) return nil, err } + return nodes, nil } func (c *rabbitmqClient) get(ctx context.Context, path string, respObj any) error { + // Construct endpoint and create request url := c.hostEndpoint + path req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) if err != nil { - return fmt.Errorf("failed to create GET request for path %s: %w", path, err) + return fmt.Errorf("failed to create get request for path %s: %w", path, err) } req.SetBasicAuth(c.creds.username, c.creds.password) @@ -91,9 +96,11 @@ func (c *rabbitmqClient) get(ctx context.Context, path string, respObj any) erro if err != nil { return fmt.Errorf("failed to make HTTP request: %w", err) } + + // Defer body close defer func() { if closeErr := resp.Body.Close(); closeErr != nil { - c.logger.Warn("Failed to close response body", zap.Error(closeErr)) + c.logger.Warn("failed to close response body", zap.Error(closeErr)) } }() @@ -107,8 +114,9 @@ func (c *rabbitmqClient) get(ctx context.Context, path string, respObj any) erro return fmt.Errorf("non-200 response code: %d", resp.StatusCode) } + // Decode the payload into the passed in response object if err := json.NewDecoder(resp.Body).Decode(respObj); err != nil { - return fmt.Errorf("failed to decode response body: %w", err) + return fmt.Errorf("failed to decode response payload: %w", err) } return nil From 31f7a2c5fd679359cfae8f293738f479f1e91a00 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Sun, 26 Jan 2025 21:20:49 +0530 Subject: [PATCH 06/17] Updated some nits --- receiver/rabbitmqreceiver/client.go | 9 ++- receiver/rabbitmqreceiver/config.go | 3 +- receiver/rabbitmqreceiver/factory.go | 5 +- receiver/rabbitmqreceiver/scraper_test.go | 1 + .../expected_metrics/metrics_golden.yaml | 56 +++++++++---------- 5 files changed, 39 insertions(+), 35 deletions(-) diff --git a/receiver/rabbitmqreceiver/client.go b/receiver/rabbitmqreceiver/client.go index 04f15181aab3..1b2025912255 100644 --- a/receiver/rabbitmqreceiver/client.go +++ b/receiver/rabbitmqreceiver/client.go @@ -64,7 +64,7 @@ func (c *rabbitmqClient) GetQueues(ctx context.Context) ([]*models.Queue, error) var queues []*models.Queue if err := c.get(ctx, queuePath, &queues); err != nil { - c.logger.Error("Failed to retrieve queues", zap.Error(err)) + c.logger.Debug("Failed to retrieve queues", zap.Error(err)) return nil, err } @@ -90,11 +90,13 @@ func (c *rabbitmqClient) get(ctx context.Context, path string, respObj any) erro return fmt.Errorf("failed to create get request for path %s: %w", path, err) } + // Set user/pass authentication req.SetBasicAuth(c.creds.username, c.creds.password) + // Make request resp, err := c.client.Do(req) if err != nil { - return fmt.Errorf("failed to make HTTP request: %w", err) + return fmt.Errorf("failed to make http request: %w", err) } // Defer body close @@ -104,8 +106,9 @@ func (c *rabbitmqClient) get(ctx context.Context, path string, respObj any) erro } }() + // Check for OK status code if resp.StatusCode != http.StatusOK { - c.logger.Error("Non-200 response code received", zap.Int("status_code", resp.StatusCode)) + c.logger.Debug("rabbitMQ API non-200", zap.Error(err), zap.Int("status_code", resp.StatusCode)) payload, readErr := io.ReadAll(resp.Body) if readErr == nil { diff --git a/receiver/rabbitmqreceiver/config.go b/receiver/rabbitmqreceiver/config.go index 70e00371dc2d..30475b6a987a 100644 --- a/receiver/rabbitmqreceiver/config.go +++ b/receiver/rabbitmqreceiver/config.go @@ -19,6 +19,7 @@ import ( var ( errMissingUsername = errors.New(`"username" not specified in config`) errMissingPassword = errors.New(`"password" not specified in config`) + errInvalidEndpoint = errors.New(`"endpoint" must be in the form of ://:`) ) @@ -34,7 +35,7 @@ type Config struct { metadata.MetricsBuilderConfig `mapstructure:",squash"` } -// Validate validates the configuration by checking for missing or invalid fields. +// Validate validates the configuration by checking for missing or invalid fields func (cfg *Config) Validate() error { var err []error diff --git a/receiver/rabbitmqreceiver/factory.go b/receiver/rabbitmqreceiver/factory.go index a47e6b72e4d2..e42b500e0273 100644 --- a/receiver/rabbitmqreceiver/factory.go +++ b/receiver/rabbitmqreceiver/factory.go @@ -20,13 +20,12 @@ import ( var errConfigNotRabbit = errors.New("config was not a RabbitMQ receiver config") -// NewFactory creates a new receiver factory. +// NewFactory creates a new receiver factory func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), - ) + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) } // createDefaultConfig creates the default configuration for the RabbitMQ receiver. diff --git a/receiver/rabbitmqreceiver/scraper_test.go b/receiver/rabbitmqreceiver/scraper_test.go index f48c9ba8ac88..c6137b5b0383 100644 --- a/receiver/rabbitmqreceiver/scraper_test.go +++ b/receiver/rabbitmqreceiver/scraper_test.go @@ -178,6 +178,7 @@ func TestScraperScrape(t *testing.T) { } expectedMetrics := tc.expectedMetricGen(t) + require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreStartTimestamp(), pmetrictest.IgnoreTimestamp(), pmetrictest.IgnoreResourceMetricsOrder(), diff --git a/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml b/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml index 4a81b72440d6..52940743f464 100644 --- a/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml +++ b/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml @@ -17,29 +17,29 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: 0 - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 + - asInt: "0" + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" unit: '{consumers}' - description: The total number of messages currently in the queue. name: rabbitmq.message.current sum: aggregationTemporality: 2 dataPoints: - - asInt: 0 + - asInt: "0" attributes: - key: state value: stringValue: ready - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 - - asInt: 0 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asInt: "0" attributes: - key: state value: stringValue: unacknowledged - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" unit: '{messages}' scope: name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver @@ -62,18 +62,18 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: 1 - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 + - asInt: "1" + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" unit: '{consumers}' - description: The number of messages acknowledged by consumers. name: rabbitmq.message.acknowledged sum: aggregationTemporality: 2 dataPoints: - - asInt: 7827 - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 + - asInt: "7827" + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" isMonotonic: true unit: '{messages}' - description: The total number of messages currently in the queue. @@ -86,24 +86,24 @@ resourceMetrics: - key: state value: stringValue: ready - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" - asInt: 1 attributes: - key: state value: stringValue: unacknowledged - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" unit: '{messages}' - description: The number of messages delivered to consumers. name: rabbitmq.message.delivered sum: aggregationTemporality: 2 dataPoints: - - asInt: 7828 - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 + - asInt: "7828" + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" isMonotonic: true unit: '{messages}' - description: The number of messages dropped as unroutable. @@ -111,9 +111,9 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: 0 - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 + - asInt: "0" + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" isMonotonic: true unit: '{messages}' - description: The number of messages published to a queue. @@ -121,9 +121,9 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: 7830 - startTimeUnixNano: 1000000 - timeUnixNano: 2000000 + - asInt: "7830" + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" isMonotonic: true unit: '{messages}' scope: From 99fb3731dbef84280fae99572956631ed2f99ed1 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Mon, 27 Jan 2025 00:13:11 +0530 Subject: [PATCH 07/17] Updated some nits --- receiver/rabbitmqreceiver/scraper.go | 13 ++++++++++--- receiver/rabbitmqreceiver/scraper_test.go | 7 +++++++ .../testdata/expected_metrics/metrics_golden.yaml | 4 ++-- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/receiver/rabbitmqreceiver/scraper.go b/receiver/rabbitmqreceiver/scraper.go index 48c8ab402fa9..90ea379caf1e 100644 --- a/receiver/rabbitmqreceiver/scraper.go +++ b/receiver/rabbitmqreceiver/scraper.go @@ -104,6 +104,7 @@ func (r *rabbitmqScraper) collectQueueMetrics(ctx context.Context, now pcommon.T return err } + // Collect metrics for each queue for _, queue := range queues { r.collectQueue(queue, now) } @@ -129,15 +130,19 @@ func (r *rabbitmqScraper) collectQueue(queue *models.Queue, now pcommon.Timestam r.mb.RecordRabbitmqMessageCurrentDataPoint(now, queue.ReadyMessages, metadata.AttributeMessageStateReady) for _, messageStatMetric := range messageStatMetrics { + // Get metric value val, ok := queue.MessageStats[messageStatMetric] + // A metric may not exist if the actions that increment it do not occur if !ok { - r.logger.Debug("metric not found", zap.String("metric", messageStatMetric), zap.String("queue", queue.Name)) + r.logger.Debug("metric not found", zap.String("Metric", messageStatMetric), zap.String("Queue", queue.Name)) continue } + // Convert to int64 val64, ok := convertValToInt64(val) if !ok { - r.logger.Warn("metric not int64", zap.String("metric", messageStatMetric), zap.String("queue", queue.Name)) + // Log warning if the metric is not in the format we expect + r.logger.Warn("metric not int64", zap.String("Metric", messageStatMetric), zap.String("Queue", queue.Name)) continue } @@ -190,7 +195,9 @@ func formatResourceAttributes(resource pcommon.Resource) string { return "{" + strings.Join(attributes, ", ") + "}" } -// convertValToInt64 converts a value to int64 +// convertValToInt64 values from message state unmarshal as float64s but should be int64. +// Need to do a double cast to get an int64. +// This should never fail but worth checking just in case. func convertValToInt64(val any) (int64, bool) { f64Val, ok := val.(float64) if !ok { diff --git a/receiver/rabbitmqreceiver/scraper_test.go b/receiver/rabbitmqreceiver/scraper_test.go index c6137b5b0383..69d68d82d8b1 100644 --- a/receiver/rabbitmqreceiver/scraper_test.go +++ b/receiver/rabbitmqreceiver/scraper_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" @@ -27,6 +28,11 @@ import ( func TestScraperStart(t *testing.T) { clientConfigInvalid := confighttp.NewDefaultClientConfig() clientConfigInvalid.Endpoint = "invalid://endpoint" + clientConfigInvalid.TLSSetting = configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: "/non/existent", + }, + } clientConfig := confighttp.NewDefaultClientConfig() clientConfig.Endpoint = defaultEndpoint @@ -114,6 +120,7 @@ func TestScraperScrape(t *testing.T) { desc: "Successful Queue Collection", setupMockClient: func(t *testing.T) client { mockClient := mocks.MockClient{} + // use helper function from client tests data := loadAPIResponseData(t, "get_queues_response.json") var queues []*models.Queue err := json.Unmarshal(data, &queues) diff --git a/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml b/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml index 52940743f464..dbb7ec2e50e1 100644 --- a/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml +++ b/receiver/rabbitmqreceiver/testdata/expected_metrics/metrics_golden.yaml @@ -81,14 +81,14 @@ resourceMetrics: sum: aggregationTemporality: 2 dataPoints: - - asInt: 1 + - asInt: "1" attributes: - key: state value: stringValue: ready startTimeUnixNano: "1000000" timeUnixNano: "2000000" - - asInt: 1 + - asInt: "1" attributes: - key: state value: From 19d07ceb898a76049d850e54fae101717ebf7187 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Mon, 27 Jan 2025 00:59:27 +0530 Subject: [PATCH 08/17] Updated some nits --- receiver/rabbitmqreceiver/client_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/rabbitmqreceiver/client_test.go b/receiver/rabbitmqreceiver/client_test.go index 872d66e4beee..61055c7b4a58 100644 --- a/receiver/rabbitmqreceiver/client_test.go +++ b/receiver/rabbitmqreceiver/client_test.go @@ -57,7 +57,7 @@ func TestNewClient(t *testing.T) { host: componenttest.NewNopHost(), settings: componenttest.NewNopTelemetrySettings(), logger: zap.NewNop(), - expectError: errors.New("failed to create HTTP client: failed to load TLS config"), + expectError: errors.New("failed to create HTTP Client: failed to load TLS config: failed to load CA CertPool File: failed to load cert /non/existent: open \\non\\existent: The system cannot find the path specified."), }, { desc: "Valid Configuration", @@ -130,7 +130,7 @@ func TestGetQueuesDetails(t *testing.T) { queues, err := tc.GetQueues(context.Background()) require.Nil(t, queues) - require.ErrorContains(t, err, "failed to decode response body: json: cannot unmarshal object into Go value of type []*models.Queue") + require.ErrorContains(t, err, "failed to decode response payload: json: cannot unmarshal object into Go value of type []*models.Queue") }, }, { From 50a9a01301ea5eefd5a4a4cda3a18797ca8543e8 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Mon, 27 Jan 2025 01:06:59 +0530 Subject: [PATCH 09/17] Updated some nits --- receiver/rabbitmqreceiver/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/receiver/rabbitmqreceiver/client.go b/receiver/rabbitmqreceiver/client.go index 1b2025912255..8ec87ee3265d 100644 --- a/receiver/rabbitmqreceiver/client.go +++ b/receiver/rabbitmqreceiver/client.go @@ -31,6 +31,8 @@ type client interface { GetNodes(ctx context.Context) ([]*models.Node, error) } +var _ client = (*rabbitmqClient)(nil) + type rabbitmqClient struct { client *http.Client hostEndpoint string From 0904bca95ac6c642c79d5b46db2f697a6ad88c00 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Mon, 27 Jan 2025 01:47:41 +0530 Subject: [PATCH 10/17] updated enable_node_metrics (default = rue) --- receiver/rabbitmqreceiver/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/rabbitmqreceiver/README.md b/receiver/rabbitmqreceiver/README.md index 5a47c50139e3..b1329eb4d76e 100644 --- a/receiver/rabbitmqreceiver/README.md +++ b/receiver/rabbitmqreceiver/README.md @@ -34,7 +34,7 @@ The following settings are optional: - `endpoint` (default: `http://localhost:15672`): The URL of the node to be monitored. - `collection_interval` (default = `10s`): This receiver collects metrics on an interval. Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. - `tls` (defaults defined [here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)): TLS control. By default insecure settings are rejected and certificate verification is on. -- `enable_node_metrics` (default = `false`): Enables collection of RabbitMQ node-level metrics such as memory usage, file descriptors, and disk space. +- `enable_node_metrics` (default = `true`): Enables collection of RabbitMQ node-level metrics such as memory usage, file descriptors, and disk space. ### Example Configuration From 7b462e0a41e29a22ff0f8b418e8c77306fb47838 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Mon, 27 Jan 2025 02:31:00 +0530 Subject: [PATCH 11/17] Rectified lint issues --- receiver/rabbitmqreceiver/client_test.go | 12 +++--- receiver/rabbitmqreceiver/scraper.go | 5 ++- receiver/rabbitmqreceiver/scraper_test.go | 52 +++++++++++------------ 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/receiver/rabbitmqreceiver/client_test.go b/receiver/rabbitmqreceiver/client_test.go index 61055c7b4a58..3344c477df5c 100644 --- a/receiver/rabbitmqreceiver/client_test.go +++ b/receiver/rabbitmqreceiver/client_test.go @@ -6,7 +6,6 @@ package rabbitmqreceiver // import "github.com/open-telemetry/opentelemetry-coll import ( "context" "encoding/json" - "errors" "net/http" "net/http/httptest" "os" @@ -47,7 +46,7 @@ func TestNewClient(t *testing.T) { host component.Host settings component.TelemetrySettings logger *zap.Logger - expectError error + expectError string // Updated to string to match substrings in the error }{ { desc: "Invalid HTTP config", @@ -57,7 +56,7 @@ func TestNewClient(t *testing.T) { host: componenttest.NewNopHost(), settings: componenttest.NewNopTelemetrySettings(), logger: zap.NewNop(), - expectError: errors.New("failed to create HTTP Client: failed to load TLS config: failed to load CA CertPool File: failed to load cert /non/existent: open \\non\\existent: The system cannot find the path specified."), + expectError: "failed to create HTTP Client: failed to load TLS config", }, { desc: "Valid Configuration", @@ -69,16 +68,17 @@ func TestNewClient(t *testing.T) { host: componenttest.NewNopHost(), settings: componenttest.NewNopTelemetrySettings(), logger: zap.NewNop(), - expectError: nil, + expectError: "", }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { ac, err := newClient(context.Background(), tc.cfg, tc.host, tc.settings, tc.logger) - if tc.expectError != nil { + if tc.expectError != "" { require.Nil(t, ac) - require.ErrorContains(t, err, tc.expectError.Error()) + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectError) // Check for the substring } else { require.NoError(t, err) diff --git a/receiver/rabbitmqreceiver/scraper.go b/receiver/rabbitmqreceiver/scraper.go index 90ea379caf1e..c4fbdd7cf690 100644 --- a/receiver/rabbitmqreceiver/scraper.go +++ b/receiver/rabbitmqreceiver/scraper.go @@ -63,12 +63,13 @@ func (r *rabbitmqScraper) start(ctx context.Context, host component.Host) error return fmt.Errorf("invalid configuration: missing endpoint, username, or password") } - client, err := newClient(ctx, r.cfg, host, r.settings, r.logger) + rabbitClient, err := newClient(ctx, r.cfg, host, r.settings, r.logger) + if err != nil { return fmt.Errorf("failed to initialize RabbitMQ client: %w", err) } - r.client = client + r.client = rabbitClient return nil } diff --git a/receiver/rabbitmqreceiver/scraper_test.go b/receiver/rabbitmqreceiver/scraper_test.go index 69d68d82d8b1..81fa2ac6cdcb 100644 --- a/receiver/rabbitmqreceiver/scraper_test.go +++ b/receiver/rabbitmqreceiver/scraper_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" @@ -30,51 +31,50 @@ func TestScraperStart(t *testing.T) { clientConfigInvalid.Endpoint = "invalid://endpoint" clientConfigInvalid.TLSSetting = configtls.ClientConfig{ Config: configtls.Config{ - CAFile: "/non/existent", + CAFile: "/non/existent", // Invalid CA file }, } clientConfig := confighttp.NewDefaultClientConfig() - clientConfig.Endpoint = defaultEndpoint + clientConfig.Endpoint = "http://localhost:15672" // Valid endpoint + // Test cases for the scraper start function. testcases := []struct { desc string - scraper *rabbitmqScraper + clientCfg confighttp.ClientConfig + username string + password string expectError bool }{ { - desc: "Bad Config", - scraper: &rabbitmqScraper{ - cfg: &Config{ - ClientConfig: confighttp.ClientConfig{ - Endpoint: "", // Invalid endpoint - }, - Username: "", // Missing username - Password: "", // Missing password - }, - settings: componenttest.NewNopTelemetrySettings(), - }, + desc: "Bad Config - Invalid Endpoint", + clientCfg: clientConfigInvalid, + username: "", // Missing username + password: "", // Missing password expectError: true, }, { - desc: "Valid Config", - scraper: &rabbitmqScraper{ - cfg: &Config{ - ClientConfig: confighttp.ClientConfig{ - Endpoint: "http://localhost:15672", - }, - Username: "valid_user", - Password: "valid_password", - }, - settings: componenttest.NewNopTelemetrySettings(), - }, + desc: "Valid Config", + clientCfg: clientConfig, + username: "valid_user", + password: "valid_password", expectError: false, }, } + // Run each test case. for _, tc := range testcases { t.Run(tc.desc, func(t *testing.T) { - err := tc.scraper.start(context.Background(), componenttest.NewNopHost()) + scraper := &rabbitmqScraper{ + cfg: &Config{ + ClientConfig: tc.clientCfg, + Username: tc.username, + Password: configopaque.String(tc.password), // Convert to configopaque.String + }, + settings: componenttest.NewNopTelemetrySettings(), + } + + err := scraper.start(context.Background(), componenttest.NewNopHost()) if tc.expectError { require.Error(t, err) } else { From 9359f5b0cf9b58df13f685eac3349436cbd844af Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Mon, 27 Jan 2025 03:10:11 +0530 Subject: [PATCH 12/17] Fix gofumpt formatting in scraper.go --- receiver/rabbitmqreceiver/scraper.go | 1 - 1 file changed, 1 deletion(-) diff --git a/receiver/rabbitmqreceiver/scraper.go b/receiver/rabbitmqreceiver/scraper.go index c4fbdd7cf690..de8c9da9cb99 100644 --- a/receiver/rabbitmqreceiver/scraper.go +++ b/receiver/rabbitmqreceiver/scraper.go @@ -64,7 +64,6 @@ func (r *rabbitmqScraper) start(ctx context.Context, host component.Host) error } rabbitClient, err := newClient(ctx, r.cfg, host, r.settings, r.logger) - if err != nil { return fmt.Errorf("failed to initialize RabbitMQ client: %w", err) } From 846e529f49f03360b3fa71bb5424af59465d5f35 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Mon, 27 Jan 2025 03:32:36 +0530 Subject: [PATCH 13/17] Run make generate to update generated code --- exporter/zipkinexporter/zipkin_test.go | 3 ++- .../internal/statsreader/statsreaders_mockedspanner_test.go | 3 ++- receiver/rabbitmqreceiver/generated_package_test.go | 3 ++- .../rabbitmqreceiver/internal/metadata/generated_metrics.go | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index f42423aaf6a9..da9835d89d6f 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -86,7 +86,8 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { require.NoError(t, mzr.Flush()) // We expect back the exact JSON that was received - wants := []string{` + wants := []string{ + ` [{ "traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91385","id": "4d1e00c0db9010db", "kind": "CLIENT","name": "get", diff --git a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go index 6b750522373b..f38696a8d7fe 100644 --- a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go +++ b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go @@ -138,7 +138,8 @@ func TestStatsReaders_Read(t *testing.T) { op, err := databaseAdminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{ Database: databaseName, - Statements: []string{`CREATE TABLE STATS ( + Statements: []string{ + `CREATE TABLE STATS ( INTERVAL_END TIMESTAMP, METRIC_LABEL STRING(MAX), METRIC_VALUE INT64 diff --git a/receiver/rabbitmqreceiver/generated_package_test.go b/receiver/rabbitmqreceiver/generated_package_test.go index 121c329b647d..1581d54bfae2 100644 --- a/receiver/rabbitmqreceiver/generated_package_test.go +++ b/receiver/rabbitmqreceiver/generated_package_test.go @@ -3,8 +3,9 @@ package rabbitmqreceiver import ( - "go.uber.org/goleak" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { diff --git a/receiver/rabbitmqreceiver/internal/metadata/generated_metrics.go b/receiver/rabbitmqreceiver/internal/metadata/generated_metrics.go index ecad05124c0a..840ca0fb39df 100644 --- a/receiver/rabbitmqreceiver/internal/metadata/generated_metrics.go +++ b/receiver/rabbitmqreceiver/internal/metadata/generated_metrics.go @@ -6,8 +6,8 @@ import ( "time" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/filter" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" ) From 8c2bb3999d9a3191c5c9b5485cf91cf0f41d4c5e Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Mon, 27 Jan 2025 03:58:36 +0530 Subject: [PATCH 14/17] Run make generate to update generated code and Fix gofumpt formatting issues --- exporter/zipkinexporter/zipkin_test.go | 3 ++- .../internal/statsreader/statsreaders_mockedspanner_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index da9835d89d6f..ae93c6ca945b 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -121,7 +121,8 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { "timestamp": 1472470996199000, "duration": 207000 }] - `} + `, + } for i, s := range wants { want := unmarshalZipkinSpanArrayToMap(t, s) gotBytes := buf.Next(int(sizes[i])) diff --git a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go index f38696a8d7fe..2da59a0da2e5 100644 --- a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go +++ b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go @@ -144,7 +144,8 @@ func TestStatsReaders_Read(t *testing.T) { METRIC_LABEL STRING(MAX), METRIC_VALUE INT64 ) PRIMARY KEY (METRIC_LABEL) - `}, + `, + }, }) require.NoError(t, err) From fc3183a68ce06a813e9e38a8d32daa09dfacdc49 Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Sun, 2 Feb 2025 17:09:38 +0530 Subject: [PATCH 15/17] aligned with the reviewer's feedback to make the PR non-breaking and structured --- exporter/zipkinexporter/zipkin_test.go | 6 +- internal/buildscripts/ocb-add-replaces.sh | 2 +- .../statsreaders_mockedspanner_test.go | 6 +- receiver/rabbitmqreceiver/README.md | 12 +- receiver/rabbitmqreceiver/client.go | 16 ++- receiver/rabbitmqreceiver/client_test.go | 113 ++++++++++++++---- receiver/rabbitmqreceiver/config.go | 5 - receiver/rabbitmqreceiver/config_test.go | 30 ++--- receiver/rabbitmqreceiver/documentation.md | 10 ++ receiver/rabbitmqreceiver/factory.go | 37 ++---- receiver/rabbitmqreceiver/factory_test.go | 23 +--- .../generated_component_test.go | 2 +- .../generated_package_test.go | 3 +- .../internal/metadata/generated_config.go | 8 +- .../metadata/generated_metrics_test.go | 4 - receiver/rabbitmqreceiver/metadata.yaml | 17 ++- receiver/rabbitmqreceiver/scraper.go | 68 ++++------- receiver/rabbitmqreceiver/scraper_test.go | 60 ++++------ 18 files changed, 210 insertions(+), 212 deletions(-) diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index ae93c6ca945b..f42423aaf6a9 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -86,8 +86,7 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { require.NoError(t, mzr.Flush()) // We expect back the exact JSON that was received - wants := []string{ - ` + wants := []string{` [{ "traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91385","id": "4d1e00c0db9010db", "kind": "CLIENT","name": "get", @@ -121,8 +120,7 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { "timestamp": 1472470996199000, "duration": 207000 }] - `, - } + `} for i, s := range wants { want := unmarshalZipkinSpanArrayToMap(t, s) gotBytes := buf.Next(int(sizes[i])) diff --git a/internal/buildscripts/ocb-add-replaces.sh b/internal/buildscripts/ocb-add-replaces.sh index a3d5a5ee365d..c7338480f12d 100755 --- a/internal/buildscripts/ocb-add-replaces.sh +++ b/internal/buildscripts/ocb-add-replaces.sh @@ -17,4 +17,4 @@ for mod_path in $local_mods; do mod=${mod_path#"."} # remove initial dot echo " - github.com/open-telemetry/opentelemetry-collector-contrib$mod => ../..$mod" >> "$CONFIG_OUT" done -echo "Wrote replace statements to $CONFIG_OUT" +echo "Wrote replace statements to $CONFIG_OUT" \ No newline at end of file diff --git a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go index 2da59a0da2e5..6b750522373b 100644 --- a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go +++ b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go @@ -138,14 +138,12 @@ func TestStatsReaders_Read(t *testing.T) { op, err := databaseAdminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{ Database: databaseName, - Statements: []string{ - `CREATE TABLE STATS ( + Statements: []string{`CREATE TABLE STATS ( INTERVAL_END TIMESTAMP, METRIC_LABEL STRING(MAX), METRIC_VALUE INT64 ) PRIMARY KEY (METRIC_LABEL) - `, - }, + `}, }) require.NoError(t, err) diff --git a/receiver/rabbitmqreceiver/README.md b/receiver/rabbitmqreceiver/README.md index 57043bce0b54..a4de25f6d93e 100644 --- a/receiver/rabbitmqreceiver/README.md +++ b/receiver/rabbitmqreceiver/README.md @@ -34,9 +34,7 @@ The following settings are optional: - `endpoint` (default: `http://localhost:15672`): The URL of the node to be monitored. - `collection_interval` (default = `10s`): This receiver collects metrics on an interval. Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. - - `tls`: TLS control. [By default, insecure settings are rejected and certificate verification is on](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md). -- `enable_node_metrics` (default = `true`): Enables collection of RabbitMQ node-level metrics such as memory usage, file descriptors, and disk space. ### Example Configuration @@ -47,7 +45,15 @@ receivers: username: otelu password: ${env:RABBITMQ_PASSWORD} collection_interval: 10s - enable_node_metrics: true + metrics: # Enable node metrics by explicitly setting them to true + rabbitmq.node.disk_free: + enabled: true + rabbitmq.node.fd_used: + enabled: true + rabbitmq.node.mem_limit: + enabled: true + rabbitmq.node.mem_used: + enabled: true ``` The full list of settings exposed for this receiver are documented in [config.go](./config.go) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). TLS config is documented further under the [opentelemetry collector's configtls package](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md). diff --git a/receiver/rabbitmqreceiver/client.go b/receiver/rabbitmqreceiver/client.go index 8ec87ee3265d..79a44ca7dc77 100644 --- a/receiver/rabbitmqreceiver/client.go +++ b/receiver/rabbitmqreceiver/client.go @@ -27,7 +27,7 @@ const ( type client interface { // GetQueues calls "/api/queues" endpoint to get list of queues for the target node GetQueues(ctx context.Context) ([]*models.Queue, error) - // GetQueues calls "/api/nodes" endpoint to get list of nodes for the target node + // GetNodes calls "/api/nodes" endpoint to get list of nodes for the target node GetNodes(ctx context.Context) ([]*models.Node, error) } @@ -77,7 +77,7 @@ func (c *rabbitmqClient) GetNodes(ctx context.Context) ([]*models.Node, error) { var nodes []*models.Node if err := c.get(ctx, nodePath, &nodes); err != nil { - c.logger.Error("Failed to retrieve nodes", zap.Error(err)) + c.logger.Debug("Failed to retrieve nodes", zap.Error(err)) return nil, err } @@ -112,11 +112,15 @@ func (c *rabbitmqClient) get(ctx context.Context, path string, respObj any) erro if resp.StatusCode != http.StatusOK { c.logger.Debug("rabbitMQ API non-200", zap.Error(err), zap.Int("status_code", resp.StatusCode)) - payload, readErr := io.ReadAll(resp.Body) - if readErr == nil { - c.logger.Debug("Error response payload", zap.ByteString("error_payload", payload)) + // Attempt to extract the error payload + payloadData, err := io.ReadAll(resp.Body) + if err != nil { + c.logger.Debug("failed to read payload error message", zap.Error(err)) + } else { + c.logger.Debug("rabbitMQ API Error", zap.ByteString("api_error", payloadData)) } - return fmt.Errorf("non-200 response code: %d", resp.StatusCode) + + return fmt.Errorf("non 200 code returned %d", resp.StatusCode) } // Decode the payload into the passed in response object diff --git a/receiver/rabbitmqreceiver/client_test.go b/receiver/rabbitmqreceiver/client_test.go index 3344c477df5c..ce7c5147cf9e 100644 --- a/receiver/rabbitmqreceiver/client_test.go +++ b/receiver/rabbitmqreceiver/client_test.go @@ -6,6 +6,7 @@ package rabbitmqreceiver // import "github.com/open-telemetry/opentelemetry-coll import ( "context" "encoding/json" + "errors" "net/http" "net/http/httptest" "os" @@ -29,9 +30,9 @@ const ( ) func TestNewClient(t *testing.T) { - clientConfigInvalid := confighttp.NewDefaultClientConfig() - clientConfigInvalid.Endpoint = defaultEndpoint - clientConfigInvalid.TLSSetting = configtls.ClientConfig{ + clientConfigNonExistandCA := confighttp.NewDefaultClientConfig() + clientConfigNonExistandCA.Endpoint = defaultEndpoint + clientConfigNonExistandCA.TLSSetting = configtls.ClientConfig{ Config: configtls.Config{ CAFile: "/non/existent", }, @@ -40,45 +41,42 @@ func TestNewClient(t *testing.T) { clientConfig := confighttp.NewDefaultClientConfig() clientConfig.Endpoint = defaultEndpoint - testCases := []struct { + testCase := []struct { desc string cfg *Config host component.Host settings component.TelemetrySettings logger *zap.Logger - expectError string // Updated to string to match substrings in the error + expectError error }{ { desc: "Invalid HTTP config", cfg: &Config{ - ClientConfig: clientConfigInvalid, + ClientConfig: clientConfigNonExistandCA, }, host: componenttest.NewNopHost(), settings: componenttest.NewNopTelemetrySettings(), logger: zap.NewNop(), - expectError: "failed to create HTTP Client: failed to load TLS config", + expectError: errors.New("failed to create HTTP Client"), }, { desc: "Valid Configuration", cfg: &Config{ ClientConfig: clientConfig, - Username: "valid_user", - Password: "valid_password", }, host: componenttest.NewNopHost(), settings: componenttest.NewNopTelemetrySettings(), logger: zap.NewNop(), - expectError: "", + expectError: nil, }, } - for _, tc := range testCases { + for _, tc := range testCase { t.Run(tc.desc, func(t *testing.T) { ac, err := newClient(context.Background(), tc.cfg, tc.host, tc.settings, tc.logger) - if tc.expectError != "" { + if tc.expectError != nil { require.Nil(t, ac) - require.Error(t, err) - require.Contains(t, err.Error(), tc.expectError) // Check for the substring + require.ErrorContains(t, err, tc.expectError.Error()) } else { require.NoError(t, err) @@ -111,9 +109,9 @@ func TestGetQueuesDetails(t *testing.T) { tc := createTestClient(t, ts.URL) - queues, err := tc.GetQueues(context.Background()) - require.Nil(t, queues) - require.EqualError(t, err, "non-200 response code: 401") + clusters, err := tc.GetQueues(context.Background()) + require.Nil(t, clusters) + require.EqualError(t, err, "non 200 code returned 401") }, }, { @@ -128,13 +126,13 @@ func TestGetQueuesDetails(t *testing.T) { tc := createTestClient(t, ts.URL) - queues, err := tc.GetQueues(context.Background()) - require.Nil(t, queues) - require.ErrorContains(t, err, "failed to decode response payload: json: cannot unmarshal object into Go value of type []*models.Queue") + clusters, err := tc.GetQueues(context.Background()) + require.Nil(t, clusters) + require.ErrorContains(t, err, "failed to decode response payload") }, }, { - desc: "Successful Queue API call", + desc: "Successful call", testFunc: func(t *testing.T) { data := loadAPIResponseData(t, queuesAPIResponseFile) @@ -152,9 +150,78 @@ func TestGetQueuesDetails(t *testing.T) { err := json.Unmarshal(data, &expected) require.NoError(t, err) - queues, err := tc.GetQueues(context.Background()) + clusters, err := tc.GetQueues(context.Background()) + require.NoError(t, err) + require.Equal(t, expected, clusters) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, tc.testFunc) + } +} + +func TestGetNodesDetails(t *testing.T) { + testCases := []struct { + desc string + testFunc func(*testing.T) + }{ + { + desc: "Non-200 Response for GetNodes", + testFunc: func(t *testing.T) { + // Setup test server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusForbidden) + })) + defer ts.Close() + + tc := createTestClient(t, ts.URL) + + nodes, err := tc.GetNodes(context.Background()) + require.Nil(t, nodes) + require.EqualError(t, err, "non 200 code returned 403") + }, + }, + { + desc: "Bad payload returned for GetNodes", + testFunc: func(t *testing.T) { + // Setup test server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, err := w.Write([]byte("{invalid-json}")) + assert.NoError(t, err) + })) + defer ts.Close() + + tc := createTestClient(t, ts.URL) + + nodes, err := tc.GetNodes(context.Background()) + require.Nil(t, nodes) + require.ErrorContains(t, err, "failed to decode response payload") + }, + }, + { + desc: "Successful GetNodes call", + testFunc: func(t *testing.T) { + data := loadAPIResponseData(t, nodesAPIResponseFile) + + // Setup test server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, err := w.Write(data) + assert.NoError(t, err) + })) + defer ts.Close() + + tc := createTestClient(t, ts.URL) + + // Load the valid data into a struct to compare + var expected []*models.Node + err := json.Unmarshal(data, &expected) + require.NoError(t, err) + + nodes, err := tc.GetNodes(context.Background()) require.NoError(t, err) - require.Equal(t, expected, queues) + require.Equal(t, expected, nodes) }, }, } diff --git a/receiver/rabbitmqreceiver/config.go b/receiver/rabbitmqreceiver/config.go index 30475b6a987a..3479a8ae8b4d 100644 --- a/receiver/rabbitmqreceiver/config.go +++ b/receiver/rabbitmqreceiver/config.go @@ -31,25 +31,20 @@ type Config struct { confighttp.ClientConfig `mapstructure:",squash"` Username string `mapstructure:"username"` Password configopaque.String `mapstructure:"password"` - EnableNodeMetrics bool `mapstructure:"enable_node_metrics"` // Added flag for node metrics metadata.MetricsBuilderConfig `mapstructure:",squash"` } // Validate validates the configuration by checking for missing or invalid fields func (cfg *Config) Validate() error { var err []error - - // Validate username if cfg.Username == "" { err = append(err, errMissingUsername) } - // Validate password if cfg.Password == "" { err = append(err, errMissingPassword) } - // Validate endpoint _, parseErr := url.Parse(cfg.Endpoint) if parseErr != nil { wrappedErr := fmt.Errorf("%s: %w", errInvalidEndpoint.Error(), parseErr) diff --git a/receiver/rabbitmqreceiver/config_test.go b/receiver/rabbitmqreceiver/config_test.go index 4bafdaadf1a3..dfcdfefe5cda 100644 --- a/receiver/rabbitmqreceiver/config_test.go +++ b/receiver/rabbitmqreceiver/config_test.go @@ -20,7 +20,7 @@ import ( func TestValidate(t *testing.T) { clientConfigInvalid := confighttp.NewDefaultClientConfig() - clientConfigInvalid.Endpoint = "invalid://endpoint: 12efg" + clientConfigInvalid.Endpoint = "invalid://endpoint: 12efg" clientConfig := confighttp.NewDefaultClientConfig() clientConfig.Endpoint = defaultEndpoint @@ -38,7 +38,7 @@ func TestValidate(t *testing.T) { expectedErr: errors.Join( errMissingUsername, errMissingPassword, - fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`)), + fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`)), }, { desc: "missing password and invalid endpoint", @@ -48,7 +48,7 @@ func TestValidate(t *testing.T) { }, expectedErr: errors.Join( errMissingPassword, - fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), + fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), ), }, { @@ -59,7 +59,7 @@ func TestValidate(t *testing.T) { }, expectedErr: errors.Join( errMissingUsername, - fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), + fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), ), }, { @@ -70,26 +70,15 @@ func TestValidate(t *testing.T) { ClientConfig: clientConfigInvalid, }, expectedErr: errors.Join( - fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), + fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`), ), }, { - desc: "valid config with node metrics enabled", + desc: "valid config", cfg: &Config{ - Username: "otelu", - Password: "otelp", - ClientConfig: clientConfig, - EnableNodeMetrics: true, - }, - expectedErr: nil, - }, - { - desc: "valid config with node metrics disabled", - cfg: &Config{ - Username: "otelu", - Password: "otelp", - ClientConfig: clientConfig, - EnableNodeMetrics: false, + Username: "otelu", + Password: "otelp", + ClientConfig: clientConfig, }, expectedErr: nil, }, @@ -123,7 +112,6 @@ func TestLoadConfig(t *testing.T) { expected.Username = "otelu" expected.Password = "${env:RABBITMQ_PASSWORD}" expected.CollectionInterval = 10 * time.Second - expected.EnableNodeMetrics = true require.Equal(t, expected, cfg) } diff --git a/receiver/rabbitmqreceiver/documentation.md b/receiver/rabbitmqreceiver/documentation.md index cdcaa56e5014..64a16369976f 100644 --- a/receiver/rabbitmqreceiver/documentation.md +++ b/receiver/rabbitmqreceiver/documentation.md @@ -66,6 +66,16 @@ The number of messages published to a queue. | ---- | ----------- | ---------- | ----------------------- | --------- | | {messages} | Sum | Int | Cumulative | true | +## Optional Metrics + +The following metrics are not emitted by default. Each of them can be enabled by applying the following configuration: + +```yaml +metrics: + : + enabled: true +``` + ### rabbitmq.node.disk_free Free disk space on the node. diff --git a/receiver/rabbitmqreceiver/factory.go b/receiver/rabbitmqreceiver/factory.go index e42b500e0273..b863e40769fa 100644 --- a/receiver/rabbitmqreceiver/factory.go +++ b/receiver/rabbitmqreceiver/factory.go @@ -28,49 +28,32 @@ func NewFactory() receiver.Factory { receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) } -// createDefaultConfig creates the default configuration for the RabbitMQ receiver. func createDefaultConfig() component.Config { - defaultControllerConfig := scraperhelper.NewDefaultControllerConfig() - defaultControllerConfig.CollectionInterval = 10 * time.Second + cfg := scraperhelper.NewDefaultControllerConfig() + cfg.CollectionInterval = 10 * time.Second - defaultClientConfig := confighttp.NewDefaultClientConfig() - defaultClientConfig.Endpoint = defaultEndpoint - defaultClientConfig.Timeout = 10 * time.Second + clientConfig := confighttp.NewDefaultClientConfig() + clientConfig.Endpoint = defaultEndpoint + clientConfig.Timeout = 10 * time.Second return &Config{ - ControllerConfig: defaultControllerConfig, - ClientConfig: defaultClientConfig, + ControllerConfig: cfg, + ClientConfig: clientConfig, MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), - EnableNodeMetrics: true, // Default to enabling node metrics. } } -// createMetricsReceiver creates the metrics receiver for RabbitMQ. -func createMetricsReceiver( - _ context.Context, - params receiver.Settings, - rConf component.Config, - consumer consumer.Metrics, -) (receiver.Metrics, error) { +func createMetricsReceiver(_ context.Context, params receiver.Settings, rConf component.Config, consumer consumer.Metrics) (receiver.Metrics, error) { cfg, ok := rConf.(*Config) if !ok { return nil, errConfigNotRabbit } rabbitScraper := newScraper(params.Logger, cfg, params) - - s, err := scraper.NewMetrics( - rabbitScraper.scrape, - scraper.WithStart(rabbitScraper.start), - ) + s, err := scraper.NewMetrics(rabbitScraper.scrape, scraper.WithStart(rabbitScraper.start)) if err != nil { return nil, err } - return scraperhelper.NewMetricsController( - &cfg.ControllerConfig, - params, - consumer, - scraperhelper.AddScraper(metadata.Type, s), - ) + return scraperhelper.NewMetricsController(&cfg.ControllerConfig, params, consumer, scraperhelper.AddScraper(metadata.Type, s)) } diff --git a/receiver/rabbitmqreceiver/factory_test.go b/receiver/rabbitmqreceiver/factory_test.go index 4b8cbf54bcd6..2e9900df69cc 100644 --- a/receiver/rabbitmqreceiver/factory_test.go +++ b/receiver/rabbitmqreceiver/factory_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" @@ -38,14 +39,13 @@ func TestNewFactory(t *testing.T) { testFunc: func(t *testing.T) { factory := NewFactory() - expectedCfg := &Config{ + var expectedCfg component.Config = &Config{ ControllerConfig: scraperhelper.ControllerConfig{ CollectionInterval: 10 * time.Second, InitialDelay: time.Second, }, ClientConfig: clientConfig, MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), - EnableNodeMetrics: true, // Ensure default includes EnableNodeMetrics } require.Equal(t, expectedCfg, factory.CreateDefaultConfig()) @@ -72,29 +72,12 @@ func TestNewFactory(t *testing.T) { _, err := factory.CreateMetrics( context.Background(), receivertest.NewNopSettings(), - nil, // Passing nil config + nil, consumertest.NewNop(), ) require.ErrorIs(t, err, errConfigNotRabbit) }, }, - { - desc: "returns error if EnableNodeMetrics is false but metrics are attempted", - testFunc: func(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) - cfg.EnableNodeMetrics = false - - _, err := factory.CreateMetrics( - context.Background(), - receivertest.NewNopSettings(), - cfg, - consumertest.NewNop(), - ) - // No error expected since node metrics disabling shouldn't block creation. - require.NoError(t, err) - }, - }, } for _, tc := range testCases { diff --git a/receiver/rabbitmqreceiver/generated_component_test.go b/receiver/rabbitmqreceiver/generated_component_test.go index ee6a6107a354..15fab37f9bfd 100644 --- a/receiver/rabbitmqreceiver/generated_component_test.go +++ b/receiver/rabbitmqreceiver/generated_component_test.go @@ -27,8 +27,8 @@ func TestComponentLifecycle(t *testing.T) { factory := NewFactory() tests := []struct { - createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) name string + createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) }{ { diff --git a/receiver/rabbitmqreceiver/generated_package_test.go b/receiver/rabbitmqreceiver/generated_package_test.go index 1581d54bfae2..121c329b647d 100644 --- a/receiver/rabbitmqreceiver/generated_package_test.go +++ b/receiver/rabbitmqreceiver/generated_package_test.go @@ -3,9 +3,8 @@ package rabbitmqreceiver import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/receiver/rabbitmqreceiver/internal/metadata/generated_config.go b/receiver/rabbitmqreceiver/internal/metadata/generated_config.go index 14815f0a8bdb..d26a59d3349b 100644 --- a/receiver/rabbitmqreceiver/internal/metadata/generated_config.go +++ b/receiver/rabbitmqreceiver/internal/metadata/generated_config.go @@ -61,16 +61,16 @@ func DefaultMetricsConfig() MetricsConfig { Enabled: true, }, RabbitmqNodeDiskFree: MetricConfig{ - Enabled: true, + Enabled: false, }, RabbitmqNodeFdUsed: MetricConfig{ - Enabled: true, + Enabled: false, }, RabbitmqNodeMemLimit: MetricConfig{ - Enabled: true, + Enabled: false, }, RabbitmqNodeMemUsed: MetricConfig{ - Enabled: true, + Enabled: false, }, } } diff --git a/receiver/rabbitmqreceiver/internal/metadata/generated_metrics_test.go b/receiver/rabbitmqreceiver/internal/metadata/generated_metrics_test.go index c8e7daf9c881..398b67ee2eba 100644 --- a/receiver/rabbitmqreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/rabbitmqreceiver/internal/metadata/generated_metrics_test.go @@ -92,19 +92,15 @@ func TestMetricsBuilder(t *testing.T) { allMetricsCount++ mb.RecordRabbitmqMessagePublishedDataPoint(ts, 1) - defaultMetricsCount++ allMetricsCount++ mb.RecordRabbitmqNodeDiskFreeDataPoint(ts, 1) - defaultMetricsCount++ allMetricsCount++ mb.RecordRabbitmqNodeFdUsedDataPoint(ts, 1) - defaultMetricsCount++ allMetricsCount++ mb.RecordRabbitmqNodeMemLimitDataPoint(ts, 1) - defaultMetricsCount++ allMetricsCount++ mb.RecordRabbitmqNodeMemUsedDataPoint(ts, 1) diff --git a/receiver/rabbitmqreceiver/metadata.yaml b/receiver/rabbitmqreceiver/metadata.yaml index 5958d96ea005..46aa4b8dc642 100644 --- a/receiver/rabbitmqreceiver/metadata.yaml +++ b/receiver/rabbitmqreceiver/metadata.yaml @@ -6,6 +6,15 @@ tests: # generated_component_test.go looking for test.config from m username: "testuser" password: "testpassword" collection_interval: 10s + metrics: + rabbitmq.node.disk_free: + enabled: true + rabbitmq.node.fd_used: + enabled: true + rabbitmq.node.mem_limit: + enabled: true + rabbitmq.node.mem_used: + enabled: true status: class: receiver @@ -96,7 +105,7 @@ metrics: monotonic: false aggregation_temporality: cumulative value_type: int - enabled: true + enabled: false rabbitmq.node.fd_used: description: The number of file descriptors used on the node. unit: "{fd}" @@ -104,7 +113,7 @@ metrics: monotonic: false aggregation_temporality: cumulative value_type: int - enabled: true + enabled: false rabbitmq.node.mem_limit: description: The memory limit on the node. unit: "{bytes}" @@ -112,7 +121,7 @@ metrics: monotonic: false aggregation_temporality: cumulative value_type: int - enabled: true + enabled: false rabbitmq.node.mem_used: description: The memory used on the node. unit: "{bytes}" @@ -120,4 +129,4 @@ metrics: monotonic: false aggregation_temporality: cumulative value_type: int - enabled: true + enabled: false diff --git a/receiver/rabbitmqreceiver/scraper.go b/receiver/rabbitmqreceiver/scraper.go index de8c9da9cb99..4fcebcbd7bc5 100644 --- a/receiver/rabbitmqreceiver/scraper.go +++ b/receiver/rabbitmqreceiver/scraper.go @@ -6,8 +6,6 @@ package rabbitmqreceiver // import "github.com/open-telemetry/opentelemetry-coll import ( "context" "errors" - "fmt" - "strings" "time" "go.opentelemetry.io/collector/component" @@ -57,42 +55,36 @@ func newScraper(logger *zap.Logger, cfg *Config, settings receiver.Settings) *ra } } -// start starts the scraper by creating a new HTTP Client -func (r *rabbitmqScraper) start(ctx context.Context, host component.Host) error { - if r.cfg.Endpoint == "" || r.cfg.Username == "" || r.cfg.Password == "" { - return fmt.Errorf("invalid configuration: missing endpoint, username, or password") - } - - rabbitClient, err := newClient(ctx, r.cfg, host, r.settings, r.logger) - if err != nil { - return fmt.Errorf("failed to initialize RabbitMQ client: %w", err) - } - - r.client = rabbitClient - return nil +// start starts the scraper by creating a new HTTP Client on the scraper +func (r *rabbitmqScraper) start(ctx context.Context, host component.Host) (err error) { + r.client, err = newClient(ctx, r.cfg, host, r.settings, r.logger) + return } // scrape collects metrics from the RabbitMQ API func (r *rabbitmqScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { now := pcommon.NewTimestampFromTime(time.Now()) - // Validate client initialization + // Validate we don't attempt to scrape without initializing the client if r.client == nil { return pmetric.NewMetrics(), errClientNotInit } + var errs []error // Collect multiple errors instead of failing immediately + // Collect queue metrics if err := r.collectQueueMetrics(ctx, now); err != nil { - r.logger.Error("failed to collect queue metrics", zap.Error(err)) - return pmetric.NewMetrics(), err + errs = append(errs, err) } - // Collect node metrics if enabled - if r.cfg.EnableNodeMetrics { - if err := r.collectNodeMetrics(ctx, now); err != nil { - r.logger.Error("failed to collect node metrics", zap.Error(err)) - return pmetric.NewMetrics(), err - } + // Collect node metrics + if err := r.collectNodeMetrics(ctx, now); err != nil { + errs = append(errs, err) + } + + // If there are errors, return them as a combined error + if len(errs) > 0 { + return pmetric.NewMetrics(), errors.Join(errs...) } return r.mb.Emit(), nil @@ -116,14 +108,13 @@ func (r *rabbitmqScraper) collectNodeMetrics(ctx context.Context, now pcommon.Ti if err != nil { return err } - for _, node := range nodes { r.collectNode(node, now) } return nil } -// collectQueue collects metrics for a specific queue +// collectQueue collects metrics func (r *rabbitmqScraper) collectQueue(queue *models.Queue, now pcommon.Timestamp) { r.mb.RecordRabbitmqConsumerCountDataPoint(now, queue.Consumers) r.mb.RecordRabbitmqMessageCurrentDataPoint(now, queue.UnacknowledgedMessages, metadata.AttributeMessageStateUnacknowledged) @@ -157,7 +148,6 @@ func (r *rabbitmqScraper) collectQueue(queue *models.Queue, now pcommon.Timestam r.mb.RecordRabbitmqMessageDroppedDataPoint(now, val64) } } - rb := r.mb.NewResourceBuilder() rb.SetRabbitmqQueueName(queue.Name) rb.SetRabbitmqNodeName(queue.Node) @@ -165,34 +155,17 @@ func (r *rabbitmqScraper) collectQueue(queue *models.Queue, now pcommon.Timestam r.mb.EmitForResource(metadata.WithResource(rb.Emit())) } +// collectNode collects metrics for a specific RabbitMQ node func (r *rabbitmqScraper) collectNode(node *models.Node, now pcommon.Timestamp) { - // Record node-specific metrics r.mb.RecordRabbitmqNodeDiskFreeDataPoint(now, node.DiskFree) r.mb.RecordRabbitmqNodeFdUsedDataPoint(now, node.FDUsed) r.mb.RecordRabbitmqNodeMemLimitDataPoint(now, node.MemLimit) r.mb.RecordRabbitmqNodeMemUsedDataPoint(now, node.MemUsed) - // Build resource attributes for the node rb := r.mb.NewResourceBuilder() rb.SetRabbitmqNodeName(node.Name) - // Emit resource and log attributes for debugging - resource := rb.Emit() - r.logger.Debug("Emitting resource for node", zap.String("attributes", formatResourceAttributes(resource))) - - // Emit metrics for the node - r.mb.EmitForResource(metadata.WithResource(resource)) -} - -// formatResourceAttributes converts resource attributes to a string for logging -func formatResourceAttributes(resource pcommon.Resource) string { - attrs := resource.Attributes() - var attributes []string - attrs.Range(func(k string, v pcommon.Value) bool { - attributes = append(attributes, k+"="+v.AsString()) - return true - }) - return "{" + strings.Join(attributes, ", ") + "}" + r.mb.EmitForResource(metadata.WithResource(rb.Emit())) } // convertValToInt64 values from message state unmarshal as float64s but should be int64. @@ -201,7 +174,8 @@ func formatResourceAttributes(resource pcommon.Resource) string { func convertValToInt64(val any) (int64, bool) { f64Val, ok := val.(float64) if !ok { - return 0, false + return 0, ok } + return int64(f64Val), true } diff --git a/receiver/rabbitmqreceiver/scraper_test.go b/receiver/rabbitmqreceiver/scraper_test.go index 81fa2ac6cdcb..66ddd2817f2f 100644 --- a/receiver/rabbitmqreceiver/scraper_test.go +++ b/receiver/rabbitmqreceiver/scraper_test.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" @@ -27,54 +26,47 @@ import ( ) func TestScraperStart(t *testing.T) { - clientConfigInvalid := confighttp.NewDefaultClientConfig() - clientConfigInvalid.Endpoint = "invalid://endpoint" - clientConfigInvalid.TLSSetting = configtls.ClientConfig{ + clientConfigNonExistandCA := confighttp.NewDefaultClientConfig() + clientConfigNonExistandCA.Endpoint = defaultEndpoint + clientConfigNonExistandCA.TLSSetting = configtls.ClientConfig{ Config: configtls.Config{ - CAFile: "/non/existent", // Invalid CA file + CAFile: "/non/existent", }, } clientConfig := confighttp.NewDefaultClientConfig() - clientConfig.Endpoint = "http://localhost:15672" // Valid endpoint + clientConfig.Endpoint = defaultEndpoint - // Test cases for the scraper start function. testcases := []struct { desc string - clientCfg confighttp.ClientConfig - username string - password string + scraper *rabbitmqScraper expectError bool }{ { - desc: "Bad Config - Invalid Endpoint", - clientCfg: clientConfigInvalid, - username: "", // Missing username - password: "", // Missing password + desc: "Bad Config", + scraper: &rabbitmqScraper{ + cfg: &Config{ + ClientConfig: clientConfigNonExistandCA, + }, + settings: componenttest.NewNopTelemetrySettings(), + }, expectError: true, }, { - desc: "Valid Config", - clientCfg: clientConfig, - username: "valid_user", - password: "valid_password", + desc: "Valid Config", + scraper: &rabbitmqScraper{ + cfg: &Config{ + ClientConfig: clientConfig, + }, + settings: componenttest.NewNopTelemetrySettings(), + }, expectError: false, }, } - // Run each test case. for _, tc := range testcases { t.Run(tc.desc, func(t *testing.T) { - scraper := &rabbitmqScraper{ - cfg: &Config{ - ClientConfig: tc.clientCfg, - Username: tc.username, - Password: configopaque.String(tc.password), // Convert to configopaque.String - }, - settings: componenttest.NewNopTelemetrySettings(), - } - - err := scraper.start(context.Background(), componenttest.NewNopHost()) + err := tc.scraper.start(context.Background(), componenttest.NewNopHost()) if tc.expectError { require.Error(t, err) } else { @@ -88,7 +80,6 @@ func TestScraperScrape(t *testing.T) { testCases := []struct { desc string setupMockClient func(t *testing.T) client - enableNodeMetrics bool expectedMetricGen func(t *testing.T) pmetric.Metrics expectedErr error }{ @@ -97,7 +88,6 @@ func TestScraperScrape(t *testing.T) { setupMockClient: func(*testing.T) client { return nil }, - enableNodeMetrics: false, expectedMetricGen: func(*testing.T) pmetric.Metrics { return pmetric.NewMetrics() }, @@ -108,28 +98,27 @@ func TestScraperScrape(t *testing.T) { setupMockClient: func(*testing.T) client { mockClient := mocks.MockClient{} mockClient.On("GetQueues", mock.Anything).Return(nil, errors.New("some api error")) + mockClient.On("GetNodes", mock.Anything).Return(nil, errors.New("some api error")) // Add this return &mockClient }, - enableNodeMetrics: false, expectedMetricGen: func(*testing.T) pmetric.Metrics { return pmetric.NewMetrics() }, - expectedErr: errors.New("some api error"), + expectedErr: errors.New("some api error\nsome api error"), }, { desc: "Successful Queue Collection", setupMockClient: func(t *testing.T) client { mockClient := mocks.MockClient{} - // use helper function from client tests data := loadAPIResponseData(t, "get_queues_response.json") var queues []*models.Queue err := json.Unmarshal(data, &queues) require.NoError(t, err) mockClient.On("GetQueues", mock.Anything).Return(queues, nil) + mockClient.On("GetNodes", mock.Anything).Return(nil, nil) return &mockClient }, - enableNodeMetrics: false, expectedMetricGen: func(t *testing.T) pmetric.Metrics { goldenPath := filepath.Join("testdata", "expected_metrics", "metrics_golden.yaml") expectedMetrics, err := golden.ReadMetrics(goldenPath) @@ -175,7 +164,6 @@ func TestScraperScrape(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { scraper := newScraper(zap.NewNop(), createDefaultConfig().(*Config), receivertest.NewNopSettings()) scraper.client = tc.setupMockClient(t) - scraper.cfg.EnableNodeMetrics = tc.enableNodeMetrics actualMetrics, err := scraper.scrape(context.Background()) if tc.expectedErr == nil { From b3e959dcd62f9e54c2fedc3652e78865dac34d1c Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Sun, 2 Feb 2025 17:53:59 +0530 Subject: [PATCH 16/17] rectified CodeGen issues --- exporter/zipkinexporter/zipkin_test.go | 3 ++- .../statsreader/statsreaders_mockedspanner_test.go | 3 ++- receiver/rabbitmqreceiver/generated_package_test.go | 3 ++- receiver/rabbitmqreceiver/metadata.yaml | 1 - receiver/rabbitmqreceiver/scraper_test.go | 8 ++++---- 5 files changed, 10 insertions(+), 8 deletions(-) diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index f42423aaf6a9..da9835d89d6f 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -86,7 +86,8 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { require.NoError(t, mzr.Flush()) // We expect back the exact JSON that was received - wants := []string{` + wants := []string{ + ` [{ "traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91385","id": "4d1e00c0db9010db", "kind": "CLIENT","name": "get", diff --git a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go index 6b750522373b..f38696a8d7fe 100644 --- a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go +++ b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go @@ -138,7 +138,8 @@ func TestStatsReaders_Read(t *testing.T) { op, err := databaseAdminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{ Database: databaseName, - Statements: []string{`CREATE TABLE STATS ( + Statements: []string{ + `CREATE TABLE STATS ( INTERVAL_END TIMESTAMP, METRIC_LABEL STRING(MAX), METRIC_VALUE INT64 diff --git a/receiver/rabbitmqreceiver/generated_package_test.go b/receiver/rabbitmqreceiver/generated_package_test.go index 121c329b647d..1581d54bfae2 100644 --- a/receiver/rabbitmqreceiver/generated_package_test.go +++ b/receiver/rabbitmqreceiver/generated_package_test.go @@ -3,8 +3,9 @@ package rabbitmqreceiver import ( - "go.uber.org/goleak" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { diff --git a/receiver/rabbitmqreceiver/metadata.yaml b/receiver/rabbitmqreceiver/metadata.yaml index 46aa4b8dc642..abc6b7c405c9 100644 --- a/receiver/rabbitmqreceiver/metadata.yaml +++ b/receiver/rabbitmqreceiver/metadata.yaml @@ -47,7 +47,6 @@ attributes: enum: - ready - unacknowledged - metrics: rabbitmq.consumer.count: description: The number of consumers currently reading from the queue. diff --git a/receiver/rabbitmqreceiver/scraper_test.go b/receiver/rabbitmqreceiver/scraper_test.go index 66ddd2817f2f..df206be791d5 100644 --- a/receiver/rabbitmqreceiver/scraper_test.go +++ b/receiver/rabbitmqreceiver/scraper_test.go @@ -98,7 +98,7 @@ func TestScraperScrape(t *testing.T) { setupMockClient: func(*testing.T) client { mockClient := mocks.MockClient{} mockClient.On("GetQueues", mock.Anything).Return(nil, errors.New("some api error")) - mockClient.On("GetNodes", mock.Anything).Return(nil, errors.New("some api error")) // Add this + mockClient.On("GetNodes", mock.Anything).Return(nil, errors.New("some api error")) return &mockClient }, expectedMetricGen: func(*testing.T) pmetric.Metrics { @@ -110,7 +110,7 @@ func TestScraperScrape(t *testing.T) { desc: "Successful Queue Collection", setupMockClient: func(t *testing.T) client { mockClient := mocks.MockClient{} - data := loadAPIResponseData(t, "get_queues_response.json") + data := loadAPIResponseData(t, queuesAPIResponseFile) var queues []*models.Queue err := json.Unmarshal(data, &queues) require.NoError(t, err) @@ -133,13 +133,13 @@ func TestScraperScrape(t *testing.T) { mockClient := mocks.MockClient{} // Mock data for nodes - nodeData := loadAPIResponseData(t, "get_nodes_response.json") + nodeData := loadAPIResponseData(t, nodesAPIResponseFile) var nodes []*models.Node err := json.Unmarshal(nodeData, &nodes) require.NoError(t, err) // Mock data for queues - queueData := loadAPIResponseData(t, "get_queues_response.json") + queueData := loadAPIResponseData(t, queuesAPIResponseFile) var queues []*models.Queue err = json.Unmarshal(queueData, &queues) require.NoError(t, err) From 7cdba65b646168554521c6a454d49a071e4eb85e Mon Sep 17 00:00:00 2001 From: venu emmadi Date: Sun, 2 Feb 2025 18:21:38 +0530 Subject: [PATCH 17/17] rectified Lint issues --- exporter/zipkinexporter/zipkin_test.go | 3 ++- .../internal/statsreader/statsreaders_mockedspanner_test.go | 3 ++- receiver/rabbitmqreceiver/scraper_test.go | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index da9835d89d6f..ae93c6ca945b 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -121,7 +121,8 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { "timestamp": 1472470996199000, "duration": 207000 }] - `} + `, + } for i, s := range wants { want := unmarshalZipkinSpanArrayToMap(t, s) gotBytes := buf.Next(int(sizes[i])) diff --git a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go index f38696a8d7fe..2da59a0da2e5 100644 --- a/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go +++ b/receiver/googlecloudspannerreceiver/internal/statsreader/statsreaders_mockedspanner_test.go @@ -144,7 +144,8 @@ func TestStatsReaders_Read(t *testing.T) { METRIC_LABEL STRING(MAX), METRIC_VALUE INT64 ) PRIMARY KEY (METRIC_LABEL) - `}, + `, + }, }) require.NoError(t, err) diff --git a/receiver/rabbitmqreceiver/scraper_test.go b/receiver/rabbitmqreceiver/scraper_test.go index df206be791d5..f9dcf5a2ef9d 100644 --- a/receiver/rabbitmqreceiver/scraper_test.go +++ b/receiver/rabbitmqreceiver/scraper_test.go @@ -110,6 +110,7 @@ func TestScraperScrape(t *testing.T) { desc: "Successful Queue Collection", setupMockClient: func(t *testing.T) client { mockClient := mocks.MockClient{} + // use helper function from client tests data := loadAPIResponseData(t, queuesAPIResponseFile) var queues []*models.Queue err := json.Unmarshal(data, &queues)