Skip to content

Commit

Permalink
Test with Python 3.13
Browse files Browse the repository at this point in the history
- Switch bytecode wrapping with wrapt.wrap_function_wrapper
- Implement unpatching
- Add patching tests
  • Loading branch information
Yun-Kim committed Jan 21, 2025
1 parent 4611816 commit 8ba66cc
Show file tree
Hide file tree
Showing 5 changed files with 393 additions and 217 deletions.
244 changes: 128 additions & 116 deletions ddtrace/contrib/internal/openai/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
from ddtrace import config
from ddtrace.contrib.internal.openai import _endpoint_hooks
from ddtrace.contrib.internal.openai.utils import _format_openai_api_key
from ddtrace.contrib.trace_utils import unwrap
from ddtrace.contrib.trace_utils import with_traced_module
from ddtrace.contrib.trace_utils import wrap
from ddtrace.internal.logger import get_logger
from ddtrace.internal.schema import schematize_service_name
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.formats import deep_getattr
from ddtrace.internal.utils.version import parse_version
from ddtrace.internal.wrapping import wrap
from ddtrace.llmobs._integrations import OpenAIIntegration
from ddtrace.trace import Pin

Expand Down Expand Up @@ -114,10 +116,6 @@ def get_version():
}


def _wrap_classmethod(obj, wrapper):
wrap(obj.__func__, wrapper)


def patch():
# Avoid importing openai at the module level, eventually will be an import hook
import openai
Expand All @@ -127,69 +125,103 @@ def patch():

Pin().onto(openai)
integration = OpenAIIntegration(integration_config=config.openai, openai=openai)
openai._datadog_integration = integration

if OPENAI_VERSION >= (1, 0, 0):
if OPENAI_VERSION >= (1, 8, 0):
wrap(openai._base_client.SyncAPIClient._process_response, _patched_convert(openai, integration))
wrap(openai._base_client.AsyncAPIClient._process_response, _patched_convert(openai, integration))
wrap(openai, "_base_client.SyncAPIClient._process_response", patched_convert(openai))
wrap(openai, "_base_client.AsyncAPIClient._process_response", patched_convert(openai))
else:
wrap(openai._base_client.BaseClient._process_response, _patched_convert(openai, integration))
wrap(openai.OpenAI.__init__, _patched_client_init(openai, integration))
wrap(openai.AsyncOpenAI.__init__, _patched_client_init(openai, integration))
wrap(openai.AzureOpenAI.__init__, _patched_client_init(openai, integration))
wrap(openai.AsyncAzureOpenAI.__init__, _patched_client_init(openai, integration))
wrap(openai, "_base_client.BaseClient._process_response", patched_convert(openai))
wrap(openai, "OpenAI.__init__", patched_client_init(openai))
wrap(openai, "AsyncOpenAI.__init__", patched_client_init(openai))
wrap(openai, "AzureOpenAI.__init__", patched_client_init(openai))
wrap(openai, "AsyncAzureOpenAI.__init__", patched_client_init(openai))

for resource, method_hook_dict in _RESOURCES.items():
if deep_getattr(openai.resources, resource) is None:
continue
for method_name, endpoint_hook in method_hook_dict.items():
sync_method = deep_getattr(openai.resources, "%s.%s" % (resource, method_name))
async_method = deep_getattr(
openai.resources, "%s.%s" % (".Async".join(resource.split(".")), method_name)
)
wrap(sync_method, _patched_endpoint(openai, integration, endpoint_hook))
wrap(async_method, _patched_endpoint_async(openai, integration, endpoint_hook))
sync_method = "resources.{}.{}".format(resource, method_name)
async_method = "resources.{}.{}".format(".Async".join(resource.split(".")), method_name)
wrap(openai, sync_method, _patched_endpoint(openai, endpoint_hook))
wrap(openai, async_method, _patched_endpoint_async(openai, endpoint_hook))
else:
import openai.api_requestor

wrap(openai.api_requestor._make_session, _patched_make_session)
wrap(openai.util.convert_to_openai_object, _patched_convert(openai, integration))
wrap(openai, "api_requestor._make_session", _patched_make_session)
wrap(openai, "util.convert_to_openai_object", patched_convert(openai))

for resource, method_hook_dict in _RESOURCES.items():
if deep_getattr(openai.api_resources, resource) is None:
continue
for method_name, endpoint_hook in method_hook_dict.items():
sync_method = deep_getattr(openai.api_resources, "%s.%s" % (resource, method_name))
async_method = deep_getattr(openai.api_resources, "%s.a%s" % (resource, method_name))
_wrap_classmethod(sync_method, _patched_endpoint(openai, integration, endpoint_hook))
_wrap_classmethod(async_method, _patched_endpoint_async(openai, integration, endpoint_hook))
sync_method = "api_resources.{}.{}".format(resource, method_name)
async_method = "api_resources.{}.a{}".format(resource, method_name)
wrap(openai, sync_method, _patched_endpoint(openai, endpoint_hook))
wrap(openai, async_method, _patched_endpoint_async(openai, endpoint_hook))

