From 7cda434b00937a2339f4d4d57f1a2d90bcff80e9 Mon Sep 17 00:00:00 2001 From: Stephen Goodall Date: Thu, 14 Nov 2024 00:49:20 +1100 Subject: [PATCH] feat: introduce opentelemetry semconv fields, re-attempt metrics and fix duration of workflow --- requirements.txt | 2 +- src/exporter.py | 36 ++++++++++++++++++++++-------------- src/otel/__init__.py | 33 ++++++++++++++++++++++++++++----- 3 files changed, 51 insertions(+), 20 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9581115..977a7cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ opentelemetry.api -opentelemetry.sdk +opentelemetry.sdk>=1.27.0 opentelemetry.exporter.otlp.proto.grpc opentelemetry.exporter.otlp.proto.http opentelemetry.instrumentation.logging diff --git a/src/exporter.py b/src/exporter.py index 5f5fe38..2d845b6 100644 --- a/src/exporter.py +++ b/src/exporter.py @@ -3,6 +3,7 @@ import json import logging import os +import opentelemetry.semconv._incubating.attributes.cicd_attributes as cicd_semconv from opentelemetry import trace from opentelemetry.instrumentation.logging import LoggingInstrumentor from opentelemetry.sdk.resources import SERVICE_NAME, Resource @@ -85,7 +86,7 @@ # Set workflow level tracer. meter and logger global_resource = Resource(attributes=global_attributes) tracer = otel_tracer(OTEL_EXPORTER_OTLP_ENDPOINT, headers, global_resource, "tracer", OTLP_PROTOCOL) -#meter = otel_meter(OTEL_EXPORTER_OTLP_ENDPOINT, headers, global_resource, "meter", OTLP_PROTOCOL) +meter = otel_meter(OTEL_EXPORTER_OTLP_ENDPOINT, headers, global_resource, "meter", OTLP_PROTOCOL) # Ensure we don't export data for Dynatrace_OTel_GitHubAction exporter workflow_run = json.loads(get_workflow_run_jobs_by_run_id) @@ -98,16 +99,18 @@ print("No data to export, assuming this github action workflow job is a Dynatrace_OTel_GitHubAction exporter") exit(0) -#job_counter = meter.create_counter(name="github.workflow.overall.job_count", description="Total Number of Jobs in the Workflow Run") -#job_counter.add(len(job_lst)) +job_counter = meter.create_counter(name="github.workflow.overall.job_count", description="Total Number of Jobs in the Workflow Run") +job_counter.add(len(job_lst)) -#successful_job_counter = meter.create_counter(name="github.workflow.successful.job_count", description="Number of Successful Jobs in the Workflow Run") -#failed_job_counter = meter.create_counter(name="github.workflow.failed.job_count", description="Number of Failed Jobs in the Workflow Run") +successful_job_counter = meter.create_counter(name="github.workflow.successful.job_count", description="Number of Successful Jobs in the Workflow Run") +failed_job_counter = meter.create_counter(name="github.workflow.failed.job_count", description="Number of Failed Jobs in the Workflow Run") # Trace parent workflow_run_atts = json.loads(get_workflow_run_by_run_id) atts=parse_attributes(workflow_run_atts,"","workflow") +atts[cicd_semconv.CICD_PIPELINE_NAME] = str(WORKFLOW_RUN_NAME) +atts[cicd_semconv.CICD_PIPELINE_RUN_ID] = WORKFLOW_RUN_ID print("Processing Workflow ->",WORKFLOW_RUN_NAME,"run id ->",WORKFLOW_RUN_ID) p_parent = tracer.start_span(name=str(WORKFLOW_RUN_NAME),attributes=atts,start_time=do_time(workflow_run_atts['run_started_at']),kind=trace.SpanKind.SERVER) @@ -137,15 +140,18 @@ for job in job_lst: try: print("Processing job ->",job['name']) - child_0 = tracer.start_span(name=str(job['name']),context=pcontext,start_time=do_time(job['started_at']), kind=trace.SpanKind.CONSUMER) - child_0.set_attributes(create_otel_attributes(parse_attributes(job,"steps","job"),GITHUB_REPOSITORY_NAME)) + child_0_attributes = create_otel_attributes(parse_attributes(job,"steps","job"),GITHUB_REPOSITORY_NAME) + child_0_attributes[cicd_semconv.CICD_PIPELINE_TASK_NAME] = job['name'] + child_0_attributes[cicd_semconv.CICD_PIPELINE_TASK_RUN_ID] = job['run_id'] + child_0_attributes[cicd_semconv.CICD_PIPELINE_TASK_RUN_URL_FULL] = job['html_url'] + child_0 = tracer.start_span(name=str(job['name']), attributes=child_0_attributes, context=pcontext,start_time=do_time(job['started_at']), kind=trace.SpanKind.CONSUMER) p_sub_context = trace.set_span_in_context(child_0) # Update Job Metrics - #if job['conclusion'] == 'success': - # successful_job_counter.add(1) - #else: - # failed_job_counter.add(1) + if job['conclusion'] == 'success': + successful_job_counter.add(1) + else: + failed_job_counter.add(1) # Steps trace span for index,step in enumerate(job['steps']): @@ -164,6 +170,7 @@ step_tracer = otel_tracer(OTEL_EXPORTER_OTLP_ENDPOINT, headers, resource_log, "step_tracer", OTLP_PROTOCOL) + resource_attributes[cicd_semconv.CICD_PIPELINE_TASK_NAME.replace("pipeline.task", "pipeline.task.step")] = step['name'] resource_attributes.update(create_otel_attributes(parse_attributes(step,"","step"),GITHUB_REPOSITORY_NAME)) resource_log = Resource(attributes=resource_attributes) job_logger = otel_logger(OTEL_EXPORTER_OTLP_ENDPOINT,headers,resource_log, "job_logger", OTLP_PROTOCOL) @@ -177,8 +184,9 @@ else: step_started_at=step['started_at'] - child_1 = step_tracer.start_span(name=str(step['name']),start_time=do_time(step_started_at),context=p_sub_context,kind=trace.SpanKind.CONSUMER) - child_1.set_attributes(create_otel_attributes(parse_attributes(step,"","job"),GITHUB_REPOSITORY_NAME)) + child_1_attributes = create_otel_attributes(parse_attributes(step,"","job"),GITHUB_REPOSITORY_NAME) + child_1_attributes[cicd_semconv.CICD_PIPELINE_TASK_NAME.replace("pipeline.task", "pipeline.task.step")] = step['name'] + child_1 = step_tracer.start_span(name=str(step['name']), attributes= child_1_attributes, start_time=do_time(step_started_at),context=p_sub_context,kind=trace.SpanKind.CONSUMER) with trace.use_span(child_1, end_on_exit=False): # Parse logs try: @@ -237,11 +245,11 @@ print("Unable to process step ->",step['name'],"<- due to error",e) child_0.end(end_time=do_time(job['completed_at'])) - workflow_run_finish_time=do_time(job['completed_at']) print("Finished processing job ->",job['name']) except Exception as e: print("Unable to process job ->",job['name'],"<- due to error",e) +workflow_run_finish_time=do_time(workflow_run_atts['updated_at']) p_parent.end(end_time=workflow_run_finish_time) print("Finished processing Workflow ->",WORKFLOW_RUN_NAME,"run id ->",WORKFLOW_RUN_ID) print("All data exported to Dynatrace") diff --git a/src/otel/__init__.py b/src/otel/__init__.py index 0d7ad1e..890db08 100644 --- a/src/otel/__init__.py +++ b/src/otel/__init__.py @@ -12,8 +12,31 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, AggregationTemporality +from opentelemetry.sdk.metrics import ( + Counter, + Histogram, + MeterProvider, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + PeriodicExportingMetricReader, +) + + +deltaTemporality = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, +} + + def getLogExporter(endpoint, headers, protocol): match protocol: case "HTTP": @@ -35,11 +58,11 @@ def getSpanExporter(endpoint, headers, protocol): def getMetricExporter(endpoint, headers, protocol): match protocol: case "HTTP": - return HTTPOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_aggregation={AggregationTemporality.DELTA}) + return HTTPOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_temporality=deltaTemporality) case "GRPC": - return GRPCOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_aggregation={AggregationTemporality.DELTA}) + return GRPCOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_temporality=deltaTemporality) case _: - return HTTPOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_aggregation={AggregationTemporality.DELTA}) + return HTTPOTLPMetricExporter(endpoint=endpoint, headers=headers, preferred_temporality=deltaTemporality) def create_otel_attributes(atts, GITHUB_SERVICE_NAME): attributes={SERVICE_NAME: GITHUB_SERVICE_NAME}