Skip to content

Commit

Permalink
feat: advisory parameters for metrics instruments
Browse files Browse the repository at this point in the history
  • Loading branch information
zvkemp committed Jan 30, 2025
1 parent 19b5fbe commit 8811940
Show file tree
Hide file tree
Showing 15 changed files with 161 additions and 40 deletions.
16 changes: 8 additions & 8 deletions metrics_api/lib/opentelemetry/internal/proxy_meter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ def delegate=(meter)

private

def create_instrument(kind, name, unit, description, callback)
def create_instrument(kind, name, unit, description, callback, **advisory_parameters)
super do
next ProxyInstrument.new(kind, name, unit, description, callback) if @delegate.nil?

case kind
when :counter then @delegate.create_counter(name, unit: unit, description: description)
when :histogram then @delegate.create_histogram(name, unit: unit, description: description)
when :gauge then @delegate.create_gauge(name, unit: unit, description: description)
when :up_down_counter then @delegate.create_up_down_counter(name, unit: unit, description: description)
when :observable_counter then @delegate.create_observable_counter(name, unit: unit, description: description, callback: callback)
when :observable_gauge then @delegate.create_observable_gauge(name, unit: unit, description: description, callback: callback)
when :observable_up_down_counter then @delegate.create_observable_up_down_counter(name, unit: unit, description: description, callback: callback)
when :counter then @delegate.create_counter(name, unit: unit, description: description, **advisory_parameters)
when :histogram then @delegate.create_histogram(name, unit: unit, description: description, **advisory_parameters)
when :gauge then @delegate.create_gauge(name, unit: unit, description: description, **advisory_parameters)
when :up_down_counter then @delegate.create_up_down_counter(name, unit: unit, description: description, **advisory_parameters)
when :observable_counter then @delegate.create_observable_counter(name, unit: unit, description: description, callback: callback, **advisory_parameters)
when :observable_gauge then @delegate.create_observable_gauge(name, unit: unit, description: description, callback: callback, **advisory_parameters)
when :observable_up_down_counter then @delegate.create_observable_up_down_counter(name, unit: unit, description: description, callback: callback, **advisory_parameters)
end
end
end
Expand Down
30 changes: 15 additions & 15 deletions metrics_api/lib/opentelemetry/metrics/meter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ def initialize
# @param description [optional String] an optional free-form text provided by user.
#
# @return [nil] after creation of counter, it will be stored in instrument_registry
def create_counter(name, unit: nil, description: nil)
create_instrument(:counter, name, unit, description, nil) { COUNTER }
def create_counter(name, unit: nil, description: nil, **advisory_parameters)
create_instrument(:counter, name, unit, description, nil, **advisory_parameters) { COUNTER }
end

# Histogram is a synchronous Instrument which can be used to report arbitrary values that are likely
Expand All @@ -62,8 +62,8 @@ def create_counter(name, unit: nil, description: nil)
# @param description [optional String] an optional free-form text provided by user.
#
# @return [nil] after creation of histogram, it will be stored in instrument_registry
def create_histogram(name, unit: nil, description: nil)
create_instrument(:histogram, name, unit, description, nil) { HISTOGRAM }
def create_histogram(name, unit: nil, description: nil, **advisory_parameters)
create_instrument(:histogram, name, unit, description, nil, **advisory_parameters) { HISTOGRAM }
end

# Gauge is an synchronous Instrument which reports non-additive value(s)
Expand All @@ -80,8 +80,8 @@ def create_histogram(name, unit: nil, description: nil)
# @param description [optional String] an optional free-form text provided by user.
#
# @return [nil] after creation of gauge, it will be stored in instrument_registry
def create_gauge(name, unit: nil, description: nil)
create_instrument(:gauge, name, unit, description, nil) { GAUGE }
def create_gauge(name, unit: nil, description: nil, **advisory_parameters)
create_instrument(:gauge, name, unit, description, nil, **advisory_parameters) { GAUGE }
end

# UpDownCounter is a synchronous Instrument which supports increments and decrements.
Expand All @@ -97,8 +97,8 @@ def create_gauge(name, unit: nil, description: nil)
# @param description [optional String] an optional free-form text provided by user.
#
# @return [nil] after creation of up_down_counter, it will be stored in instrument_registry
def create_up_down_counter(name, unit: nil, description: nil)
create_instrument(:up_down_counter, name, unit, description, nil) { UP_DOWN_COUNTER }
def create_up_down_counter(name, unit: nil, description: nil, **advisory_parameters)
create_instrument(:up_down_counter, name, unit, description, nil, **advisory_parameters) { UP_DOWN_COUNTER }
end

