Skip to content

Commit

Permalink
Merge branch 'main' into gnufede/test-no-initializer-reset
Browse files Browse the repository at this point in the history
  • Loading branch information
gnufede authored Mar 1, 2024
2 parents 48add02 + 6bc3fd9 commit bd30850
Show file tree
Hide file tree
Showing 36 changed files with 1,277 additions and 579 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/188ca09.in
# pip-compile --no-annotate .riot/requirements/1544815.in
#
attrs==23.1.0
coverage[toml]==7.3.4
attrs==23.2.0
coverage[toml]==7.4.3
exceptiongroup==1.2.0
hypothesis==6.45.0
importlib-metadata==7.0.0
importlib-metadata==7.0.1
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==23.2
pluggy==1.3.0
pytest==7.4.3
pluggy==1.4.0
pysqlite3-binary==0.5.2.post3
pytest==8.0.2
pytest-cov==4.1.0
pytest-mock==3.12.0
pytest-randomly==3.15.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/1fccf5b.in
# pip-compile --no-annotate .riot/requirements/16eb426.in
#
attrs==23.1.0
coverage[toml]==7.3.4
attrs==23.2.0
coverage[toml]==7.4.3
exceptiongroup==1.2.0
hypothesis==6.45.0
importlib-metadata==7.0.0
importlib-metadata==7.0.1
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==23.2
pluggy==1.3.0
pytest==7.4.3
pluggy==1.4.0
pytest==8.0.2
pytest-cov==4.1.0
pytest-mock==3.12.0
pytest-randomly==3.15.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/f4c3852.in
# pip-compile --no-annotate .riot/requirements/1c64cfc.in
#
attrs==23.1.0
coverage[toml]==7.3.4
attrs==23.2.0
coverage[toml]==7.4.3
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==23.2
pluggy==1.3.0
pytest==7.4.3
pluggy==1.4.0
pysqlite3-binary==0.5.2.post3
pytest==8.0.2
pytest-cov==4.1.0
pytest-mock==3.12.0
pytest-randomly==3.15.0
Expand Down
4 changes: 2 additions & 2 deletions .riot/requirements/1d73048.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
# pip-compile --config=pyproject.toml --no-annotate --resolver=backtracking .riot/requirements/1d73048.in
#
attrs==23.1.0
attrs==23.2.0
coverage[toml]==7.2.7
exceptiongroup==1.2.0
hypothesis==6.45.0
Expand All @@ -14,7 +14,7 @@ mock==5.1.0
opentracing==2.4.0
packaging==23.2
pluggy==1.2.0
pytest==7.4.3
pytest==7.4.4
pytest-cov==4.1.0
pytest-mock==3.11.1
pytest-randomly==3.12.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/11921fa.in
# pip-compile --no-annotate .riot/requirements/1e311f5.in
#
attrs==23.1.0
coverage[toml]==7.3.4
attrs==23.2.0
coverage[toml]==7.4.3
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==23.2
pluggy==1.3.0
pytest==7.4.3
pluggy==1.4.0
pysqlite3-binary==0.5.2.post3
pytest==8.0.2
pytest-cov==4.1.0
pytest-mock==3.12.0
pytest-randomly==3.15.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/da724fd.in
# pip-compile --no-annotate .riot/requirements/1fc50b1.in
#
attrs==23.1.0
coverage[toml]==7.3.4
attrs==23.2.0
coverage[toml]==7.4.3
exceptiongroup==1.2.0
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==23.2
pluggy==1.3.0
pytest==7.4.3
pluggy==1.4.0
pysqlite3-binary==0.5.2.post3
pytest==8.0.2
pytest-cov==4.1.0
pytest-mock==3.12.0
pytest-randomly==3.15.0
Expand Down
134 changes: 77 additions & 57 deletions ddtrace/contrib/kafka/patch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys

import confluent_kafka

Expand Down Expand Up @@ -115,8 +116,11 @@ def patch():
for producer in (TracedProducer, TracedSerializingProducer):
trace_utils.wrap(producer, "produce", traced_produce)
for consumer in (TracedConsumer, TracedDeserializingConsumer):
trace_utils.wrap(consumer, "poll", traced_poll)
trace_utils.wrap(consumer, "poll", traced_poll_or_consume)
trace_utils.wrap(consumer, "commit", traced_commit)

# Consume is not implemented in deserializing consumers
trace_utils.wrap(TracedConsumer, "consume", traced_poll_or_consume)
Pin().onto(confluent_kafka.Producer)
Pin().onto(confluent_kafka.Consumer)
Pin().onto(confluent_kafka.SerializingProducer)
Expand All @@ -136,6 +140,10 @@ def unpatch():
if trace_utils.iswrapped(consumer.commit):
trace_utils.unwrap(consumer, "commit")

# Consume is not implemented in deserializing consumers
if trace_utils.iswrapped(TracedConsumer.consume):
trace_utils.unwrap(TracedConsumer, "consume")

