Skip to content

Commit

Permalink
fix(llmobs): respect enabled setting when forking (#11026)
Browse files Browse the repository at this point in the history
The code was assuming that the LLMObs service is enabled when forking.
If a user had explicitly disabled it via the disable() method or
DD_LLMOBS_ENABLED=0 it would be re-enabled in the child process.

Potentially closes #10859
  • Loading branch information
Kyle-Verhoog authored Oct 24, 2024
1 parent 3fd6696 commit 127e5b7
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 16 deletions.
19 changes: 4 additions & 15 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,14 @@ def _child_after_fork(self):
self._evaluator_runner = self._evaluator_runner.recreate()
self._trace_processor._span_writer = self._llmobs_span_writer
self._trace_processor._evaluator_runner = self._evaluator_runner
tracer_filters = self.tracer._filters
if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
tracer_filters += [self._trace_processor]
self.tracer.configure(settings={"FILTERS": tracer_filters})
try:
self._llmobs_span_writer.start()
self._llmobs_eval_metric_writer.start()
except ServiceStatusError:
log.debug("Error starting LLMObs writers after fork")

try:
self._evaluator_runner.start()
except ServiceStatusError:
log.debug("Error starting evaluator runner after fork")
if self.enabled:
self._start_service()

def _start_service(self) -> None:
tracer_filters = self.tracer._filters
if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
tracer_filters += [self._trace_processor]
self.tracer.configure(settings={"FILTERS": tracer_filters})
self.tracer.configure(settings={"FILTERS": tracer_filters})
try:
self._llmobs_span_writer.start()
self._llmobs_eval_metric_writer.start()
Expand Down Expand Up @@ -245,6 +233,7 @@ def enable(

if integrations_enabled:
cls._patch_integrations()

# override the default _instance with a new tracer
cls._instance = cls(tracer=_tracer)
cls.enabled = True
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
LLM Observability: Resolves errors where the disabled setting was being ignored when forking.
45 changes: 44 additions & 1 deletion tests/llmobs/test_llmobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1574,7 +1574,7 @@ def test_llmobs_fork_recreates_and_restarts_evaluator_runner(mock_ragas_evaluato

def test_llmobs_fork_create_span(monkeypatch):
"""Test that forking a process correctly encodes new spans created in each process."""
monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", 5.0)
monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", "5.0")
with mock.patch("ddtrace.internal.writer.HTTPWriter._send_payload"):
llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
pid = os.fork()
Expand Down Expand Up @@ -1684,6 +1684,49 @@ def process_trace(self, trace):
llmobs_service.disable()


def test_llmobs_fork_disabled(monkeypatch):
"""Test that after being disabled the service remains disabled when forking"""
monkeypatch.setenv("DD_LLMOBS_ENABLED", "0")
svc = llmobs_service(tracer=DummyTracer())
pid = os.fork()
assert not svc.enabled, "both the parent and child should be disabled"
assert svc._llmobs_span_writer.status == ServiceStatus.STOPPED
assert svc._llmobs_eval_metric_writer.status == ServiceStatus.STOPPED
if not pid:
svc.disable()
os._exit(12)

_, status = os.waitpid(pid, 0)
exit_code = os.WEXITSTATUS(status)
assert exit_code == 12
svc.disable()


def test_llmobs_fork_disabled_then_enabled(monkeypatch):
"""Test that after being initially disabled, the service can be enabled in a fork"""
monkeypatch.setenv("DD_LLMOBS_ENABLED", "0")
svc = llmobs_service._instance
pid = os.fork()
assert not svc.enabled, "both the parent and child should be disabled"
assert svc._llmobs_span_writer.status == ServiceStatus.STOPPED
assert svc._llmobs_eval_metric_writer.status == ServiceStatus.STOPPED
if not pid:
# Enable the service in the child
with override_global_config(dict(_dd_api_key="<not-a-real-api-key>", _llmobs_ml_app="<ml-app-name>")):
monkeypatch.setenv("DD_LLMOBS_ENABLED", "1")
llmobs_service.enable(_tracer=DummyTracer())
svc = llmobs_service._instance
assert svc._llmobs_span_writer.status == ServiceStatus.RUNNING
assert svc._llmobs_eval_metric_writer.status == ServiceStatus.RUNNING
svc.disable()
os._exit(12)

_, status = os.waitpid(pid, 0)
exit_code = os.WEXITSTATUS(status)
assert exit_code == 12
svc.disable()


def test_annotation_context_modifies_span_tags(LLMObs):
with LLMObs.annotation_context(tags={"foo": "bar"}):
with LLMObs.agent(name="test_agent") as span:
Expand Down

0 comments on commit 127e5b7

Please sign in to comment.