openai.__datadog_patch = True


def unpatch():
# FIXME: add unpatching. The current wrapping.unwrap method requires
# the wrapper function to be provided which we don't keep a reference to.
pass
import openai

if not getattr(openai, "__datadog_patch", False):
return

openai.__datadog_patch = False

if OPENAI_VERSION >= (1, 0, 0):
if OPENAI_VERSION >= (1, 8, 0):
unwrap(openai._base_client.SyncAPIClient, "_process_response")
unwrap(openai._base_client.AsyncAPIClient, "_process_response")
else:
unwrap(openai._base_client.BaseClient, "_process_response")
unwrap(openai.OpenAI, "__init__")
unwrap(openai.AsyncOpenAI, "__init__")
unwrap(openai.AzureOpenAI, "__init__")
unwrap(openai.AsyncAzureOpenAI, "__init__")

for resource, method_hook_dict in _RESOURCES.items():
if deep_getattr(openai.resources, resource) is None:
continue
for method_name, _ in method_hook_dict.items():
sync_resource = deep_getattr(openai.resources, resource)
async_resource = deep_getattr(openai.resources, ".Async".join(resource.split(".")))
unwrap(sync_resource, method_name)
unwrap(async_resource, method_name)
else:
import openai.api_requestor

unwrap(openai.api_requestor, "_make_session")
wrap(openai.util, "convert_to_openai_object")

for resource, method_hook_dict in _RESOURCES.items():
if deep_getattr(openai.api_resources, resource) is None:
continue
for method_name, _ in method_hook_dict.items():
resource_obj = deep_getattr(openai.api_resources, resource)
unwrap(resource_obj, method_name)
unwrap(resource_obj, "a{}".format(method_name))

delattr(openai, "_datadog_integration")

def _patched_client_init(openai, integration):

@with_traced_module
def patched_client_init(openai, pin, func, instance, args, kwargs):
"""
Patch for `openai.OpenAI/AsyncOpenAI` client init methods to add the client object to the OpenAIIntegration object.
"""

def patched_client_init(func, args, kwargs):
func(*args, **kwargs)
client = args[0]
integration._client = client
api_key = kwargs.get("api_key")
if api_key is None:
api_key = client.api_key
if api_key is not None:
integration.user_api_key = api_key
return

return patched_client_init
func(*args, **kwargs)
integration = openai._datadog_integration
integration._client = instance
api_key = kwargs.get("api_key")
if api_key is None:
api_key = instance.api_key
if api_key is not None:
integration.user_api_key = api_key
return


def _patched_make_session(func, args, kwargs):
Expand Down Expand Up @@ -238,18 +270,10 @@ def _traced_endpoint(endpoint_hook, integration, pin, args, kwargs):
integration.metric(span, "dist", "request.duration", span.duration_ns)


def _patched_endpoint(openai, integration, patch_hook):
def patched_endpoint(func, args, kwargs):
# FIXME: this is a temporary workaround for the fact that our bytecode wrapping seems to modify
# a function keyword argument into a cell when it shouldn't. This is only an issue on
# Python 3.11+.
if sys.version_info >= (3, 11) and kwargs.get("encoding_format", None):
kwargs["encoding_format"] = kwargs["encoding_format"].cell_contents

pin = Pin._find(openai, args[0])
if not pin or not pin.enabled():
return func(*args, **kwargs)

def _patched_endpoint(openai, patch_hook):
@with_traced_module
def patched_endpoint(openai, pin, func, instance, args, kwargs):
integration = openai._datadog_integration
g = _traced_endpoint(patch_hook, integration, pin, args, kwargs)
g.send(None)
resp, err = None, None
Expand All @@ -267,21 +291,14 @@ def patched_endpoint(func, args, kwargs):
# This return takes priority over `return resp`
return e.value # noqa: B012

return patched_endpoint
return patched_endpoint(openai)


def _patched_endpoint_async(openai, integration, patch_hook):
def _patched_endpoint_async(openai, patch_hook):
# Same as _patched_endpoint but async
async def patched_endpoint(func, args, kwargs):
# FIXME: this is a temporary workaround for the fact that our bytecode wrapping seems to modify
# a function keyword argument into a cell when it shouldn't. This is only an issue on
# Python 3.11+.
if sys.version_info >= (3, 11) and kwargs.get("encoding_format", None):
kwargs["encoding_format"] = kwargs["encoding_format"].cell_contents

pin = Pin._find(openai, args[0])
if not pin or not pin.enabled():
return await func(*args, **kwargs)
@with_traced_module
async def patched_endpoint(openai, pin, func, instance, args, kwargs):
integration = openai._datadog_integration
g = _traced_endpoint(patch_hook, integration, pin, args, kwargs)
g.send(None)
resp, err = None, None
Expand All @@ -304,59 +321,54 @@ async def patched_endpoint(func, args, kwargs):
# This return takes priority over `return resp`
return e.value # noqa: B012