confluent_kafka.Producer = _Producer
confluent_kafka.Consumer = _Consumer
if _SerializingProducer is not None:
Expand Down Expand Up @@ -194,7 +202,7 @@ def traced_produce(func, instance, args, kwargs):
return func(*args, **kwargs)


def traced_poll(func, instance, args, kwargs):
def traced_poll_or_consume(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return func(*args, **kwargs)
Expand All @@ -204,67 +212,79 @@ def traced_poll(func, instance, args, kwargs):
start_ns = time_ns()
# wrap in a try catch and raise exception after span is started
err = None
result = None
try:
message = func(*args, **kwargs)
result = func(*args, **kwargs)
return result
except Exception as e:
err = e
raise err
finally:
if isinstance(result, confluent_kafka.Message):
# poll returns a single message
_instrument_message([result], pin, start_ns, instance, err)
elif isinstance(result, list):
# consume returns a list of messages,
_instrument_message(result, pin, start_ns, instance, err)
elif config.kafka.trace_empty_poll_enabled:
_instrument_message([None], pin, start_ns, instance, err)


def _instrument_message(messages, pin, start_ns, instance, err):
ctx = None
if message and config.kafka.distributed_tracing_enabled and message.headers():
ctx = Propagator.extract(dict(message.headers()))
if message or config.kafka.trace_empty_poll_enabled:
with pin.tracer.start_span(
name=schematize_messaging_operation(kafkax.CONSUME, provider="kafka", direction=SpanDirection.PROCESSING),
service=trace_utils.ext_service(pin, config.kafka),
span_type=SpanTypes.WORKER,
child_of=ctx if ctx is not None else pin.tracer.context_provider.active(),
activate=True,
) as span:
# reset span start time to before function call
span.start_ns = start_ns

span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
span.set_tag_str(COMPONENT, config.kafka.integration_name)
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(message is not None))
span.set_tag_str(kafkax.GROUP_ID, instance._group_id)
# First message is used to extract context and enrich datadog spans
# This approach aligns with the opentelemetry confluent kafka semantics
first_message = messages[0]
if first_message and config.kafka.distributed_tracing_enabled and first_message.headers():
ctx = Propagator.extract(dict(first_message.headers()))
with pin.tracer.start_span(
name=schematize_messaging_operation(kafkax.CONSUME, provider="kafka", direction=SpanDirection.PROCESSING),
service=trace_utils.ext_service(pin, config.kafka),
span_type=SpanTypes.WORKER,
child_of=ctx if ctx is not None else pin.tracer.context_provider.active(),
activate=True,
) as span:
# reset span start time to before function call
span.start_ns = start_ns

for message in messages:
if message is not None:
core.set_item("kafka_topic", message.topic())
core.dispatch("kafka.consume.start", (instance, message, span))

message_key = message.key() or ""
message_offset = message.offset() or -1
span.set_tag_str(kafkax.TOPIC, message.topic())

# If this is a deserializing consumer, do not set the key as a tag since we
# do not have the serialization function
if (
(_DeserializingConsumer is not None and not isinstance(instance, _DeserializingConsumer))
or isinstance(message_key, str)
or isinstance(message_key, bytes)
):
span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
span.set_tag(kafkax.PARTITION, message.partition())
is_tombstone = False
try:
is_tombstone = len(message) == 0
except TypeError: # https://github.com/confluentinc/confluent-kafka-python/issues/1192
pass
span.set_tag_str(kafkax.TOMBSTONE, str(is_tombstone))
span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
span.set_tag(SPAN_MEASURED_KEY)
rate = config.kafka.get_analytics_sample_rate()
if rate is not None:
span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)

# raise exception if one was encountered
if err is not None:
raise err
return message
else:
core.set_item("kafka_topic", first_message.topic())
core.dispatch("kafka.consume.start", (instance, first_message, span))

span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
span.set_tag_str(COMPONENT, config.kafka.integration_name)
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(first_message is not None))
span.set_tag_str(kafkax.GROUP_ID, instance._group_id)
if messages[0] is not None:
message_key = messages[0].key() or ""
message_offset = messages[0].offset() or -1
span.set_tag_str(kafkax.TOPIC, messages[0].topic())

# If this is a deserializing consumer, do not set the key as a tag since we
# do not have the serialization function
if (
(_DeserializingConsumer is not None and not isinstance(instance, _DeserializingConsumer))
or isinstance(message_key, str)
or isinstance(message_key, bytes)
):
span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
span.set_tag(kafkax.PARTITION, messages[0].partition())
is_tombstone = False
try:
is_tombstone = len(messages[0]) == 0
except TypeError: # https://github.com/confluentinc/confluent-kafka-python/issues/1192
pass
span.set_tag_str(kafkax.TOMBSTONE, str(is_tombstone))
span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
span.set_tag(SPAN_MEASURED_KEY)
rate = config.kafka.get_analytics_sample_rate()
if rate is not None:
span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)

if err is not None:
raise err
else:
return message
span.set_exc_info(*sys.exc_info())


def traced_commit(func, instance, args, kwargs):
Expand Down
Loading

0 comments on commit bd30850

Please sign in to comment.