Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow X-Ray and Application Signals Port Compatibility #280

Merged
merged 6 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,901 changes: 2,901 additions & 0 deletions docs/api.md

Large diffs are not rendered by default.

24 changes: 21 additions & 3 deletions internal/manifests/collector/adapters/config_from.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ type LogMetricsCollected struct {
}

type TracesCollected struct {
XRay *xray `json:"xray,omitempty"`
OTLP *otlp `json:"otlp,omitempty"`
XRay *xray `json:"xray,omitempty"`
OTLP *otlp `json:"otlp,omitempty"`
ApplicationSignals *AppSignals `json:"application_signals,omitempty"`
AppSignals *AppSignals `json:"app_signals,omitempty"`
}

type statsD struct {
Expand Down Expand Up @@ -126,7 +128,7 @@ func ConfigStructFromJSONString(configStr string) (*CwaConfig, error) {
return config, nil
}

func (c *CwaConfig) GetApplicationSignalsConfig() *AppSignals {
func (c *CwaConfig) GetApplicationSignalsMetricsConfig() *AppSignals {
if c.Logs == nil {
return nil
}
Expand All @@ -141,3 +143,19 @@ func (c *CwaConfig) GetApplicationSignalsConfig() *AppSignals {
}
return nil
}

func (c *CwaConfig) GetApplicationSignalsTracesConfig() *AppSignals {
if c.Traces == nil {
return nil
}
if c.Traces.TracesCollected == nil {
return nil
}
if c.Traces.TracesCollected.ApplicationSignals != nil {
return c.Traces.TracesCollected.ApplicationSignals
}
if c.Traces.TracesCollected.AppSignals != nil {
return c.Traces.TracesCollected.AppSignals
}
return nil
}
132 changes: 76 additions & 56 deletions internal/manifests/collector/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,50 +21,36 @@ import (
)

const (
StatsD = "statsd"
CollectD = "collectd"
XrayProxy = "aws-proxy"
XrayTraces = "aws-traces"
OtlpGrpc = "otlp-grpc"
OtlpHttp = "otlp-http"
AppSignalsGrpc = "appsig-grpc"
AppSignalsHttp = "appsig-http"
AppSignalsProxy = "appsig-xray"
AppSignalsGrpcSA = ":4315"
AppSignalsHttpSA = ":4316"
AppSignalsProxySA = ":2000"
AppSignalsServerSA = ":4311"
EMF = "emf"
EMFTcp = "emf-tcp"
EMFUdp = "emf-udp"
CWA = "cwa-"
JmxHttp = "jmx-http"
Server = "server"
StatsD = "statsd"
CollectD = "collectd"
XrayProxy = "aws-proxy"
XrayTraces = "aws-traces"
OtlpGrpc = "otlp-grpc"
OtlpHttp = "otlp-http"
AppSignalsGrpc = CWA + "appsig-grpc"
AppSignalsHttp = CWA + "appsig-http"
AppSignalsProxy = CWA + "appsig-xray"
EMF = "emf"
EMFTcp = "emf-tcp"
EMFUdp = "emf-udp"
CWA = "cwa-"
JmxHttp = "jmx-http"
Server = CWA + "server"
musa-asad marked this conversation as resolved.
Show resolved Hide resolved
)

var receiverDefaultPortsMap = map[string]int32{
StatsD: 8125,
CollectD: 25826,
XrayTraces: 2000,
JmxHttp: 4314,
OtlpGrpc: 4317,
OtlpHttp: 4318,
EMF: 25888,
}

var AppSignalsPortToServicePortMap = map[int32][]corev1.ServicePort{
4315: {{
Name: AppSignalsGrpc,
Port: 4315,
}},
4316: {{
Name: AppSignalsHttp,
Port: 4316,
}},
2000: {{
Name: AppSignalsProxy,
Port: 2000,
}},
StatsD: 8125,
CollectD: 25826,
XrayProxy: 2000,
XrayTraces: 2000,
OtlpGrpc: 4317,
OtlpHttp: 4318,
AppSignalsGrpc: 4315,
AppSignalsHttp: 4316,
AppSignalsProxy: 2000,
EMF: 25888,
JmxHttp: 4314,
Server: 4311,
}

func PortMapToServicePortList(portMap map[int32][]corev1.ServicePort) []corev1.ServicePort {
Expand Down Expand Up @@ -148,8 +134,12 @@ func getServicePortsFromCWAgentConfig(logger logr.Logger, config *adapters.CwaCo
return PortMapToServicePortList(servicePortsMap)
}

func isAppSignalEnabled(config *adapters.CwaConfig) bool {
return config.GetApplicationSignalsConfig() != nil
func isAppSignalEnabledMetrics(config *adapters.CwaConfig) bool {
return config.GetApplicationSignalsMetricsConfig() != nil
}

func isAppSignalEnabledTraces(config *adapters.CwaConfig) bool {
return config.GetApplicationSignalsTracesConfig() != nil
}

func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
Expand Down Expand Up @@ -184,8 +174,23 @@ func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverN
if err != nil {
logger.Error(err, "error parsing port from endpoint for receiver", zap.String("endpoint", serviceAddress), zap.String("receiver", receiverName))
} else {
if _, ok := servicePortsMap[port]; ok {
logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", port))
if ports, exists := servicePortsMap[port]; exists {
for _, existingPort := range ports {
if existingPort.Protocol == protocol {
logger.Info("Duplicate port and protocol combination configured", zap.Int32("port", port), zap.String("protocol", string(protocol)))
musa-asad marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
name := CWA + receiverName
if receiverName == OtlpGrpc || receiverName == OtlpHttp {
name = fmt.Sprintf("%s-%d", receiverName, port)
}
sp := corev1.ServicePort{
Name: name,
Port: port,
Protocol: protocol,
}
servicePortsMap[port] = append(servicePortsMap[port], sp)
musa-asad marked this conversation as resolved.
Show resolved Hide resolved
} else {
name := CWA + receiverName
if receiverName == OtlpGrpc || receiverName == OtlpHttp {
Expand All @@ -200,15 +205,27 @@ func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverN
}
}
} else {
if _, ok := servicePortsMap[receiverDefaultPortsMap[receiverName]]; ok {
logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", receiverDefaultPortsMap[receiverName]))
defaultPort := receiverDefaultPortsMap[receiverName]
if ports, exists := servicePortsMap[defaultPort]; exists {
for _, existingPort := range ports {
if existingPort.Protocol == protocol {
logger.Info("Duplicate port and protocol combination configured", zap.Int32("port", defaultPort), zap.String("protocol", string(protocol)))
return
}
}
sp := corev1.ServicePort{
Name: receiverName,
Port: defaultPort,
Protocol: protocol,
}
servicePortsMap[defaultPort] = append(servicePortsMap[defaultPort], sp)
} else {
sp := corev1.ServicePort{
Name: receiverName,
Port: receiverDefaultPortsMap[receiverName],
Port: defaultPort,
Protocol: protocol,
}
servicePortsMap[receiverDefaultPortsMap[receiverName]] = []corev1.ServicePort{sp}
servicePortsMap[defaultPort] = []corev1.ServicePort{sp}
}
}
}
Expand Down Expand Up @@ -278,22 +295,25 @@ func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConf
//Xray
if config.Traces.TracesCollected.XRay != nil {
getReceiverServicePort(logger, config.Traces.TracesCollected.XRay.BindAddress, XrayTraces, corev1.ProtocolUDP, servicePortsMap)
serviceAddress := ""
if config.Traces.TracesCollected.XRay.TCPProxy != nil {
getReceiverServicePort(logger, config.Traces.TracesCollected.XRay.TCPProxy.BindAddress, XrayProxy, corev1.ProtocolTCP, servicePortsMap)
serviceAddress = config.Traces.TracesCollected.XRay.TCPProxy.BindAddress
}
getReceiverServicePort(logger, serviceAddress, XrayProxy, corev1.ProtocolTCP, servicePortsMap)
}
return tracesPorts
}

func getApplicationSignalsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
if !isAppSignalEnabled(config) {
return
if isAppSignalEnabledMetrics(config) || isAppSignalEnabledTraces(config) {
getReceiverServicePort(logger, "", AppSignalsGrpc, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, "", AppSignalsHttp, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, "", Server, corev1.ProtocolTCP, servicePortsMap)
}

getReceiverServicePort(logger, AppSignalsGrpcSA, AppSignalsGrpc, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, AppSignalsHttpSA, AppSignalsHttp, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, AppSignalsProxySA, AppSignalsProxy, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, AppSignalsServerSA, Server, corev1.ProtocolTCP, servicePortsMap)
if isAppSignalEnabledTraces(config) {
getReceiverServicePort(logger, "", AppSignalsProxy, corev1.ProtocolTCP, servicePortsMap)
}
}

func portFromEndpoint(endpoint string) (int32, error) {
Expand Down
Loading
Loading