# ObservableCounter is an asynchronous Instrument which reports monotonically
Expand All @@ -119,8 +119,8 @@ def create_up_down_counter(name, unit: nil, description: nil)
# @param description [optional String] an optional free-form text provided by user.
#
# @return [nil] after creation of observable_counter, it will be stored in instrument_registry
def create_observable_counter(name, callback:, unit: nil, description: nil)
create_instrument(:observable_counter, name, unit, description, callback) { OBSERVABLE_COUNTER }
def create_observable_counter(name, callback:, unit: nil, description: nil, **advisory_parameters)
create_instrument(:observable_counter, name, unit, description, callback, **advisory_parameters) { OBSERVABLE_COUNTER }
end

# ObservableGauge is an asynchronous Instrument which reports non-additive value(s)
Expand All @@ -142,8 +142,8 @@ def create_observable_counter(name, callback:, unit: nil, description: nil)
# @param description [optional String] an optional free-form text provided by user.
#
# @return [nil] after creation of observable_gauge, it will be stored in instrument_registry
def create_observable_gauge(name, callback:, unit: nil, description: nil)
create_instrument(:observable_gauge, name, unit, description, callback) { OBSERVABLE_GAUGE }
def create_observable_gauge(name, callback:, unit: nil, description: nil, **advisory_parameters)
create_instrument(:observable_gauge, name, unit, description, callback, **advisory_parameters) { OBSERVABLE_GAUGE }
end

# ObservableUpDownCounter is an asynchronous Instrument which reports additive value(s)
Expand All @@ -165,13 +165,13 @@ def create_observable_gauge(name, callback:, unit: nil, description: nil)
# @param description [optional String] an optional free-form text provided by user.
#
# @return [nil] after creation of observable_up_down_counter, it will be stored in instrument_registry
def create_observable_up_down_counter(name, callback:, unit: nil, description: nil)
create_instrument(:observable_up_down_counter, name, unit, description, callback) { OBSERVABLE_UP_DOWN_COUNTER }
def create_observable_up_down_counter(name, callback:, unit: nil, description: nil, **advisory_parameters)
create_instrument(:observable_up_down_counter, name, unit, description, callback, **advisory_parameters) { OBSERVABLE_UP_DOWN_COUNTER }
end

private

def create_instrument(kind, name, unit, description, callback)
def create_instrument(kind, name, unit, description, callback, **)
@mutex.synchronize do
OpenTelemetry.logger.warn("duplicate instrument registration occurred for instrument #{name}") if @instrument_registry.include? name

Expand Down
10 changes: 10 additions & 0 deletions metrics_api/test/opentelemetry/metrics/meter_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,21 @@
_(counter.class).must_equal(OpenTelemetry::Metrics::Instrument::Counter)
end

it 'test create_counter with advisory parameters' do
counter = meter.create_counter('test', attributes: { 'test' => true })
_(counter.class).must_equal(OpenTelemetry::Metrics::Instrument::Counter)
end

it 'test create_histogram' do
counter = meter.create_histogram('test')
_(counter.class).must_equal(OpenTelemetry::Metrics::Instrument::Histogram)
end

it 'test create_histogram with advisory parameters' do
histogram = meter.create_histogram('test', explicit_bucket_boundaries: [1, 2, 3])
_(histogram.class).must_equal(OpenTelemetry::Metrics::Instrument::Histogram)
end

