Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/rabbitmq] Enhancement/rabbitmq Added node level metrics #37495

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
bfba7ed
Enhance RabbitMQ receiver with node metrics support
VenuEmmadi Jan 18, 2025
42c2c7a
Merge branch 'open-telemetry:main' into enhancement/rabbitmq-node-met…
VenuEmmadi Jan 18, 2025
ceb2cee
Merge branch 'open-telemetry:main' into enhancement/rabbitmq-node-met…
VenuEmmadi Jan 19, 2025
4498f2b
Merge branch 'open-telemetry:main' into enhancement/rabbitmq-node-met…
VenuEmmadi Jan 20, 2025
c0e24b4
Here’s the changelog entry
VenuEmmadi Jan 20, 2025
8c105df
Merge branch 'open-telemetry:main' into enhancement/rabbitmq-node-met…
VenuEmmadi Jan 20, 2025
88d22a0
added issue id
VenuEmmadi Jan 20, 2025
f59d9df
Merge branch 'open-telemetry:main' into enhancement/rabbitmq-node-met…
VenuEmmadi Jan 21, 2025
9b6e76d
Updated Readme and metadata.yaml
VenuEmmadi Jan 21, 2025
b1c8824
updated some nits
VenuEmmadi Jan 21, 2025
b505883
Merge branch 'open-telemetry:main' into enhancement/rabbitmq-node-met…
VenuEmmadi Jan 24, 2025
56a98d2
Merge branch 'open-telemetry:main' into enhancement/rabbitmq-node-met…
VenuEmmadi Jan 26, 2025
31f7a2c
Updated some nits
VenuEmmadi Jan 26, 2025
99fb373
Updated some nits
VenuEmmadi Jan 26, 2025
19d07ce
Updated some nits
VenuEmmadi Jan 26, 2025
50a9a01
Updated some nits
VenuEmmadi Jan 26, 2025
0904bca
updated enable_node_metrics (default = rue)
VenuEmmadi Jan 26, 2025
7b462e0
Rectified lint issues
VenuEmmadi Jan 26, 2025
9359f5b
Fix gofumpt formatting in scraper.go
VenuEmmadi Jan 26, 2025
846e529
Run make generate to update generated code
VenuEmmadi Jan 26, 2025
8c2bb39
Run make generate to update generated code and Fix gofumpt formatting…
VenuEmmadi Jan 26, 2025
7fb8eb5
Merge branch 'main' into enhancement/rabbitmq-node-metrics
VenuEmmadi Feb 1, 2025
fc3183a
aligned with the reviewer's feedback to make the PR non-breaking and …
VenuEmmadi Feb 2, 2025
b3e959d
rectified CodeGen issues
VenuEmmadi Feb 2, 2025
7cdba65
rectified Lint issues
VenuEmmadi Feb 2, 2025
702e96e
Merge branch 'open-telemetry:main' into enhancement/rabbitmq-node-met…
VenuEmmadi Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .chloggen/rabbitmqreceiver_node_metrics_enhancement.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: rabbitmqreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Enhance the RabbitMQ receiver to collect and report node-level metrics (`rabbitmq.node.disk_free`, `rabbitmq.node.fd_used`, `rabbitmq.node.mem_limit`, and `rabbitmq.node.mem_used`). This provides additional observability into the state and resource usage of RabbitMQ nodes."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36925]
6 changes: 4 additions & 2 deletions exporter/zipkinexporter/zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
require.NoError(t, mzr.Flush())

