Skip to content

Commit

Permalink
feat: introduce opentelemetry semconv fields, re-attempt metrics and fix duration of workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenGoodall committed Nov 13, 2024
1 parent 6722429 commit 7cda434
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
opentelemetry.api
opentelemetry.sdk
opentelemetry.sdk>=1.27.0
opentelemetry.exporter.otlp.proto.grpc
opentelemetry.exporter.otlp.proto.http
opentelemetry.instrumentation.logging
Expand Down
36 changes: 22 additions & 14 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import os
import opentelemetry.semconv._incubating.attributes.cicd_attributes as cicd_semconv
from opentelemetry import trace
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
Expand Down Expand Up @@ -85,7 +86,7 @@
# Set workflow level tracer. meter and logger
global_resource = Resource(attributes=global_attributes)
tracer = otel_tracer(OTEL_EXPORTER_OTLP_ENDPOINT, headers, global_resource, "tracer", OTLP_PROTOCOL)
#meter = otel_meter(OTEL_EXPORTER_OTLP_ENDPOINT, headers, global_resource, "meter", OTLP_PROTOCOL)
meter = otel_meter(OTEL_EXPORTER_OTLP_ENDPOINT, headers, global_resource, "meter", OTLP_PROTOCOL)

# Ensure we don't export data for Dynatrace_OTel_GitHubAction exporter
workflow_run = json.loads(get_workflow_run_jobs_by_run_id)
Expand All @@ -98,16 +99,18 @@
print("No data to export, assuming this github action workflow job is a Dynatrace_OTel_GitHubAction exporter")
exit(0)

#job_counter = meter.create_counter(name="github.workflow.overall.job_count", description="Total Number of Jobs in the Workflow Run")
#job_counter.add(len(job_lst))
job_counter = meter.create_counter(name="github.workflow.overall.job_count", description="Total Number of Jobs in the Workflow Run")
job_counter.add(len(job_lst))

#successful_job_counter = meter.create_counter(name="github.workflow.successful.job_count", description="Number of Successful Jobs in the Workflow Run")
#failed_job_counter = meter.create_counter(name="github.workflow.failed.job_count", description="Number of Failed Jobs in the Workflow Run")
successful_job_counter = meter.create_counter(name="github.workflow.successful.job_count", description="Number of Successful Jobs in the Workflow Run")
failed_job_counter = meter.create_counter(name="github.workflow.failed.job_count", description="Number of Failed Jobs in the Workflow Run")


# Trace parent
workflow_run_atts = json.loads(get_workflow_run_by_run_id)
atts=parse_attributes(workflow_run_atts,"","workflow")
atts[cicd_semconv.CICD_PIPELINE_NAME] = str(WORKFLOW_RUN_NAME)
atts[cicd_semconv.CICD_PIPELINE_RUN_ID] = WORKFLOW_RUN_ID
print("Processing Workflow ->",WORKFLOW_RUN_NAME,"run id ->",WORKFLOW_RUN_ID)
p_parent = tracer.start_span(name=str(WORKFLOW_RUN_NAME),attributes=atts,start_time=do_time(workflow_run_atts['run_started_at']),kind=trace.SpanKind.SERVER)

Expand Down Expand Up @@ -137,15 +140,18 @@
for job in job_lst:
try:
print("Processing job ->",job['name'])
child_0 = tracer.start_span(name=str(job['name']),context=pcontext,start_time=do_time(job['started_at']), kind=trace.SpanKind.CONSUMER)
child_0.set_attributes(create_otel_attributes(parse_attributes(job,"steps","job"),GITHUB_REPOSITORY_NAME))
child_0_attributes = create_otel_attributes(parse_attributes(job,"steps","job"),GITHUB_REPOSITORY_NAME)
child_0_attributes[cicd_semconv.CICD_PIPELINE_TASK_NAME] = job['name']
child_0_attributes[cicd_semconv.CICD_PIPELINE_TASK_RUN_ID] = job['run_id']
child_0_attributes[cicd_semconv.CICD_PIPELINE_TASK_RUN_URL_FULL] = job['html_url']
child_0 = tracer.start_span(name=str(job['name']), attributes=child_0_attributes, context=pcontext,start_time=do_time(job['started_at']), kind=trace.SpanKind.CONSUMER)
p_sub_context = trace.set_span_in_context(child_0)