it 'test create_up_down_counter' do
counter = meter.create_up_down_counter('test')
_(counter.class).must_equal(OpenTelemetry::Metrics::Instrument::UpDownCounter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ module Aggregation
class Drop
attr_reader :aggregation_temporality

def initialize(aggregation_temporality: :delta)
def initialize(aggregation_temporality: :delta, attributes: nil)
@aggregation_temporality = aggregation_temporality
@attributes = attributes if attributes
end

def collect(start_time, end_time, data_points)
data_points.values.map!(&:dup)
end

def update(increment, attributes, data_points)
attributes = @attributes.merge(attributes) if @attributes

data_points[attributes] = NumberDataPoint.new(
{},
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ class ExplicitBucketHistogram
def initialize(
aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta), # TODO: the default should be :cumulative, see issue #1555
boundaries: DEFAULT_BOUNDARIES,
attributes: nil,
record_min_max: true
)
@aggregation_temporality = aggregation_temporality.to_sym
@boundaries = boundaries && !boundaries.empty? ? boundaries.sort : nil
@record_min_max = record_min_max
@attributes = attributes if attributes
end

def collect(start_time, end_time, data_points)
Expand All @@ -53,6 +55,8 @@ def collect(start_time, end_time, data_points)
end

def update(amount, attributes, data_points)
attributes = @attributes.merge(attributes) if @attributes

hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ module Aggregation
class LastValue
attr_reader :aggregation_temporality

def initialize(aggregation_temporality: :delta)
def initialize(aggregation_temporality: :delta, attributes: nil)
@aggregation_temporality = aggregation_temporality
@attributes = attributes if attributes
end

def collect(start_time, end_time, data_points)
Expand All @@ -37,6 +38,8 @@ def collect(start_time, end_time, data_points)
end

def update(increment, attributes, data_points)
attributes = @attributes.merge(attributes) if @attributes

data_points[attributes] = NumberDataPoint.new(
attributes,
nil,
Expand Down
8 changes: 7 additions & 1 deletion metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ module Aggregation
class Sum
attr_reader :aggregation_temporality

def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta))
def initialize(
aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta),
attributes: nil
)
# TODO: the default should be :cumulative, see issue #1555
@aggregation_temporality = aggregation_temporality.to_sym
@attributes = attributes if attributes
end

def collect(start_time, end_time, data_points)
Expand All @@ -39,6 +43,8 @@ def collect(start_time, end_time, data_points)
end

def update(increment, attributes, data_points)
attributes = @attributes.merge(attributes) if @attributes

