chore(llmobs): implement answer relevancy ragas metric #11738

Closed · wants to merge 21 commits
151 changes: 151 additions & 0 deletions ddtrace/llmobs/_evaluators/ragas/answer_relevancy.py
@@ -0,0 +1,151 @@
import math
from typing import Optional
from typing import Tuple
from typing import Union

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs._constants import EVALUATION_SPAN_METADATA
from ddtrace.llmobs._evaluators.ragas.base import RagasBaseEvaluator
from ddtrace.llmobs._evaluators.ragas.base import _get_ml_app_for_ragas_trace


logger = get_logger(__name__)


class RagasAnswerRelevancyEvaluator(RagasBaseEvaluator):
"""A class used by EvaluatorRunner to conduct ragas answer relevancy evaluations
on LLM Observability span events. The job of an Evaluator is to take a span and
submit evaluation metrics based on the span's attributes.
"""

LABEL = "ragas_answer_relevancy"
METRIC_TYPE = "score"

def __init__(self, llmobs_service):
"""
Initialize an evaluator that uses the ragas library to generate an answer relevancy score on finished LLM spans.

Answer relevancy focuses on assessing how pertinent the generated answer is to a given question.
A lower score is assigned to answers that are incomplete or contain redundant information, and higher scores
indicate better relevancy. This metric is computed using the question, contexts, and answer.

For more information, see https://docs.ragas.io/en/latest/concepts/metrics/available_metrics/answer_relevance/

The `ragas.metrics.answer_relevancy` instance is used for answer relevancy scores.
If there is no llm attribute set on this instance, it will be set to the
default `llm_factory()` from ragas, which uses OpenAI.
If there is no embeddings attribute set on this instance, it will be set to the
default `embedding_factory()` from ragas, which also uses OpenAI.

:param llmobs_service: An instance of the LLM Observability service used for tracing the evaluation and
submitting evaluation metrics.

Raises: NotImplementedError if the ragas library is not found or if ragas version is not supported.
"""
super().__init__(llmobs_service)
self.ragas_answer_relevancy_instance = self._get_answer_relevancy_instance()
Contributor:
dumb question - what is the purpose of having a different LLM instance per eval metric runner? Are they not all references to the same base OpenAI() LLM?

Contributor (Author):
Each eval metric runner has access to one instance of a ragas metric (answer_relevancy, context_precision, faithfulness); each of these ragas metrics has a separate llm attribute. We maintain a reference to the ragas metric, not the llm attribute.

But yeah, the default is OpenAI for all of them.
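A minimal illustrative sketch of that arrangement, assuming the ragas 0.1.x API:

# Each ragas metric object exposes its own `llm` attribute; when it is unset, the evaluator
# fills it in lazily from the shared llm_factory() default, which is OpenAI-backed.
from ragas.llms import llm_factory
from ragas.metrics import answer_relevancy, faithfulness

for metric in (answer_relevancy, faithfulness):
    if metric.llm is None:
        metric.llm = llm_factory()  # same default backend, but a distinct attribute per metric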

self.answer_relevancy_output_parser = self.mini_ragas.RagasoutputParser(
pydantic_object=self.mini_ragas.AnswerRelevanceClassification
)

def _get_answer_relevancy_instance(self):
"""
This helper function ensures the answer relevancy instance used by the
ragas evaluator is updated with the latest ragas answer relevancy
instance AND has a non-null llm.
"""
if self.mini_ragas.answer_relevancy is None:
return None
ragas_answer_relevancy_instance = self.mini_ragas.answer_relevancy
if not ragas_answer_relevancy_instance.llm:
ragas_answer_relevancy_instance.llm = self.mini_ragas.llm_factory()
if not ragas_answer_relevancy_instance.embeddings:
ragas_answer_relevancy_instance.embeddings = self.mini_ragas.embedding_factory()
return ragas_answer_relevancy_instance

def evaluate(self, span_event: dict) -> Tuple[Union[float, str], Optional[dict]]:
"""
Performs an answer relevancy evaluation on an llm span event, returning either
- answer relevancy score (float) OR failure reason (str)
- evaluation metadata (dict)
If the ragas answer relevancy instance does not have `llm` set, we set `llm` using the `llm_factory()`
method from ragas, which currently defaults to OpenAI's gpt-4o-turbo.
"""
self.ragas_answer_relevancy_instance = self._get_answer_relevancy_instance()
if not self.ragas_answer_relevancy_instance:
return "fail_answer_relevancy_is_none", {}

evaluation_metadata = {} # type: dict[str, Union[str, dict, list]]
trace_metadata = {} # type: dict[str, Union[str, dict, list]]

# initialize data we annotate for tracing ragas
score, question, answer, answer_classifications = (
math.nan,
None,
None,
None,
)

with self.llmobs_service.workflow(
"dd-ragas.answer_relevancy", ml_app=_get_ml_app_for_ragas_trace(span_event)
) as ragas_ar_workflow:
try:
evaluation_metadata[EVALUATION_SPAN_METADATA] = self.llmobs_service.export_span(span=ragas_ar_workflow)

cp_inputs = self._extract_evaluation_inputs_from_span(span_event)
if cp_inputs is None:
logger.debug(
"Failed to extract question and contexts from "
"span sampled for `ragas_answer_relevancy` evaluation"
)
return "fail_extract_answer_relevancy_inputs", evaluation_metadata

question = cp_inputs["question"]
contexts = cp_inputs["contexts"]
answer = cp_inputs["answer"]
Contributor (on lines +103 to +105):
Same comment as before, doesn't seem necessary to explicitly separate into new variables since they're only called once immediately at most.


prompt = self.ragas_answer_relevancy_instance.question_generation.format(
answer=answer,
context="\n".join(contexts),
)

# 'strictness' is a parameter that can be set to control the number of generations
trace_metadata["strictness"] = self.ragas_answer_relevancy_instance.strictness
result = self.ragas_answer_relevancy_instance.llm.generate_text(
prompt, n=self.ragas_answer_relevancy_instance.strictness
)

try:
answers = [self.answer_relevancy_output_parser.parse(res.text) for res in result.generations[0]]
answers = [answer for answer in answers if answer is not None]
except Exception as e:
logger.debug("Failed to parse answer relevancy output: %s", e)
return "fail_parse_answer_relevancy_output", evaluation_metadata

gen_questions = [answer.question for answer in answers]
answer_classifications = [
{"question": answer.question, "noncommittal": answer.noncommittal} for answer in answers
]
trace_metadata["answer_classifications"] = answer_classifications
if all(q == "" for q in gen_questions):
logger.warning("Invalid JSON response. Expected dictionary with key 'question'")
return "fail_parse_answer_relevancy_output", evaluation_metadata

# calculate cosine similarity between the question and generated questions
with self.llmobs_service.workflow("dd-ragas.calculate_similarity") as ragas_cs_workflow:
cosine_sim = self.ragas_answer_relevancy_instance.calculate_similarity(question, gen_questions)
self.llmobs_service.annotate(
span=ragas_cs_workflow,
input_data={"question": question, "generated_questions": gen_questions},
output_data=cosine_sim.mean(),
)

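# final score: mean cosine similarity with the original question, zeroed out when any generated answer is classified noncommittal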
score = cosine_sim.mean() * int(not any(answer.noncommittal for answer in answers))
return score, evaluation_metadata
finally:
self.llmobs_service.annotate(
span=ragas_ar_workflow,
input_data=span_event,
output_data=score,
metadata=trace_metadata,
)
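To make the scoring step above concrete, here is a minimal worked sketch of the formula, assuming ragas' default strictness of 3 (the similarity values are made up):

import numpy as np

# hypothetical cosine similarities between the original question and the three
# questions the LLM generated back from the answer (strictness=3)
cosine_sim = np.array([0.91, 0.85, 0.88])
noncommittal = [False, False, False]  # one flag per generated answer classification

# mirrors: score = cosine_sim.mean() * int(not any(answer.noncommittal for answer in answers))
score = cosine_sim.mean() * int(not any(noncommittal))  # ~0.88; becomes 0.0 if any flag is True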
221 changes: 221 additions & 0 deletions ddtrace/llmobs/_evaluators/ragas/base.py
@@ -0,0 +1,221 @@
import traceback
from typing import Optional
from typing import Tuple
from typing import Union

from ddtrace.internal.logger import get_logger
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs._constants import INTERNAL_CONTEXT_VARIABLE_KEYS
from ddtrace.llmobs._constants import INTERNAL_QUERY_VARIABLE_KEYS
from ddtrace.llmobs._constants import RAGAS_ML_APP_PREFIX


logger = get_logger(__name__)


class MiniRagas:
"""
A helper class to store instances of ragas classes and functions
that may or may not exist in a user's environment.
"""

def __init__(self):
import ragas

self.ragas_version = parse_version(ragas.__version__)
if self.ragas_version >= (0, 2, 0) or self.ragas_version < (0, 1, 10):
raise NotImplementedError(
"Ragas version: {} is not supported".format(self.ragas_version),
)

from ragas.llms import llm_factory

self.llm_factory = llm_factory

from ragas.llms.output_parser import RagasoutputParser

self.RagasoutputParser = RagasoutputParser

from ragas.metrics import context_precision

self.context_precision = context_precision

from ragas.metrics.base import ensembler

self.ensembler = ensembler

from ddtrace.llmobs._evaluators.ragas.models import ContextPrecisionVerification

self.ContextPrecisionVerification = ContextPrecisionVerification

from ragas.metrics import faithfulness

self.faithfulness = faithfulness

from ragas.metrics.base import get_segmenter

self.get_segmenter = get_segmenter

from ddtrace.llmobs._evaluators.ragas.models import StatementFaithfulnessAnswers

self.StatementFaithfulnessAnswers = StatementFaithfulnessAnswers

from ddtrace.llmobs._evaluators.ragas.models import StatementsAnswers

self.StatementsAnswers = StatementsAnswers

from ddtrace.llmobs._evaluators.ragas.models import AnswerRelevanceClassification

self.AnswerRelevanceClassification = AnswerRelevanceClassification

from ragas.metrics import answer_relevancy

self.answer_relevancy = answer_relevancy

from ragas.embeddings import embedding_factory

self.embedding_factory = embedding_factory

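# Illustrative usage sketch: instantiate MiniRagas lazily and fail fast when ragas is
# missing or its version is unsupported; RagasBaseEvaluator below catches this and
# reports it via telemetry.
#   try:
#       mini_ragas = MiniRagas()
#   except (ImportError, NotImplementedError):
#       mini_ragas = None  # evaluator cannot run without its ragas dependencies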

def _get_ml_app_for_ragas_trace(span_event: dict) -> str:
"""
The `ml_app` for spans generated from ragas traces will be `dd-ragas-<ml_app>`,
or `dd-ragas` if `ml_app` is not present in the span event.
"""
tags = span_event.get("tags", []) # list[str]
ml_app = None
for tag in tags:
if isinstance(tag, str) and tag.startswith("ml_app:"):
ml_app = tag.split(":")[1]
break
if not ml_app:
return RAGAS_ML_APP_PREFIX
return "{}-{}".format(RAGAS_ML_APP_PREFIX, ml_app)

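# For illustration (hypothetical span events; prefix value per the docstring above):
#   _get_ml_app_for_ragas_trace({"tags": ["ml_app:chatbot"]})  -> "dd-ragas-chatbot"
#   _get_ml_app_for_ragas_trace({"tags": []})                  -> "dd-ragas"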

class RagasBaseEvaluator:
"""A class used by EvaluatorRunner to conduct ragas evaluations
on LLM Observability span events. The job of an Evaluator is to take a span and
submit evaluation metrics based on the span's attributes.
"""

LABEL = "ragas"
METRIC_TYPE = "score"

def __init__(self, llmobs_service):
"""
Initialize an evaluator that uses the ragas library to generate a score on finished LLM spans.

:param llmobs_service: An instance of the LLM Observability service used for tracing the evaluation and
submitting evaluation metrics.

Raises: NotImplementedError if the ragas library is not found or if ragas version is not supported.
"""
self.llmobs_service = llmobs_service
self.ragas_version = "unknown"
telemetry_state = "ok"
try:
self.mini_ragas = MiniRagas()
except Exception as e:
telemetry_state = "fail"
telemetry_writer.add_log(
level=TELEMETRY_LOG_LEVEL.ERROR,
message="Failed to import Ragas dependencies",
stack_trace=traceback.format_exc(),
tags={"ragas_version": self.ragas_version},
)
raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e
finally:
telemetry_writer.add_count_metric(
namespace=TELEMETRY_APM_PRODUCT.LLMOBS,
name="evaluators.init",
value=1,
tags=(
("evaluator_label", self.LABEL),
("state", telemetry_state),
("ragas_version", self.ragas_version),
),
)

def _extract_evaluation_inputs_from_span(self, span_event: dict) -> Optional[dict]:
"""
Extracts the question, answer, and context used as inputs for a ragas evaluation on a span event.

question - input.prompt.variables.question OR input.messages[-1].content
contexts - list of context prompt variables specified by
`input.prompt._dd_context_variable_keys`, defaulting to `input.prompt.variables.context`
answer - output.messages[-1].content
"""
with self.llmobs_service.workflow("dd-ragas.extract_evaluation_inputs_from_span") as extract_inputs_workflow:
self.llmobs_service.annotate(span=extract_inputs_workflow, input_data=span_event)
question, answer, contexts = None, None, None

meta_io = span_event.get("meta")
if meta_io is None:
return None

meta_input = meta_io.get("input")
meta_output = meta_io.get("output")

if not (meta_input and meta_output):
return None

prompt = meta_input.get("prompt")
if prompt is None:
logger.debug(
"Failed to extract `prompt` from span for ragas evaluation",
)
return None
prompt_variables = prompt.get("variables")

input_messages = meta_input.get("messages")

messages = meta_output.get("messages")
if messages is not None and len(messages) > 0:
answer = messages[-1].get("content")

if prompt_variables:
context_keys = prompt.get(INTERNAL_CONTEXT_VARIABLE_KEYS, ["context"])
question_keys = prompt.get(INTERNAL_QUERY_VARIABLE_KEYS, ["question"])
contexts = [prompt_variables.get(key) for key in context_keys if prompt_variables.get(key)]
question = " ".join([prompt_variables.get(key) for key in question_keys if prompt_variables.get(key)])

if not question and input_messages is not None and len(input_messages) > 0:
question = input_messages[-1].get("content")

self.llmobs_service.annotate(
span=extract_inputs_workflow, output_data={"question": question, "contexts": contexts, "answer": answer}
)
if any(field is None for field in (question, contexts, answer)):
logger.debug("Failed to extract inputs required for ragas evaluation")
return None

return {"question": question, "contexts": contexts, "answer": answer}

def run_and_submit_evaluation(self, span_event: dict):
if not span_event:
return
score_result_or_failure, metric_metadata = self.evaluate(span_event)
telemetry_writer.add_count_metric(
TELEMETRY_APM_PRODUCT.LLMOBS,
"evaluators.run",
1,
tags=(
("evaluator_label", self.LABEL),
("state", score_result_or_failure if isinstance(score_result_or_failure, str) else "success"),
),
)
if isinstance(score_result_or_failure, float):
self.llmobs_service.submit_evaluation(
span_context={"trace_id": span_event.get("trace_id"), "span_id": span_event.get("span_id")},
label=self.LABEL,
metric_type=self.METRIC_TYPE,
value=score_result_or_failure,
metadata=metric_metadata,
)

def evaluate(self, span_event: dict) -> Tuple[Union[float, str], Optional[dict]]:
raise NotImplementedError("evaluate method must be implemented by individual ragas metrics")
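For reference, a minimal sketch of the span event shape `_extract_evaluation_inputs_from_span` expects; the field names come from the code above and the values are hypothetical:

span_event = {
    "trace_id": "123",
    "span_id": "456",
    "tags": ["ml_app:my-app"],  # picked up by _get_ml_app_for_ragas_trace -> "dd-ragas-my-app"
    "meta": {
        "input": {
            "prompt": {"variables": {"question": "What is APM?", "context": "APM stands for ..."}},
            "messages": [{"content": "What is APM?"}],
        },
        "output": {"messages": [{"content": "APM stands for application performance monitoring."}]},
    },
}
# question  -> "What is APM?"
# contexts  -> ["APM stands for ..."]
# answer    -> "APM stands for application performance monitoring."
# run_and_submit_evaluation(span_event) would then compute the score and, if it is a float,
# submit it as a `ragas_answer_relevancy` evaluation metric for the span.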