# Update Job Metrics
#if job['conclusion'] == 'success':
# successful_job_counter.add(1)
#else:
# failed_job_counter.add(1)
if job['conclusion'] == 'success':
successful_job_counter.add(1)
else:
failed_job_counter.add(1)

# Steps trace span
for index,step in enumerate(job['steps']):
Expand All @@ -164,6 +170,7 @@

step_tracer = otel_tracer(OTEL_EXPORTER_OTLP_ENDPOINT, headers, resource_log, "step_tracer", OTLP_PROTOCOL)

resource_attributes[cicd_semconv.CICD_PIPELINE_TASK_NAME.replace("pipeline.task", "pipeline.task.step")] = step['name']
resource_attributes.update(create_otel_attributes(parse_attributes(step,"","step"),GITHUB_REPOSITORY_NAME))
resource_log = Resource(attributes=resource_attributes)
job_logger = otel_logger(OTEL_EXPORTER_OTLP_ENDPOINT,headers,resource_log, "job_logger", OTLP_PROTOCOL)
Expand All @@ -177,8 +184,9 @@
else:
step_started_at=step['started_at']

child_1 = step_tracer.start_span(name=str(step['name']),start_time=do_time(step_started_at),context=p_sub_context,kind=trace.SpanKind.CONSUMER)
child_1.set_attributes(create_otel_attributes(parse_attributes(step,"","job"),GITHUB_REPOSITORY_NAME))
child_1_attributes = create_otel_attributes(parse_attributes(step,"","job"),GITHUB_REPOSITORY_NAME)
child_1_attributes[cicd_semconv.CICD_PIPELINE_TASK_NAME.replace("pipeline.task", "pipeline.task.step")] = step['name']
child_1 = step_tracer.start_span(name=str(step['name']), attributes= child_1_attributes, start_time=do_time(step_started_at),context=p_sub_context,kind=trace.SpanKind.CONSUMER)
with trace.use_span(child_1, end_on_exit=False):
# Parse logs
try:
Expand Down Expand Up @@ -237,11 +245,11 @@
print("Unable to process step ->",step['name'],"<- due to error",e)

child_0.end(end_time=do_time(job['completed_at']))
workflow_run_finish_time=do_time(job['completed_at'])
print("Finished processing job ->",job['name'])
except Exception as e:
print("Unable to process job ->",job['name'],"<- due to error",e)

workflow_run_finish_time=do_time(workflow_run_atts['updated_at'])
p_parent.end(end_time=workflow_run_finish_time)
print("Finished processing Workflow ->",WORKFLOW_RUN_NAME,"run id ->",WORKFLOW_RUN_ID)
print("All data exported to Dynatrace")
33 changes: 28 additions & 5 deletions src/otel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,31 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, AggregationTemporality
from opentelemetry.sdk.metrics import (
Counter,
Histogram,
MeterProvider,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
PeriodicExportingMetricReader,
)


# Per-instrument-type aggregation temporality handed to the OTLP metric
# exporters via their ``preferred_temporality`` argument: monotonic
# instruments (counters, histograms) export DELTA values, while
# non-monotonic (up/down) and gauge instruments remain CUMULATIVE.
deltaTemporality = {
    Counter: AggregationTemporality.DELTA,
    UpDownCounter: AggregationTemporality.CUMULATIVE,
    Histogram: AggregationTemporality.DELTA,
    ObservableCounter: AggregationTemporality.DELTA,
    ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
    ObservableGauge: AggregationTemporality.CUMULATIVE,
}


def getLogExporter(endpoint, headers, protocol):
match protocol:
case "HTTP":
Expand All @@ -35,11 +58,11 @@ def getSpanExporter(endpoint, headers, protocol):
def getMetricExporter(endpoint, headers, protocol):
match protocol:
case "HTTP":
return HTTPOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_aggregation={AggregationTemporality.DELTA})
return HTTPOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_temporality=deltaTemporality)
case "GRPC":
return GRPCOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_aggregation={AggregationTemporality.DELTA})
return GRPCOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_temporality=deltaTemporality)
case _:
return HTTPOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_aggregation={AggregationTemporality.DELTA})
return HTTPOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_temporality=deltaTemporality)

def create_otel_attributes(atts, GITHUB_SERVICE_NAME):
attributes={SERVICE_NAME: GITHUB_SERVICE_NAME}
Expand Down

0 comments on commit 7cda434

Please sign in to comment.