Skip to content

Commit

Permalink
feat: implement opentelemetry for distributed tracing (litmuschaos#706)
Browse files Browse the repository at this point in the history
* feat: add otel & tracing for distributed tracing

Signed-off-by: namkyu1999 <[email protected]>

* feat: add tracing codes to chaslib

Signed-off-by: namkyu1999 <[email protected]>

* fix: misc

Signed-off-by: namkyu1999 <[email protected]>

* fix: make otel optional

Signed-off-by: namkyu1999 <[email protected]>

* fix: skip if litmus-go not received trace_parent

Signed-off-by: namkyu1999 <[email protected]>

* fix: Set context.Context as a parameter in each function

Signed-off-by: namkyu1999 <[email protected]>

* update templates

Signed-off-by: namkyu1999 <[email protected]>

* feat: rename spans and enhance coverage

Signed-off-by: namkyu1999 <[email protected]>

* fix: avoid shadowing

Signed-off-by: namkyu1999 <[email protected]>

* fix: add logs

Signed-off-by: namkyu1999 <[email protected]>

* fix: add logs

Signed-off-by: namkyu1999 <[email protected]>

* fix: fix templates

Signed-off-by: namkyu1999 <[email protected]>

---------

Signed-off-by: namkyu1999 <[email protected]>
  • Loading branch information
namkyu1999 authored Oct 24, 2024
1 parent 0cd6c6f commit 3ef23b0
Show file tree
Hide file tree
Showing 113 changed files with 1,496 additions and 816 deletions.
124 changes: 73 additions & 51 deletions bin/experiment/experiment.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package main

import (
"context"
"errors"
"flag"
"os"

// Uncomment to load all auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth"

Expand Down Expand Up @@ -59,10 +63,11 @@ import (
k6Loadgen "github.com/litmuschaos/litmus-go/experiments/load/k6-loadgen/experiment"
springBootFaults "github.com/litmuschaos/litmus-go/experiments/spring-boot/spring-boot-faults/experiment"
vmpoweroff "github.com/litmuschaos/litmus-go/experiments/vmware/vm-poweroff/experiment"

"github.com/litmuschaos/litmus-go/pkg/clients"
cli "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/telemetry"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
)

func init() {
Expand All @@ -75,8 +80,25 @@ func init() {
}

func main() {
initCtx := context.Background()

// Set up Observability.
if otelExporterEndpoint := os.Getenv(telemetry.OTELExporterOTLPEndpoint); otelExporterEndpoint != "" {
shutdown, err := telemetry.InitOTelSDK(initCtx, true, otelExporterEndpoint)
if err != nil {
log.Errorf("Failed to initialize OTel SDK: %v", err)
return
}
defer func() {
err = errors.Join(err, shutdown(initCtx))
}()
initCtx = telemetry.GetTraceParentContext()
}

clients := cli.ClientSets{}

clients := clients.ClientSets{}
ctx, span := otel.Tracer(telemetry.TracerName).Start(initCtx, "ExecuteExperiment")
defer span.End()

// parse the experiment name
experimentName := flag.String("name", "pod-delete", "name of the chaos experiment")
Expand All @@ -92,101 +114,101 @@ func main() {
// invoke the corresponding experiment based on the (-name) flag
switch *experimentName {
case "container-kill":
containerKill.ContainerKill(clients)
containerKill.ContainerKill(ctx, clients)
case "disk-fill":
diskFill.DiskFill(clients)
diskFill.DiskFill(ctx, clients)
case "kafka-broker-pod-failure":
kafkaBrokerPodFailure.KafkaBrokerPodFailure(clients)
kafkaBrokerPodFailure.KafkaBrokerPodFailure(ctx, clients)
case "kubelet-service-kill":
kubeletServiceKill.KubeletServiceKill(clients)
kubeletServiceKill.KubeletServiceKill(ctx, clients)
case "docker-service-kill":
dockerServiceKill.DockerServiceKill(clients)
dockerServiceKill.DockerServiceKill(ctx, clients)
case "node-cpu-hog":
nodeCPUHog.NodeCPUHog(clients)
nodeCPUHog.NodeCPUHog(ctx, clients)
case "node-drain":
nodeDrain.NodeDrain(clients)
nodeDrain.NodeDrain(ctx, clients)
case "node-io-stress":
nodeIOStress.NodeIOStress(clients)
nodeIOStress.NodeIOStress(ctx, clients)
case "node-memory-hog":
nodeMemoryHog.NodeMemoryHog(clients)
nodeMemoryHog.NodeMemoryHog(ctx, clients)
case "node-taint":
nodeTaint.NodeTaint(clients)
nodeTaint.NodeTaint(ctx, clients)
case "pod-autoscaler":
podAutoscaler.PodAutoscaler(clients)
podAutoscaler.PodAutoscaler(ctx, clients)
case "pod-cpu-hog-exec":
podCPUHogExec.PodCPUHogExec(clients)
podCPUHogExec.PodCPUHogExec(ctx, clients)
case "pod-delete":
podDelete.PodDelete(clients)
podDelete.PodDelete(ctx, clients)
case "pod-io-stress":
podIOStress.PodIOStress(clients)
podIOStress.PodIOStress(ctx, clients)
case "pod-memory-hog-exec":
podMemoryHogExec.PodMemoryHogExec(clients)
podMemoryHogExec.PodMemoryHogExec(ctx, clients)
case "pod-network-corruption":
podNetworkCorruption.PodNetworkCorruption(clients)
podNetworkCorruption.PodNetworkCorruption(ctx, clients)
case "pod-network-duplication":
podNetworkDuplication.PodNetworkDuplication(clients)
podNetworkDuplication.PodNetworkDuplication(ctx, clients)
case "pod-network-latency":
podNetworkLatency.PodNetworkLatency(clients)
podNetworkLatency.PodNetworkLatency(ctx, clients)
case "pod-network-loss":
podNetworkLoss.PodNetworkLoss(clients)
podNetworkLoss.PodNetworkLoss(ctx, clients)
case "pod-network-partition":
podNetworkPartition.PodNetworkPartition(clients)
podNetworkPartition.PodNetworkPartition(ctx, clients)
case "pod-memory-hog":
podMemoryHog.PodMemoryHog(clients)
podMemoryHog.PodMemoryHog(ctx, clients)
case "pod-cpu-hog":
podCPUHog.PodCPUHog(clients)
podCPUHog.PodCPUHog(ctx, clients)
case "cassandra-pod-delete":
cassandraPodDelete.CasssandraPodDelete(clients)
cassandraPodDelete.CasssandraPodDelete(ctx, clients)
case "aws-ssm-chaos-by-id":
awsSSMChaosByID.AWSSSMChaosByID(clients)
awsSSMChaosByID.AWSSSMChaosByID(ctx, clients)
case "aws-ssm-chaos-by-tag":
awsSSMChaosByTag.AWSSSMChaosByTag(clients)
awsSSMChaosByTag.AWSSSMChaosByTag(ctx, clients)
case "ec2-terminate-by-id":
ec2TerminateByID.EC2TerminateByID(clients)
ec2TerminateByID.EC2TerminateByID(ctx, clients)
case "ec2-terminate-by-tag":
ec2TerminateByTag.EC2TerminateByTag(clients)
ec2TerminateByTag.EC2TerminateByTag(ctx, clients)
case "ebs-loss-by-id":
ebsLossByID.EBSLossByID(clients)
ebsLossByID.EBSLossByID(ctx, clients)
case "ebs-loss-by-tag":
ebsLossByTag.EBSLossByTag(clients)
ebsLossByTag.EBSLossByTag(ctx, clients)
case "node-restart":
nodeRestart.NodeRestart(clients)
nodeRestart.NodeRestart(ctx, clients)
case "pod-dns-error":
podDNSError.PodDNSError(clients)
podDNSError.PodDNSError(ctx, clients)
case "pod-dns-spoof":
podDNSSpoof.PodDNSSpoof(clients)
podDNSSpoof.PodDNSSpoof(ctx, clients)
case "pod-http-latency":
podHttpLatency.PodHttpLatency(clients)
podHttpLatency.PodHttpLatency(ctx, clients)
case "pod-http-status-code":
podHttpStatusCode.PodHttpStatusCode(clients)
podHttpStatusCode.PodHttpStatusCode(ctx, clients)
case "pod-http-modify-header":
podHttpModifyHeader.PodHttpModifyHeader(clients)
podHttpModifyHeader.PodHttpModifyHeader(ctx, clients)
case "pod-http-modify-body":
podHttpModifyBody.PodHttpModifyBody(clients)
podHttpModifyBody.PodHttpModifyBody(ctx, clients)
case "pod-http-reset-peer":
podHttpResetPeer.PodHttpResetPeer(clients)
podHttpResetPeer.PodHttpResetPeer(ctx, clients)
case "vm-poweroff":
vmpoweroff.VMPoweroff(clients)
vmpoweroff.VMPoweroff(ctx, clients)
case "azure-instance-stop":
azureInstanceStop.AzureInstanceStop(clients)
azureInstanceStop.AzureInstanceStop(ctx, clients)
case "azure-disk-loss":
azureDiskLoss.AzureDiskLoss(clients)
azureDiskLoss.AzureDiskLoss(ctx, clients)
case "gcp-vm-disk-loss":
gcpVMDiskLoss.VMDiskLoss(clients)
gcpVMDiskLoss.VMDiskLoss(ctx, clients)
case "pod-fio-stress":
podFioStress.PodFioStress(clients)
podFioStress.PodFioStress(ctx, clients)
case "gcp-vm-instance-stop":
gcpVMInstanceStop.VMInstanceStop(clients)
gcpVMInstanceStop.VMInstanceStop(ctx, clients)
case "redfish-node-restart":
redfishNodeRestart.NodeRestart(clients)
redfishNodeRestart.NodeRestart(ctx, clients)
case "gcp-vm-instance-stop-by-label":
gcpVMInstanceStopByLabel.GCPVMInstanceStopByLabel(clients)
gcpVMInstanceStopByLabel.GCPVMInstanceStopByLabel(ctx, clients)
case "gcp-vm-disk-loss-by-label":
gcpVMDiskLossByLabel.GCPVMDiskLossByLabel(clients)
gcpVMDiskLossByLabel.GCPVMDiskLossByLabel(ctx, clients)
case "spring-boot-cpu-stress", "spring-boot-memory-stress", "spring-boot-exceptions", "spring-boot-app-kill", "spring-boot-faults", "spring-boot-latency":
springBootFaults.Experiment(clients, *experimentName)
springBootFaults.Experiment(ctx, clients, *experimentName)
case "k6-loadgen":
k6Loadgen.Experiment(clients)
k6Loadgen.Experiment(ctx, clients)
default:
log.Errorf("Unsupported -name %v, please provide the correct value of -name args", *experimentName)
return
Expand Down
27 changes: 24 additions & 3 deletions bin/helper/helper.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package main

import (
"context"
"errors"
"flag"
"os"

// Uncomment to load all auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth"

Expand All @@ -17,10 +21,11 @@ import (
networkChaos "github.com/litmuschaos/litmus-go/chaoslib/litmus/network-chaos/helper"
dnsChaos "github.com/litmuschaos/litmus-go/chaoslib/litmus/pod-dns-chaos/helper"
stressChaos "github.com/litmuschaos/litmus-go/chaoslib/litmus/stress-chaos/helper"

"github.com/litmuschaos/litmus-go/pkg/clients"
cli "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/telemetry"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
)

func init() {
Expand All @@ -33,8 +38,24 @@ func init() {
}

func main() {
ctx := context.Background()
// Set up Observability.
if otelExporterEndpoint := os.Getenv(telemetry.OTELExporterOTLPEndpoint); otelExporterEndpoint != "" {
shutdown, err := telemetry.InitOTelSDK(ctx, true, otelExporterEndpoint)
if err != nil {
log.Errorf("Failed to initialize OTel SDK: %v", err)
return
}
defer func() {
err = errors.Join(err, shutdown(ctx))
}()
ctx = telemetry.GetTraceParentContext()
}

clients := cli.ClientSets{}

clients := clients.ClientSets{}
_, span := otel.Tracer(telemetry.TracerName).Start(ctx, "ExecuteExperimentHelper")
defer span.End()

// parse the helper name
helperName := flag.String("name", "", "name of the helper pod")
Expand Down
2 changes: 1 addition & 1 deletion build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Multi-stage docker build
# Build stage
FROM golang:1.20 AS builder
FROM golang:1.22 AS builder

ARG TARGETOS=linux
ARG TARGETARCH
Expand Down
17 changes: 12 additions & 5 deletions chaoslib/litmus/aws-ssm-chaos/lib/ssm-chaos.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package lib

import (
"context"
"os"
"strings"
"time"

experimentTypes "github.com/litmuschaos/litmus-go/pkg/aws-ssm/aws-ssm-chaos/types"
clients "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/cloud/aws/ssm"
"github.com/litmuschaos/litmus-go/pkg/events"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/probe"
"github.com/litmuschaos/litmus-go/pkg/telemetry"
"github.com/litmuschaos/litmus-go/pkg/types"
"github.com/litmuschaos/litmus-go/pkg/utils/common"
"github.com/palantir/stacktrace"
"go.opentelemetry.io/otel"
)

// InjectChaosInSerialMode will inject the aws ssm chaos in serial mode that is one after other
func InjectChaosInSerialMode(experimentsDetails *experimentTypes.ExperimentDetails, instanceIDList []string, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails, inject chan os.Signal) error {
func InjectChaosInSerialMode(ctx context.Context, experimentsDetails *experimentTypes.ExperimentDetails, instanceIDList []string, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails, inject chan os.Signal) error {
ctx, span := otel.Tracer(telemetry.TracerName).Start(ctx, "InjectAWSSSMFaultInSerialMode")
defer span.End()

select {
case <-inject:
Expand Down Expand Up @@ -60,7 +65,7 @@ func InjectChaosInSerialMode(experimentsDetails *experimentTypes.ExperimentDetai

// run the probes during chaos
if len(resultDetails.ProbeDetails) != 0 && i == 0 {
if err = probe.RunProbes(chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
if err = probe.RunProbes(ctx, chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
return stacktrace.Propagate(err, "failed to run probes")
}
}
Expand All @@ -85,7 +90,9 @@ func InjectChaosInSerialMode(experimentsDetails *experimentTypes.ExperimentDetai
}

// InjectChaosInParallelMode will inject the aws ssm chaos in parallel mode that is all at once
func InjectChaosInParallelMode(experimentsDetails *experimentTypes.ExperimentDetails, instanceIDList []string, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails, inject chan os.Signal) error {
func InjectChaosInParallelMode(ctx context.Context, experimentsDetails *experimentTypes.ExperimentDetails, instanceIDList []string, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails, inject chan os.Signal) error {
ctx, span := otel.Tracer(telemetry.TracerName).Start(ctx, "InjectAWSSSMFaultInParallelMode")
defer span.End()

select {
case <-inject:
Expand Down Expand Up @@ -125,7 +132,7 @@ func InjectChaosInParallelMode(experimentsDetails *experimentTypes.ExperimentDet

// run the probes during chaos
if len(resultDetails.ProbeDetails) != 0 {
if err = probe.RunProbes(chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
if err = probe.RunProbes(ctx, chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
return stacktrace.Propagate(err, "failed to run probes")
}
}
Expand Down
13 changes: 9 additions & 4 deletions chaoslib/litmus/aws-ssm-chaos/lib/ssm/aws-ssm-chaos-by-id.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ssm

import (
"context"
"fmt"
"os"
"os/signal"
Expand All @@ -10,12 +11,14 @@ import (
"github.com/litmuschaos/litmus-go/chaoslib/litmus/aws-ssm-chaos/lib"
experimentTypes "github.com/litmuschaos/litmus-go/pkg/aws-ssm/aws-ssm-chaos/types"
"github.com/litmuschaos/litmus-go/pkg/cerrors"
clients "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/cloud/aws/ssm"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/telemetry"
"github.com/litmuschaos/litmus-go/pkg/types"
"github.com/litmuschaos/litmus-go/pkg/utils/common"
"github.com/palantir/stacktrace"
"go.opentelemetry.io/otel"
)

var (
Expand All @@ -24,7 +27,9 @@ var (
)

// PrepareAWSSSMChaosByID contains the prepration and injection steps for the experiment
func PrepareAWSSSMChaosByID(experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails) error {
func PrepareAWSSSMChaosByID(ctx context.Context, experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails) error {
ctx, span := otel.Tracer(telemetry.TracerName).Start(ctx, "PrepareAWSSSMFaultByID")
defer span.End()

// inject channel is used to transmit signal notifications.
inject = make(chan os.Signal, 1)
Expand Down Expand Up @@ -60,11 +65,11 @@ func PrepareAWSSSMChaosByID(experimentsDetails *experimentTypes.ExperimentDetail

switch strings.ToLower(experimentsDetails.Sequence) {
case "serial":
if err = lib.InjectChaosInSerialMode(experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
if err = lib.InjectChaosInSerialMode(ctx, experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
return stacktrace.Propagate(err, "could not run chaos in serial mode")
}
case "parallel":
if err = lib.InjectChaosInParallelMode(experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
if err = lib.InjectChaosInParallelMode(ctx, experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
return stacktrace.Propagate(err, "could not run chaos in parallel mode")
}
default:
Expand Down
Loading

0 comments on commit 3ef23b0

Please sign in to comment.