// We expect back the exact JSON that was received
wants := []string{`
wants := []string{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert changes to the zipkin exporter in this PR

`
[{
"traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91385","id": "4d1e00c0db9010db",
"kind": "CLIENT","name": "get",
Expand Down Expand Up @@ -120,7 +121,8 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
"timestamp": 1472470996199000,
"duration": 207000
}]
`}
`,
}
for i, s := range wants {
want := unmarshalZipkinSpanArrayToMap(t, s)
gotBytes := buf.Next(int(sizes[i]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,14 @@ func TestStatsReaders_Read(t *testing.T) {

op, err := databaseAdminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
Database: databaseName,
Statements: []string{`CREATE TABLE STATS (
Statements: []string{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert changes to the googlecloudspannerreceiver in this PR

`CREATE TABLE STATS (
INTERVAL_END TIMESTAMP,
METRIC_LABEL STRING(MAX),
METRIC_VALUE INT64
) PRIMARY KEY (METRIC_LABEL)
`},
`,
},
})
require.NoError(t, err)

Expand Down
3 changes: 3 additions & 0 deletions receiver/rabbitmqreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ The following settings are optional:

- `endpoint` (default: `http://localhost:15672`): The URL of the node to be monitored.
- `collection_interval` (default = `10s`): This receiver collects metrics on an interval. Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.

atoulme marked this conversation as resolved.
Show resolved Hide resolved
- `tls`: TLS control. [By default, insecure settings are rejected and certificate verification is on](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md).
- `enable_node_metrics` (default = `true`): Enables collection of RabbitMQ node-level metrics such as memory usage, file descriptors, and disk space.
atoulme marked this conversation as resolved.
Show resolved Hide resolved

### Example Configuration

Expand All @@ -45,6 +47,7 @@ receivers:
username: otelu
password: ${env:RABBITMQ_PASSWORD}
collection_interval: 10s
enable_node_metrics: true
```

The full list of settings exposed for this receiver are documented in [config.go](./config.go) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). TLS config is documented further under the [opentelemetry collector's configtls package](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md).
Expand Down
34 changes: 24 additions & 10 deletions receiver/rabbitmqreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver/internal/models"
)

// queuePath is the path to queues endpoint
const queuePath = "/api/queues"
const (
// queuePath is the endpoint for RabbitMQ queues.
queuePath = "/api/queues"

// nodePath is the endpoint for RabbitMQ nodes.
nodePath = "/api/nodes"
)

type client interface {
// GetQueues calls "/api/queues" endpoint to get list of queues for the target node
GetQueues(ctx context.Context) ([]*models.Queue, error)
// GetQueues calls "/api/nodes" endpoint to get list of nodes for the target node
atoulme marked this conversation as resolved.
Show resolved Hide resolved
GetNodes(ctx context.Context) ([]*models.Node, error)
}

var _ client = (*rabbitmqClient)(nil)
Expand Down Expand Up @@ -66,6 +73,17 @@ func (c *rabbitmqClient) GetQueues(ctx context.Context) ([]*models.Queue, error)
return queues, nil
}

func (c *rabbitmqClient) GetNodes(ctx context.Context) ([]*models.Node, error) {
var nodes []*models.Node

if err := c.get(ctx, nodePath, &nodes); err != nil {
c.logger.Error("Failed to retrieve nodes", zap.Error(err))
return nil, err
}

return nodes, nil
}

func (c *rabbitmqClient) get(ctx context.Context, path string, respObj any) error {
// Construct endpoint and create request
url := c.hostEndpoint + path
Expand Down Expand Up @@ -94,15 +112,11 @@ func (c *rabbitmqClient) get(ctx context.Context, path string, respObj any) erro
if resp.StatusCode != http.StatusOK {
c.logger.Debug("rabbitMQ API non-200", zap.Error(err), zap.Int("status_code", resp.StatusCode))

// Attempt to extract the error payload
payloadData, err := io.ReadAll(resp.Body)
if err != nil {
c.logger.Debug("failed to read payload error message", zap.Error(err))
} else {
c.logger.Debug("rabbitMQ API Error", zap.ByteString("api_error", payloadData))
atoulme marked this conversation as resolved.
Show resolved Hide resolved
payload, readErr := io.ReadAll(resp.Body)
if readErr == nil {
c.logger.Debug("Error response payload", zap.ByteString("error_payload", payload))
}

return fmt.Errorf("non 200 code returned %d", resp.StatusCode)
return fmt.Errorf("non-200 response code: %d", resp.StatusCode)
atoulme marked this conversation as resolved.
Show resolved Hide resolved
}

// Decode the payload into the passed in response object
Expand Down
45 changes: 24 additions & 21 deletions receiver/rabbitmqreceiver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rabbitmqreceiver // import "github.com/open-telemetry/opentelemetry-coll
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"os"
Expand All @@ -26,12 +25,13 @@ import (

const (
queuesAPIResponseFile = "get_queues_response.json"
nodesAPIResponseFile = "get_nodes_response.json"
)

func TestNewClient(t *testing.T) {
clientConfigNonExistandCA := confighttp.NewDefaultClientConfig()
clientConfigNonExistandCA.Endpoint = defaultEndpoint
clientConfigNonExistandCA.TLSSetting = configtls.ClientConfig{
clientConfigInvalid := confighttp.NewDefaultClientConfig()
clientConfigInvalid.Endpoint = defaultEndpoint
clientConfigInvalid.TLSSetting = configtls.ClientConfig{
Config: configtls.Config{
CAFile: "/non/existent",
},
Expand All @@ -40,42 +40,45 @@ func TestNewClient(t *testing.T) {
clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Endpoint = defaultEndpoint

testCase := []struct {
testCases := []struct {
desc string
cfg *Config
host component.Host
settings component.TelemetrySettings
logger *zap.Logger
expectError error
expectError string // Updated to string to match substrings in the error
atoulme marked this conversation as resolved.
Show resolved Hide resolved
}{
{
desc: "Invalid HTTP config",
cfg: &Config{
ClientConfig: clientConfigNonExistandCA,
ClientConfig: clientConfigInvalid,
},
host: componenttest.NewNopHost(),
settings: componenttest.NewNopTelemetrySettings(),
logger: zap.NewNop(),
expectError: errors.New("failed to create HTTP Client"),
expectError: "failed to create HTTP Client: failed to load TLS config",
},
{
desc: "Valid Configuration",
cfg: &Config{
ClientConfig: clientConfig,
Username: "valid_user",
Password: "valid_password",
},
host: componenttest.NewNopHost(),
settings: componenttest.NewNopTelemetrySettings(),
logger: zap.NewNop(),
expectError: nil,
expectError: "",
},
}

for _, tc := range testCase {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ac, err := newClient(context.Background(), tc.cfg, tc.host, tc.settings, tc.logger)
if tc.expectError != nil {
if tc.expectError != "" {
require.Nil(t, ac)
require.ErrorContains(t, err, tc.expectError.Error())
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectError) // Check for the substring
atoulme marked this conversation as resolved.
Show resolved Hide resolved
} else {
require.NoError(t, err)

Expand Down Expand Up @@ -108,9 +111,9 @@ func TestGetQueuesDetails(t *testing.T) {

tc := createTestClient(t, ts.URL)

clusters, err := tc.GetQueues(context.Background())
require.Nil(t, clusters)
require.EqualError(t, err, "non 200 code returned 401")
queues, err := tc.GetQueues(context.Background())
require.Nil(t, queues)
require.EqualError(t, err, "non-200 response code: 401")
},
},
{
Expand All @@ -125,13 +128,13 @@ func TestGetQueuesDetails(t *testing.T) {

tc := createTestClient(t, ts.URL)

clusters, err := tc.GetQueues(context.Background())
require.Nil(t, clusters)
require.ErrorContains(t, err, "failed to decode response payload")
queues, err := tc.GetQueues(context.Background())
require.Nil(t, queues)
require.ErrorContains(t, err, "failed to decode response payload: json: cannot unmarshal object into Go value of type []*models.Queue")
},
},
{
desc: "Successful call",
desc: "Successful Queue API call",
testFunc: func(t *testing.T) {
data := loadAPIResponseData(t, queuesAPIResponseFile)

Expand All @@ -149,9 +152,9 @@ func TestGetQueuesDetails(t *testing.T) {
err := json.Unmarshal(data, &expected)
require.NoError(t, err)

clusters, err := tc.GetQueues(context.Background())
queues, err := tc.GetQueues(context.Background())
require.NoError(t, err)
require.Equal(t, expected, clusters)
require.Equal(t, expected, queues)
},
},
}
Expand Down
5 changes: 5 additions & 0 deletions receiver/rabbitmqreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,25 @@ type Config struct {
confighttp.ClientConfig `mapstructure:",squash"`
Username string `mapstructure:"username"`
Password configopaque.String `mapstructure:"password"`
EnableNodeMetrics bool `mapstructure:"enable_node_metrics"` // Added flag for node metrics
atoulme marked this conversation as resolved.
Show resolved Hide resolved
metadata.MetricsBuilderConfig `mapstructure:",squash"`
}

// Validate validates the configuration by checking for missing or invalid fields
func (cfg *Config) Validate() error {
var err []error

// Validate username
atoulme marked this conversation as resolved.
Show resolved Hide resolved
if cfg.Username == "" {
err = append(err, errMissingUsername)
}

// Validate password
if cfg.Password == "" {
err = append(err, errMissingPassword)
}

// Validate endpoint
_, parseErr := url.Parse(cfg.Endpoint)
if parseErr != nil {
wrappedErr := fmt.Errorf("%s: %w", errInvalidEndpoint.Error(), parseErr)
Expand Down
30 changes: 21 additions & 9 deletions receiver/rabbitmqreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

func TestValidate(t *testing.T) {
clientConfigInvalid := confighttp.NewDefaultClientConfig()
clientConfigInvalid.Endpoint = "invalid://endpoint: 12efg"
clientConfigInvalid.Endpoint = "invalid://endpoint: 12efg"
atoulme marked this conversation as resolved.
Show resolved Hide resolved

clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Endpoint = defaultEndpoint
Expand All @@ -38,7 +38,7 @@ func TestValidate(t *testing.T) {
expectedErr: errors.Join(
errMissingUsername,
errMissingPassword,
fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`)),
fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`)),
},
{
desc: "missing password and invalid endpoint",
Expand All @@ -48,7 +48,7 @@ func TestValidate(t *testing.T) {
},
expectedErr: errors.Join(
errMissingPassword,
fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`),
fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`),
),
},
{
Expand All @@ -59,7 +59,7 @@ func TestValidate(t *testing.T) {
},
expectedErr: errors.Join(
errMissingUsername,
fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`),
fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`),
),
},
{
Expand All @@ -70,15 +70,26 @@ func TestValidate(t *testing.T) {
ClientConfig: clientConfigInvalid,
},
expectedErr: errors.Join(
fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`),
fmt.Errorf("%w: %s", errInvalidEndpoint, `parse "invalid://endpoint: 12efg": invalid port ": 12efg" after host`),
),
},
{
desc: "valid config",
desc: "valid config with node metrics enabled",
cfg: &Config{
Username: "otelu",
Password: "otelp",
ClientConfig: clientConfig,
Username: "otelu",
Password: "otelp",
ClientConfig: clientConfig,
EnableNodeMetrics: true,
},
expectedErr: nil,
},
{
desc: "valid config with node metrics disabled",
cfg: &Config{
Username: "otelu",
Password: "otelp",
ClientConfig: clientConfig,
EnableNodeMetrics: false,
},
expectedErr: nil,
},
Expand Down Expand Up @@ -112,6 +123,7 @@ func TestLoadConfig(t *testing.T) {
expected.Username = "otelu"
expected.Password = "${env:RABBITMQ_PASSWORD}"
expected.CollectionInterval = 10 * time.Second
expected.EnableNodeMetrics = true

require.Equal(t, expected, cfg)
}
32 changes: 32 additions & 0 deletions receiver/rabbitmqreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,38 @@ The number of messages published to a queue.
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {messages} | Sum | Int | Cumulative | true |

### rabbitmq.node.disk_free

Free disk space on the node.

| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {bytes} | Sum | Int | Cumulative | false |

### rabbitmq.node.fd_used

The number of file descriptors used on the node.

| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {fd} | Sum | Int | Cumulative | false |

### rabbitmq.node.mem_limit

The memory limit on the node.

| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {bytes} | Sum | Int | Cumulative | false |

### rabbitmq.node.mem_used

The memory used on the node.

| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {bytes} | Sum | Int | Cumulative | false |

## Resource Attributes

| Name | Description | Values | Enabled |
Expand Down
Loading