ndp = data_points[attributes] || data_points[attributes] = NumberDataPoint.new(
attributes,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def add(increment, attributes: {})
private

def default_aggregation
OpenTelemetry::SDK::Metrics::Aggregation::Sum.new
OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(attributes: @attributes)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def record(value, attributes: {})
private

def default_aggregation
OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new(attributes: @attributes)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,21 @@ def record(amount, attributes: {})
private

def default_aggregation
OpenTelemetry::SDK::Metrics::Aggregation::ExplicitBucketHistogram.new
# This hash is assembled to avoid implicitly passing `boundaries: nil`,
# which should be valid explicit call according to ExplicitBucketHistogram#initialize
kwargs = {}
kwargs[:attributes] = @attributes if @attributes
kwargs[:boundaries] = @boundaries if @boundaries

OpenTelemetry::SDK::Metrics::Aggregation::ExplicitBucketHistogram.new(**kwargs)
end

def validate_advisory_parameters(parameters)
if (boundaries = parameters.delete(:explicit_bucket_boundaries))
@boundaries = boundaries
end

super
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ module Instrument
# {SynchronousInstrument} contains the common functionality shared across
# the synchronous instruments SDK instruments.
class SynchronousInstrument
def initialize(name, unit, description, instrumentation_scope, meter_provider)
def initialize(name, unit, description, instrumentation_scope, meter_provider, **advisory_parameters)
@name = name
@unit = unit
@description = description
@instrumentation_scope = instrumentation_scope
@meter_provider = meter_provider
@metric_streams = []

validate_advisory_parameters(advisory_parameters)

meter_provider.register_synchronous_instrument(self)
end

# @api private
def register_with_new_metric_store(metric_store, aggregation: default_aggregation)
def register_with_new_metric_store(metric_store)
ms = OpenTelemetry::SDK::Metrics::State::MetricStream.new(
@name,
@description,
Expand All @@ -37,11 +39,25 @@ def register_with_new_metric_store(metric_store, aggregation: default_aggregatio
metric_store.add_metric_stream(ms)
end

def aggregation
@aggregation || default_aggregation
end

private

def update(value, attributes)
@metric_streams.each { |ms| ms.update(value, attributes) }
end

def validate_advisory_parameters(advisory_parameters)
if (attributes = advisory_parameters.delete(:attributes))
@attributes = attributes
end

advisory_parameters.each_key do |parameter_name|
OpenTelemetry.logger.warn "Advisory parameter `#{parameter_name}` is not valid for instrument kind `#{instrument_kind}`; ignoring"
end
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def add(amount, attributes: {})
private

def default_aggregation
OpenTelemetry::SDK::Metrics::Aggregation::Sum.new
OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(attributes: @attributes)
end
end
end
Expand Down
16 changes: 8 additions & 8 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def add_metric_reader(metric_reader)
end
end

def create_instrument(kind, name, unit, description, callback)
def create_instrument(kind, name, unit, description, callback, **advisory_parameters)
raise InstrumentNameError if name.nil?
raise InstrumentNameError if name.empty?
raise InstrumentNameError unless NAME_REGEX.match?(name)
Expand All @@ -44,13 +44,13 @@ def create_instrument(kind, name, unit, description, callback)

super do
case kind
when :counter then OpenTelemetry::SDK::Metrics::Instrument::Counter.new(name, unit, description, @instrumentation_scope, @meter_provider)
when :observable_counter then OpenTelemetry::SDK::Metrics::Instrument::ObservableCounter.new(name, unit, description, callback, @instrumentation_scope, @meter_provider)
when :gauge then OpenTelemetry::SDK::Metrics::Instrument::Gauge.new(name, unit, description, @instrumentation_scope, @meter_provider)
when :histogram then OpenTelemetry::SDK::Metrics::Instrument::Histogram.new(name, unit, description, @instrumentation_scope, @meter_provider)
when :observable_gauge then OpenTelemetry::SDK::Metrics::Instrument::ObservableGauge.new(name, unit, description, callback, @instrumentation_scope, @meter_provider)
when :up_down_counter then OpenTelemetry::SDK::Metrics::Instrument::UpDownCounter.new(name, unit, description, @instrumentation_scope, @meter_provider)
when :observable_up_down_counter then OpenTelemetry::SDK::Metrics::Instrument::ObservableUpDownCounter.new(name, unit, description, callback, @instrumentation_scope, @meter_provider)
when :counter then OpenTelemetry::SDK::Metrics::Instrument::Counter.new(name, unit, description, @instrumentation_scope, @meter_provider, **advisory_parameters)
when :observable_counter then OpenTelemetry::SDK::Metrics::Instrument::ObservableCounter.new(name, unit, description, callback, @instrumentation_scope, @meter_provider, **advisory_parameters)
when :gauge then OpenTelemetry::SDK::Metrics::Instrument::Gauge.new(name, unit, description, @instrumentation_scope, @meter_provider, **advisory_parameters)
when :histogram then OpenTelemetry::SDK::Metrics::Instrument::Histogram.new(name, unit, description, @instrumentation_scope, @meter_provider, **advisory_parameters)
when :observable_gauge then OpenTelemetry::SDK::Metrics::Instrument::ObservableGauge.new(name, unit, description, callback, @instrumentation_scope, @meter_provider, **advisory_parameters)
when :up_down_counter then OpenTelemetry::SDK::Metrics::Instrument::UpDownCounter.new(name, unit, description, @instrumentation_scope, @meter_provider, **advisory_parameters)
when :observable_up_down_counter then OpenTelemetry::SDK::Metrics::Instrument::ObservableUpDownCounter.new(name, unit, description, callback, @instrumentation_scope, @meter_provider, **advisory_parameters)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,30 @@
_(last_snapshot[0].data_points[0].attributes).must_equal('foo' => 'bar')
_(last_snapshot[0].aggregation_temporality).must_equal(:delta)
end

describe 'with advisory parameters' do
let(:random_key) { "a#{SecureRandom.hex}" }
let(:counter) do
meter.create_counter(
'counter',
unit: 'smidgen',
description: 'a small amount of something',
attributes: { random_key => true }
)
end

it 'counts' do
counter.add(1, attributes: { 'foo' => 'bar' })
metric_exporter.pull
last_snapshot = metric_exporter.metric_snapshots

_(last_snapshot[0].name).must_equal('counter')
_(last_snapshot[0].unit).must_equal('smidgen')
_(last_snapshot[0].description).must_equal('a small amount of something')
_(last_snapshot[0].instrumentation_scope.name).must_equal('test')
_(last_snapshot[0].data_points[0].value).must_equal(1)
_(last_snapshot[0].data_points[0].attributes).must_equal('foo' => 'bar', random_key => true)
_(last_snapshot[0].aggregation_temporality).must_equal(:delta)
end
end
end
Loading

0 comments on commit 8811940

Please sign in to comment.