return patched_endpoint
return patched_endpoint(openai)


def _patched_convert(openai, integration):
def patched_convert(func, args, kwargs):
"""Patch convert captures header information in the openai response"""
pin = Pin.get_from(openai)
if not pin or not pin.enabled():
return func(*args, **kwargs)
@with_traced_module
def patched_convert(openai, pin, func, instance, args, kwargs):
"""Patch convert captures header information in the openai response"""
integration = openai._datadog_integration
span = pin.tracer.current_span()
if not span:
return func(*args, **kwargs)

span = pin.tracer.current_span()
if not span:
if OPENAI_VERSION < (1, 0, 0):
resp = args[0]
if not isinstance(resp, openai.openai_response.OpenAIResponse):
return func(*args, **kwargs)

if OPENAI_VERSION < (1, 0, 0):
resp = args[0]
if not isinstance(resp, openai.openai_response.OpenAIResponse):
return func(*args, **kwargs)
headers = resp._headers
else:
resp = kwargs.get("response", {})
headers = resp.headers
# This function is called for each chunk in the stream.
# To prevent needlessly setting the same tags for each chunk, short-circuit here.
if span.get_tag("openai.organization.name") is not None:
return func(*args, **kwargs)
if headers.get("openai-organization"):
org_name = headers.get("openai-organization")
span.set_tag_str("openai.organization.name", org_name)

# Gauge total rate limit
if headers.get("x-ratelimit-limit-requests"):
v = headers.get("x-ratelimit-limit-requests")
if v is not None:
integration.metric(span, "gauge", "ratelimit.requests", int(v))
span.set_metric("openai.organization.ratelimit.requests.limit", int(v))
if headers.get("x-ratelimit-limit-tokens"):
v = headers.get("x-ratelimit-limit-tokens")
if v is not None:
integration.metric(span, "gauge", "ratelimit.tokens", int(v))
span.set_metric("openai.organization.ratelimit.tokens.limit", int(v))
# Gauge and set span info for remaining requests and tokens
if headers.get("x-ratelimit-remaining-requests"):
v = headers.get("x-ratelimit-remaining-requests")
if v is not None:
integration.metric(span, "gauge", "ratelimit.remaining.requests", int(v))
span.set_metric("openai.organization.ratelimit.requests.remaining", int(v))
if headers.get("x-ratelimit-remaining-tokens"):
v = headers.get("x-ratelimit-remaining-tokens")
if v is not None:
integration.metric(span, "gauge", "ratelimit.remaining.tokens", int(v))
span.set_metric("openai.organization.ratelimit.tokens.remaining", int(v))

headers = resp._headers
else:
resp = kwargs.get("response", {})
headers = resp.headers
# This function is called for each chunk in the stream.
# To prevent needlessly setting the same tags for each chunk, short-circuit here.
if span.get_tag("openai.organization.name") is not None:
return func(*args, **kwargs)

return patched_convert
if headers.get("openai-organization"):
org_name = headers.get("openai-organization")
span.set_tag_str("openai.organization.name", org_name)

# Gauge total rate limit
if headers.get("x-ratelimit-limit-requests"):
v = headers.get("x-ratelimit-limit-requests")
if v is not None:
integration.metric(span, "gauge", "ratelimit.requests", int(v))
span.set_metric("openai.organization.ratelimit.requests.limit", int(v))
if headers.get("x-ratelimit-limit-tokens"):
v = headers.get("x-ratelimit-limit-tokens")
if v is not None:
integration.metric(span, "gauge", "ratelimit.tokens", int(v))
span.set_metric("openai.organization.ratelimit.tokens.limit", int(v))
# Gauge and set span info for remaining requests and tokens
if headers.get("x-ratelimit-remaining-requests"):
v = headers.get("x-ratelimit-remaining-requests")
if v is not None:
integration.metric(span, "gauge", "ratelimit.remaining.requests", int(v))
span.set_metric("openai.organization.ratelimit.requests.remaining", int(v))
if headers.get("x-ratelimit-remaining-tokens"):
v = headers.get("x-ratelimit-remaining-tokens")
if v is not None:
integration.metric(span, "gauge", "ratelimit.remaining.tokens", int(v))
span.set_metric("openai.organization.ratelimit.tokens.remaining", int(v))

return func(*args, **kwargs)
4 changes: 2 additions & 2 deletions riotfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2504,14 +2504,14 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT
},
),
Venv(
pys=select_pys(min_version="3.7", max_version="3.11"),
pys=select_pys(min_version="3.7"),
pkgs={
"openai[embeddings,datalib]": ["==1.1.1", "==1.30.1"],
"pillow": "==9.5.0",
},
),
Venv(
pys=select_pys(min_version="3.8", max_version="3.11"),
pys=select_pys(min_version="3.8"),
pkgs={
"openai[datalib]": ["==1.30.1"],
"tiktoken": latest,
Expand Down
Loading

0 comments on commit 8ba66cc

Please sign in to comment.