NOTE(review): the line breaks and indentation below were reconstructed from a
whitespace-collapsed copy of this patch; hunk header line counts were used to
place blank lines. Confirm against the original commit before applying, in
particular:
  * which side of the `self.tracer.configure(...)` change in `_start_service`
    is nested under the `if` (the -/+ lines are token-identical, so only the
    indent changed; the dedented form is assumed to be the new one);
  * the extent of the `with` blocks and the indent of context lines in the
    test hunks, and the position of blank context lines in the `enable(` hunk;
  * the empty `_dd_api_key=""` / `_llmobs_ml_app=""` literals, which may have
    lost their contents during extraction.

diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py
index 14ae70c1214..1e8caa99861 100644
--- a/ddtrace/llmobs/_llmobs.py
+++ b/ddtrace/llmobs/_llmobs.py
@@ -124,26 +124,14 @@ def _child_after_fork(self):
         self._evaluator_runner = self._evaluator_runner.recreate()
         self._trace_processor._span_writer = self._llmobs_span_writer
         self._trace_processor._evaluator_runner = self._evaluator_runner
-        tracer_filters = self.tracer._filters
-        if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
-            tracer_filters += [self._trace_processor]
-        self.tracer.configure(settings={"FILTERS": tracer_filters})
-        try:
-            self._llmobs_span_writer.start()
-            self._llmobs_eval_metric_writer.start()
-        except ServiceStatusError:
-            log.debug("Error starting LLMObs writers after fork")
-
-        try:
-            self._evaluator_runner.start()
-        except ServiceStatusError:
-            log.debug("Error starting evaluator runner after fork")
+        if self.enabled:
+            self._start_service()
 
     def _start_service(self) -> None:
         tracer_filters = self.tracer._filters
         if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
             tracer_filters += [self._trace_processor]
-            self.tracer.configure(settings={"FILTERS": tracer_filters})
+        self.tracer.configure(settings={"FILTERS": tracer_filters})
         try:
             self._llmobs_span_writer.start()
             self._llmobs_eval_metric_writer.start()
@@ -245,6 +233,7 @@ def enable(
         if integrations_enabled:
             cls._patch_integrations()
 
+        # override the default _instance with a new tracer
         cls._instance = cls(tracer=_tracer)
         cls.enabled = True
 
diff --git a/releasenotes/notes/llm-obs-fork-disabled-f710f0ccde71c36b.yaml b/releasenotes/notes/llm-obs-fork-disabled-f710f0ccde71c36b.yaml
new file mode 100644
index 00000000000..30f571bae30
--- /dev/null
+++ b/releasenotes/notes/llm-obs-fork-disabled-f710f0ccde71c36b.yaml
@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    LLM Observability: Resolves errors where the disabled setting was being ignored when forking.
diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py
index c69a0c36829..f174b2c219a 100644
--- a/tests/llmobs/test_llmobs_service.py
+++ b/tests/llmobs/test_llmobs_service.py
@@ -1574,7 +1574,7 @@ def test_llmobs_fork_recreates_and_restarts_evaluator_runner(mock_ragas_evaluato
 
 def test_llmobs_fork_create_span(monkeypatch):
     """Test that forking a process correctly encodes new spans created in each process."""
-    monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", 5.0)
+    monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", "5.0")
     with mock.patch("ddtrace.internal.writer.HTTPWriter._send_payload"):
         llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
         pid = os.fork()
@@ -1684,6 +1684,49 @@ def process_trace(self, trace):
         llmobs_service.disable()
 
 
+def test_llmobs_fork_disabled(monkeypatch):
+    """Test that after being disabled the service remains disabled when forking"""
+    monkeypatch.setenv("DD_LLMOBS_ENABLED", "0")
+    svc = llmobs_service(tracer=DummyTracer())
+    pid = os.fork()
+    assert not svc.enabled, "both the parent and child should be disabled"
+    assert svc._llmobs_span_writer.status == ServiceStatus.STOPPED
+    assert svc._llmobs_eval_metric_writer.status == ServiceStatus.STOPPED
+    if not pid:
+        svc.disable()
+        os._exit(12)
+
+    _, status = os.waitpid(pid, 0)
+    exit_code = os.WEXITSTATUS(status)
+    assert exit_code == 12
+    svc.disable()
+
+
+def test_llmobs_fork_disabled_then_enabled(monkeypatch):
+    """Test that after being initially disabled, the service can be enabled in a fork"""
+    monkeypatch.setenv("DD_LLMOBS_ENABLED", "0")
+    svc = llmobs_service._instance
+    pid = os.fork()
+    assert not svc.enabled, "both the parent and child should be disabled"
+    assert svc._llmobs_span_writer.status == ServiceStatus.STOPPED
+    assert svc._llmobs_eval_metric_writer.status == ServiceStatus.STOPPED
+    if not pid:
+        # Enable the service in the child
+        with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="")):
+            monkeypatch.setenv("DD_LLMOBS_ENABLED", "1")
+            llmobs_service.enable(_tracer=DummyTracer())
+        svc = llmobs_service._instance
+        assert svc._llmobs_span_writer.status == ServiceStatus.RUNNING
+        assert svc._llmobs_eval_metric_writer.status == ServiceStatus.RUNNING
+        svc.disable()
+        os._exit(12)
+
+    _, status = os.waitpid(pid, 0)
+    exit_code = os.WEXITSTATUS(status)
+    assert exit_code == 12
+    svc.disable()
+
+
 def test_annotation_context_modifies_span_tags(LLMObs):
     with LLMObs.annotation_context(tags={"foo": "bar"}):
         with LLMObs.agent(name="test_agent") as span: