From e31a52855eaf0c880f97ae748c4739d77dc7a6aa Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Tue, 23 Jul 2024 12:25:55 +0200 Subject: [PATCH 01/33] Wrap call to 'predict' with usage_collector --- inference/core/models/base.py | 4 +++- inference/usage_tracking/collector.py | 18 +++++++++++++----- inference/usage_tracking/utils.py | 10 ++++++++-- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/inference/core/models/base.py b/inference/core/models/base.py index bef2d83369..4af552c21a 100644 --- a/inference/core/models/base.py +++ b/inference/core/models/base.py @@ -7,6 +7,7 @@ from inference.core.entities.requests.inference import InferenceRequest from inference.core.entities.responses.inference import InferenceResponse from inference.core.models.types import PreprocessReturnMetadata +from inference.usage_tracking.collector import usage_collector class BaseInference: @@ -24,7 +25,8 @@ def infer(self, image: Any, **kwargs) -> Any: logger.debug( f"Preprocessed input shape: {getattr(preproc_image, 'shape', None)}" ) - predicted_arrays = self.predict(preproc_image, **kwargs) + metered_predict = usage_collector(self.predict) + predicted_arrays = metered_predict(preproc_image, **kwargs) postprocessed = self.postprocess(predicted_arrays, returned_metadata, **kwargs) return postprocessed diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 9536ced7e2..6d5f4c1d95 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -645,12 +645,13 @@ def _extract_usage_params_from_func_kwargs( resource_details = {} resource_id = "" category = None + enterprise = False + # TODO: add requires_api_key, True if workflow definition comes from platform or model comes from workspace if "workflow" in func_kwargs: workflow: CompiledWorkflow = func_kwargs["workflow"] if hasattr(workflow, "workflow_definition"): - # TODO: handle enterprise blocks here + # TODO: extend ParsedWorkflowDefinition to expose `enterprise` workflow_definition = workflow.workflow_definition - enterprise = False if hasattr(workflow, "init_parameters"): init_parameters = workflow.init_parameters if "workflows_core.api_key" in init_parameters: @@ -667,9 +668,16 @@ def _extract_usage_params_from_func_kwargs( resource_details=resource_details ) category = "workflows" - elif "model_id" in func_kwargs: - # TODO: handle model - pass + elif hasattr(func, "__self__"): + _self = func.__self__ + if hasattr(_self, "dataset_id") and hasattr(_self, "version_id"): + model_id = f"{_self.dataset_id}/{_self.version_id}" + category = "model" + resource_id = model_id + else: + resource_id = "unknown" + category = "unknown" + source = None runtime_parameters = func_kwargs.get("runtime_parameters") if ( diff --git a/inference/usage_tracking/utils.py b/inference/usage_tracking/utils.py index edeacf37f1..6c78ea3b5c 100644 --- a/inference/usage_tracking/utils.py +++ b/inference/usage_tracking/utils.py @@ -20,7 +20,13 @@ def collect_func_params( for default_arg in defaults: params[default_arg] = signature.parameters[default_arg].default - if set(params) != set(signature.parameters): - logger.error("Params mismatch for %s.%s", func.__module__, func.__name__) + signature_params = set(signature.parameters) + if set(params) != signature_params: + if "kwargs" in signature_params: + params["kwargs"] = kwargs + if "args" in signature_params: + params["args"] = args + if not set(params).issuperset(signature_params): + 
logger.error("Params mismatch for %s.%s", func.__module__, func.__name__) return params From 0f7bd463afef4779b30023d5c85af3eec1ce4087 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 24 Jul 2024 10:07:49 +0200 Subject: [PATCH 02/33] Add model tracking to inference pipeline --- .../core/interfaces/stream/inference_pipeline.py | 1 - .../stream/model_handlers/roboflow_models.py | 2 ++ inference/core/models/base.py | 4 ++-- inference/usage_tracking/collector.py | 15 +++++++++++++-- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/inference/core/interfaces/stream/inference_pipeline.py b/inference/core/interfaces/stream/inference_pipeline.py index 8465670df9..f0c1902cc7 100644 --- a/inference/core/interfaces/stream/inference_pipeline.py +++ b/inference/core/interfaces/stream/inference_pipeline.py @@ -54,7 +54,6 @@ from inference.core.workflows.core_steps.common.entities import StepExecutionMode from inference.models.aliases import resolve_roboflow_model_alias from inference.models.utils import ROBOFLOW_MODEL_TYPES, get_model -from inference.usage_tracking.collector import usage_collector INFERENCE_PIPELINE_CONTEXT = "inference_pipeline" SOURCE_CONNECTION_ATTEMPT_FAILED_EVENT = "SOURCE_CONNECTION_ATTEMPT_FAILED" diff --git a/inference/core/interfaces/stream/model_handlers/roboflow_models.py b/inference/core/interfaces/stream/model_handlers/roboflow_models.py index 85a00654ab..3145b135ba 100644 --- a/inference/core/interfaces/stream/model_handlers/roboflow_models.py +++ b/inference/core/interfaces/stream/model_handlers/roboflow_models.py @@ -15,6 +15,8 @@ def default_process_frame( predictions = wrap_in_list( model.infer( [f.image for f in video_frame], + usage_fps=video_frame[0].fps, + usage_api_key=model.api_key, **postprocessing_args, ) ) diff --git a/inference/core/models/base.py b/inference/core/models/base.py index 4af552c21a..5dbdec3da6 100644 --- a/inference/core/models/base.py +++ b/inference/core/models/base.py @@ -16,6 +16,7 @@ class BaseInference: This class provides a basic interface for inference tasks. """ + @usage_collector def infer(self, image: Any, **kwargs) -> Any: """Runs inference on given data. 
- image: @@ -25,8 +26,7 @@ def infer(self, image: Any, **kwargs) -> Any: logger.debug( f"Preprocessed input shape: {getattr(preproc_image, 'shape', None)}" ) - metered_predict = usage_collector(self.predict) - predicted_arrays = metered_predict(preproc_image, **kwargs) + predicted_arrays = self.predict(preproc_image, **kwargs) postprocessed = self.postprocess(predicted_arrays, returned_metadata, **kwargs) return postprocessed diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 6d5f4c1d95..f1bf5a2f59 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -668,12 +668,23 @@ def _extract_usage_params_from_func_kwargs( resource_details=resource_details ) category = "workflows" - elif hasattr(func, "__self__"): - _self = func.__self__ + elif "self" in func_kwargs: + _self = func_kwargs["self"] if hasattr(_self, "dataset_id") and hasattr(_self, "version_id"): model_id = f"{_self.dataset_id}/{_self.version_id}" category = "model" resource_id = model_id + elif isinstance(kwargs, dict) and "model_id" in kwargs: + model_id = kwargs["model_id"] + category = "model" + resource_id = model_id + else: + resource_id = "unknown" + category = "unknown" + if isinstance(kwargs, dict) and "source" in kwargs: + resource_details["source"] = kwargs["source"] + if hasattr(_self, "task_type"): + resource_details["task_type"] = _self.task_type else: resource_id = "unknown" category = "unknown" From d34b6f4409344d180c166e8718423da9f396cd80 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 24 Jul 2024 16:45:31 +0200 Subject: [PATCH 03/33] formatting --- inference/usage_tracking/collector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index f1bf5a2f59..4b1494929f 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -680,7 +680,7 @@ def _extract_usage_params_from_func_kwargs( resource_id = model_id else: resource_id = "unknown" - category = "unknown" + category = "unknown" if isinstance(kwargs, dict) and "source" in kwargs: resource_details["source"] = kwargs["source"] if hasattr(_self, "task_type"): From eccb76fee4afc3b47926bd5f71d04d257bc66547 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Mon, 29 Jul 2024 13:14:56 +0200 Subject: [PATCH 04/33] Handle test inference run --- inference/core/models/roboflow.py | 2 +- inference/usage_tracking/collector.py | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/inference/core/models/roboflow.py b/inference/core/models/roboflow.py index 5a28b3821d..a6e9dfc8de 100644 --- a/inference/core/models/roboflow.py +++ b/inference/core/models/roboflow.py @@ -667,7 +667,7 @@ def validate_model(self) -> None: def run_test_inference(self) -> None: test_image = (np.random.rand(1024, 1024, 3) * 255).astype(np.uint8) logger.debug(f"Running test inference. 
Image size: {test_image.shape}") - result = self.infer(test_image) + result = self.infer(test_image, usage_inference_test_run=True) logger.debug(f"Test inference finished.") return result diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 4b1494929f..756b5e5255 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -429,6 +429,7 @@ def _update_usage_payload( api_key: APIKey = "", resource_details: Optional[Dict[str, Any]] = None, resource_id: str = "", + usage_inference_test_run: bool = False, fps: float = 0, enterprise: bool = False, ): @@ -441,9 +442,9 @@ def _update_usage_payload( if not source_usage["timestamp_start"]: source_usage["timestamp_start"] = time.time_ns() source_usage["timestamp_stop"] = time.time_ns() - source_usage["processed_frames"] += frames + source_usage["processed_frames"] += frames if not usage_inference_test_run else 0 source_usage["fps"] = round(fps, 2) - source_usage["source_duration"] += frames / fps if fps else 0 + source_usage["source_duration"] += frames / fps if fps and not usage_inference_test_run else 0 source_usage["category"] = category source_usage["resource_id"] = resource_id source_usage["api_key_hash"] = api_key_hash @@ -459,6 +460,7 @@ def record_usage( api_key: APIKey = "", resource_details: Optional[Dict[str, Any]] = None, resource_id: str = "", + usage_inference_test_run: bool = False, fps: float = 0, ) -> DefaultDict[str, Any]: if self._settings.opt_out and not enterprise: @@ -481,6 +483,7 @@ def record_usage( api_key=api_key, resource_details=resource_details, resource_id=resource_id, + usage_inference_test_run=usage_inference_test_run, fps=fps, enterprise=enterprise, ) @@ -596,7 +599,7 @@ def _offload_to_api(self, payloads: List[APIKeyUsage]): logger.debug( "Failed to send usage - got %s status code (%s)", response.status_code, - response.raw, + response.content, ) api_keys_hashes_failed.add(api_key_hash) continue @@ -637,6 +640,7 @@ def _extract_usage_params_from_func_kwargs( usage_fps: float, usage_api_key: str, usage_workflow_id: str, + usage_inference_test_run: bool, func: Callable[[Any], Any], args: List[Any], kwargs: Dict[str, Any], @@ -709,6 +713,7 @@ def _extract_usage_params_from_func_kwargs( "category": category, "resource_details": resource_details, "resource_id": resource_id, + "usage_inference_test_run": usage_inference_test_run, "fps": usage_fps, "enterprise": enterprise, } @@ -720,6 +725,7 @@ def sync_wrapper( usage_fps: float = 0, usage_api_key: APIKey = "", usage_workflow_id: str = "", + usage_inference_test_run: bool = False, **kwargs, ): self.record_usage( @@ -727,6 +733,7 @@ def sync_wrapper( usage_fps=usage_fps, usage_api_key=usage_api_key, usage_workflow_id=usage_workflow_id, + usage_inference_test_run=usage_inference_test_run, func=func, args=args, kwargs=kwargs, @@ -740,6 +747,7 @@ async def async_wrapper( usage_fps: float = 0, usage_api_key: APIKey = "", usage_workflow_id: str = "", + usage_inference_test_run: bool = False, **kwargs, ): await self.async_record_usage( @@ -747,6 +755,7 @@ async def async_wrapper( usage_fps=usage_fps, usage_api_key=usage_api_key, usage_workflow_id=usage_workflow_id, + usage_inference_test_run=usage_inference_test_run, func=func, args=args, kwargs=kwargs, From c34731af39357481ff459bd97698e77c27d6c3e5 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Mon, 29 Jul 2024 14:34:08 +0200 Subject: [PATCH 05/33] fix tests --- 
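The usage-metering mechanism in PATCH 01-04 relies on decorating infer() with usage_collector and passing usage-only keyword arguments (usage_fps, usage_api_key, usage_inference_test_run) alongside the model's own kwargs. A minimal, illustrative sketch of that wrapper pattern (simplified, with a print() standing in for UsageCollector.record_usage(); not the library's actual implementation) looks like this:

    from functools import wraps

    def usage_collector_sketch(func):
        @wraps(func)
        def sync_wrapper(
            *args,
            usage_fps: float = 0,
            usage_api_key: str = "",
            usage_workflow_id: str = "",
            usage_inference_test_run: bool = False,
            **kwargs,
        ):
            # Usage-only kwargs are consumed here, so the wrapped infer()/predict()
            # never sees them; everything else is forwarded untouched.
            record = {
                "fps": usage_fps,
                "api_key": usage_api_key,
                "test_run": usage_inference_test_run,
            }
            print("usage recorded:", record)  # stand-in for record_usage()
            return func(*args, **kwargs)

        return sync_wrapper

Callers then supply the extra kwargs exactly as the diffs above do, e.g. model.infer(images, usage_fps=video_frame[0].fps, usage_api_key=model.api_key) in default_process_frame, or self.infer(test_image, usage_inference_test_run=True) in run_test_inference, which is also why the test fix below adds an api_key attribute to ModelStub.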
 .../core/interfaces/stream/test_interface_pipeline.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/inference/unit_tests/core/interfaces/stream/test_interface_pipeline.py b/tests/inference/unit_tests/core/interfaces/stream/test_interface_pipeline.py
index 983170b58d..cff719b09d 100644
--- a/tests/inference/unit_tests/core/interfaces/stream/test_interface_pipeline.py
+++ b/tests/inference/unit_tests/core/interfaces/stream/test_interface_pipeline.py
@@ -116,6 +116,8 @@ def __next__(self) -> VideoFrame:
 
 
 class ModelStub:
+    def __init__(self):
+        self.api_key = None
     def infer(self, image: Any, **kwargs) -> List[ObjectDetectionInferenceResponse]:
         return [
             ObjectDetectionInferenceResponse(

From a65c76321de9e691533cd2feeecc6f2fea5e2e5e Mon Sep 17 00:00:00 2001
From: Grzegorz Klimaszewski 
<166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 31 Jul 2024 13:07:20 +0200 Subject: [PATCH 07/33] formatting --- inference/usage_tracking/collector.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 5c4d7b0e01..118806cfdc 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -10,7 +10,18 @@ from functools import wraps from queue import Queue from threading import Event, Lock, Thread -from typing_extensions import Any, Callable, DefaultDict, Dict, List, Optional, ParamSpec, Tuple, TypeVar, Union +from typing_extensions import ( + Any, + Callable, + DefaultDict, + Dict, + List, + Optional, + ParamSpec, + Tuple, + TypeVar, + Union, +) from uuid import uuid4 import requests @@ -38,8 +49,8 @@ from .sqlite_queue import SQLiteQueue -T = TypeVar('T') -P = ParamSpec('P') +T = TypeVar("T") +P = ParamSpec("P") ResourceID = str Usage = Union[DefaultDict[str, Any], Dict[str, Any]] @@ -447,9 +458,13 @@ def _update_usage_payload( if not source_usage["timestamp_start"]: source_usage["timestamp_start"] = time.time_ns() source_usage["timestamp_stop"] = time.time_ns() - source_usage["processed_frames"] += frames if not usage_inference_test_run else 0 + source_usage["processed_frames"] += ( + frames if not usage_inference_test_run else 0 + ) source_usage["fps"] = round(fps, 2) - source_usage["source_duration"] += frames / fps if fps and not usage_inference_test_run else 0 + source_usage["source_duration"] += ( + frames / fps if fps and not usage_inference_test_run else 0 + ) source_usage["category"] = category source_usage["resource_id"] = resource_id source_usage["api_key_hash"] = api_key_hash From f05f59cb1aa00d4ee4c0e00486b1904820d1b827 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 31 Jul 2024 13:07:44 +0200 Subject: [PATCH 08/33] sort imports --- inference/usage_tracking/collector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 118806cfdc..1d50714f6a 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -10,6 +10,9 @@ from functools import wraps from queue import Queue from threading import Event, Lock, Thread +from uuid import uuid4 + +import requests from typing_extensions import ( Any, Callable, @@ -48,7 +51,6 @@ from .config import TelemetrySettings, get_telemetry_settings from .sqlite_queue import SQLiteQueue - T = TypeVar("T") P = ParamSpec("P") From f661b2c8408b4250aa59248f8229904b98a6e34d Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 7 Aug 2024 20:32:21 +0200 Subject: [PATCH 09/33] Rename usage_inference_test_run -> inference_test_run --- inference/usage_tracking/collector.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 1d50714f6a..a7755f8aba 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -35,7 +35,6 @@ Dict, List, Optional, - Set, Tuple, Union, ) @@ -447,7 +446,7 @@ def _update_usage_payload( api_key: APIKey = "", resource_details: Optional[Dict[str, Any]] = None, resource_id: str = "", - usage_inference_test_run: bool = False, + 
inference_test_run: bool = False, fps: float = 0, enterprise: bool = False, ): @@ -461,11 +460,11 @@ def _update_usage_payload( source_usage["timestamp_start"] = time.time_ns() source_usage["timestamp_stop"] = time.time_ns() source_usage["processed_frames"] += ( - frames if not usage_inference_test_run else 0 + frames if not inference_test_run else 0 ) source_usage["fps"] = round(fps, 2) source_usage["source_duration"] += ( - frames / fps if fps and not usage_inference_test_run else 0 + frames / fps if fps and not inference_test_run else 0 ) source_usage["category"] = category source_usage["resource_id"] = resource_id @@ -482,7 +481,7 @@ def record_usage( api_key: APIKey = "", resource_details: Optional[Dict[str, Any]] = None, resource_id: str = "", - usage_inference_test_run: bool = False, + inference_test_run: bool = False, fps: float = 0, ) -> DefaultDict[str, Any]: if self._settings.opt_out and not enterprise: @@ -505,7 +504,7 @@ def record_usage( api_key=api_key, resource_details=resource_details, resource_id=resource_id, - usage_inference_test_run=usage_inference_test_run, + inference_test_run=inference_test_run, fps=fps, enterprise=enterprise, ) @@ -519,7 +518,7 @@ async def async_record_usage( api_key: APIKey = "", resource_details: Optional[Dict[str, Any]] = None, resource_id: str = "", - usage_inference_test_run: bool = False, + inference_test_run: bool = False, fps: float = 0, ) -> DefaultDict[str, Any]: if self._async_lock: @@ -532,7 +531,7 @@ async def async_record_usage( api_key=api_key, resource_details=resource_details, resource_id=resource_id, - usage_inference_test_run=usage_inference_test_run, + inference_test_run=inference_test_run, fps=fps, ) else: @@ -544,7 +543,7 @@ async def async_record_usage( api_key=api_key, resource_details=resource_details, resource_id=resource_id, - usage_inference_test_run=usage_inference_test_run, + inference_test_run=inference_test_run, fps=fps, ) @@ -738,7 +737,7 @@ def _extract_usage_params_from_func_kwargs( "category": category, "resource_details": resource_details, "resource_id": resource_id, - "usage_inference_test_run": usage_inference_test_run, + "inference_test_run": usage_inference_test_run, "fps": usage_fps, "enterprise": enterprise, } From fa2cd275903ef30de7b88a4e41ccf9668271c1e6 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 7 Aug 2024 20:35:02 +0200 Subject: [PATCH 10/33] formatting --- inference/usage_tracking/collector.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index a7755f8aba..5a99ae77f6 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -459,9 +459,7 @@ def _update_usage_payload( if not source_usage["timestamp_start"]: source_usage["timestamp_start"] = time.time_ns() source_usage["timestamp_stop"] = time.time_ns() - source_usage["processed_frames"] += ( - frames if not inference_test_run else 0 - ) + source_usage["processed_frames"] += frames if not inference_test_run else 0 source_usage["fps"] = round(fps, 2) source_usage["source_duration"] += ( frames / fps if fps and not inference_test_run else 0 From 9deedaac9bc8e4a169670e1a33fbb4cd1a390d33 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 7 Aug 2024 20:36:40 +0200 Subject: [PATCH 11/33] sort improts --- inference/usage_tracking/collector.py | 13 ------------- 1 file changed, 13 
deletions(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 5a99ae77f6..0933f4eb09 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -25,19 +25,6 @@ TypeVar, Union, ) -from uuid import uuid4 - -import requests -from typing_extensions import ( - Any, - Callable, - DefaultDict, - Dict, - List, - Optional, - Tuple, - Union, -) from inference.core.env import API_KEY, LAMBDA, REDIS_HOST from inference.core.logger import logger From 9357c7ae281ab8fa811e819a867e494dd2399f4d Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 7 Aug 2024 21:49:57 +0200 Subject: [PATCH 12/33] Limit records count when fetching payloads from sqlite db to avoid memory issues --- inference/usage_tracking/sqlite_queue.py | 31 ++++++++++++++++-------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/inference/usage_tracking/sqlite_queue.py b/inference/usage_tracking/sqlite_queue.py index 121bc69e71..a60aabb860 100644 --- a/inference/usage_tracking/sqlite_queue.py +++ b/inference/usage_tracking/sqlite_queue.py @@ -70,7 +70,7 @@ def put(self, payload: Any, connection: Optional[sqlite3.Connection] = None): self._insert(payload=payload_str, connection=connection) connection.close() except Exception as exc: - logger.debug("Failed to store usage records, %s", exc) + logger.debug("Failed to store usage records '%s', %s", payload, exc) return [] else: self._insert(payload=payload_str, connection=connection) @@ -95,7 +95,7 @@ def _count_rows(self, connection: sqlite3.Connection) -> int: count = int(cursor.fetchone()[0]) connection.commit() except Exception as exc: - logger.debug("Failed to store usage payload, %s", exc) + logger.debug("Failed to obtain records count, %s", exc) connection.rollback() cursor.close() @@ -118,10 +118,10 @@ def empty(self, connection: Optional[sqlite3.Connection] = None) -> bool: return rows_count == 0 - def _flush_db(self, connection: sqlite3.Connection) -> List[Dict[str, Any]]: + def _flush_db(self, connection: sqlite3.Connection, limit: int = 100) -> List[Dict[str, Any]]: cursor = connection.cursor() - sql_select = f"SELECT {self._col_name} FROM {self._tbl_name}" - sql_delete = f"DELETE FROM {self._tbl_name}" + sql_select = f"SELECT id, {self._col_name} FROM {self._tbl_name} ORDER BY id ASC LIMIT {limit}" + sql_delete = f"DELETE FROM {self._tbl_name} WHERE id >= ? and id <= ?" 
try: cursor.execute("BEGIN EXCLUSIVE") @@ -133,22 +133,33 @@ def _flush_db(self, connection: sqlite3.Connection) -> List[Dict[str, Any]]: try: cursor.execute(sql_select) payloads = cursor.fetchall() - cursor.execute(sql_delete) - connection.commit() - cursor.close() except Exception as exc: - logger.debug("Failed to store usage payload, %s", exc) + logger.debug("Failed to obtain records, %s", exc) connection.rollback() return [] parsed_payloads = [] - for (payload,) in payloads: + top_id = -1 + bottom_id = -1 + for (_id, payload) in payloads: + top_id = max(top_id, _id) + if bottom_id == -1: + bottom_id = _id + bottom_id = min(bottom_id, _id) try: parsed_payload = json.loads(payload) parsed_payloads.append(parsed_payload) except Exception as exc: logger.debug("Failed to parse usage payload %s, %s", payload, exc) + try: + cursor.execute(sql_delete, [bottom_id, top_id]) + connection.commit() + cursor.close() + except Exception as exc: + logger.debug("Failed to obtain records, %s", exc) + connection.rollback() + return parsed_payloads def get_nowait( From 695474914cc25cd2fc5902cccd5678ae4c92f933 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 7 Aug 2024 21:50:48 +0200 Subject: [PATCH 13/33] formatting --- inference/usage_tracking/sqlite_queue.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/inference/usage_tracking/sqlite_queue.py b/inference/usage_tracking/sqlite_queue.py index a60aabb860..d44ca89e3d 100644 --- a/inference/usage_tracking/sqlite_queue.py +++ b/inference/usage_tracking/sqlite_queue.py @@ -118,7 +118,9 @@ def empty(self, connection: Optional[sqlite3.Connection] = None) -> bool: return rows_count == 0 - def _flush_db(self, connection: sqlite3.Connection, limit: int = 100) -> List[Dict[str, Any]]: + def _flush_db( + self, connection: sqlite3.Connection, limit: int = 100 + ) -> List[Dict[str, Any]]: cursor = connection.cursor() sql_select = f"SELECT id, {self._col_name} FROM {self._tbl_name} ORDER BY id ASC LIMIT {limit}" sql_delete = f"DELETE FROM {self._tbl_name} WHERE id >= ? and id <= ?" 
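PATCH 12 bounds memory use by reading at most limit rows (ordered by id) and then deleting only the id range that was actually read, so rows inserted after the SELECT stay queued for the next flush. A standalone, illustrative sketch of that read-then-delete-range pattern (assumed table and column names, not the patch itself):

    import json
    import sqlite3

    def flush_batch(connection: sqlite3.Connection, table: str, column: str, limit: int = 100):
        cursor = connection.cursor()
        cursor.execute(f"SELECT id, {column} FROM {table} ORDER BY id ASC LIMIT {limit}")
        rows = cursor.fetchall()
        if not rows:
            return []
        payloads = []
        for _, raw in rows:
            try:
                payloads.append(json.loads(raw))
            except json.JSONDecodeError:
                pass  # best-effort parsing, mirroring the patch
        ids = [row_id for row_id, _ in rows]
        # Delete only what was read; newer rows are left for the next flush.
        cursor.execute(f"DELETE FROM {table} WHERE id >= ? AND id <= ?", (min(ids), max(ids)))
        connection.commit()
        return payloads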
@@ -141,7 +143,7 @@ def _flush_db(self, connection: sqlite3.Connection, limit: int = 100) -> List[Di parsed_payloads = [] top_id = -1 bottom_id = -1 - for (_id, payload) in payloads: + for _id, payload in payloads: top_id = max(top_id, _id) if bottom_id == -1: bottom_id = _id From e05b243f85013158fd96a00aa82beb7b64cec30d Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 7 Aug 2024 23:23:32 +0200 Subject: [PATCH 14/33] Add redis sink to be used when running in hosted environment --- inference/usage_tracking/collector.py | 6 +++- inference/usage_tracking/redis_queue.py | 43 +++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 inference/usage_tracking/redis_queue.py diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 0933f4eb09..22c34f6871 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -36,6 +36,8 @@ from .config import TelemetrySettings, get_telemetry_settings from .sqlite_queue import SQLiteQueue +from .redis_queue import RedisQueue + T = TypeVar("T") P = ParamSpec("P") @@ -80,7 +82,9 @@ def __init__(self): exec_session_id=self._exec_session_id ) - if LAMBDA or self._settings.opt_out: + if LAMBDA and REDIS_HOST: + self._queue: "Queue[UsagePayload]" = RedisQueue() + elif LAMBDA or self._settings.opt_out: self._queue: "Queue[UsagePayload]" = Queue( maxsize=self._settings.queue_size ) diff --git a/inference/usage_tracking/redis_queue.py b/inference/usage_tracking/redis_queue.py new file mode 100644 index 0000000000..d50cf75ff9 --- /dev/null +++ b/inference/usage_tracking/redis_queue.py @@ -0,0 +1,43 @@ +import time +from threading import Lock + +from typing_extensions import Any, Dict, List, Optional + +from inference.core.cache import cache +from inference.core.cache.redis import RedisCache +from inference.core.logger import logger + + +class RedisQueue: + """ + Store and forget, keys with specified prefix are handled by external service + """ + def __init__( + self, + prefix: str = f"UsageCollector:{time.time()}", + redis_cache: Optional[RedisCache] = None, + ): + self._prefix: str = prefix + self._redis_cache: RedisCache = redis_cache or cache + self._increment: int = 0 + self._lock: Lock = Lock() + + def put(self, payload: Any): + with self._lock: + try: + self._increment += 1 + self._redis_cache.set( + key=f"{self._prefix}:{self._increment}", value=payload + ) + except Exception as exc: + logger.error("Failed to store usage records '%s', %s", payload, exc) + + @staticmethod + def full() -> bool: + return False + + def empty(self) -> bool: + return True + + def get_nowait(self) -> List[Dict[str, Any]]: + return [] From 3a0e8c44e862358e4473d6359ef2a9f0f63ab7d8 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 7 Aug 2024 23:24:26 +0200 Subject: [PATCH 15/33] formatting --- inference/usage_tracking/collector.py | 3 +-- inference/usage_tracking/redis_queue.py | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 22c34f6871..aefba477f9 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -35,9 +35,8 @@ from inference.usage_tracking.utils import collect_func_params from .config import TelemetrySettings, get_telemetry_settings -from .sqlite_queue import SQLiteQueue from .redis_queue 
import RedisQueue - +from .sqlite_queue import SQLiteQueue T = TypeVar("T") P = ParamSpec("P") diff --git a/inference/usage_tracking/redis_queue.py b/inference/usage_tracking/redis_queue.py index d50cf75ff9..6d781c22f9 100644 --- a/inference/usage_tracking/redis_queue.py +++ b/inference/usage_tracking/redis_queue.py @@ -12,6 +12,7 @@ class RedisQueue: """ Store and forget, keys with specified prefix are handled by external service """ + def __init__( self, prefix: str = f"UsageCollector:{time.time()}", From 01ddd2d9450aa32baa5e7e97976815db7cabbf13 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 7 Aug 2024 23:58:31 +0200 Subject: [PATCH 16/33] Do not hash API keys when using Redis as off-load queue --- inference/usage_tracking/collector.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index aefba477f9..1cd0dfec6e 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -81,24 +81,32 @@ def __init__(self): exec_session_id=self._exec_session_id ) + self._hashed_api_keys: Dict[APIKey, APIKeyHash] = {} + self._api_keys_hashing_enabled = True + if LAMBDA and REDIS_HOST: + logger.debug("Persistence through RedisQueue") self._queue: "Queue[UsagePayload]" = RedisQueue() + self._api_keys_hashing_enabled = False elif LAMBDA or self._settings.opt_out: + logger.debug("No persistence") self._queue: "Queue[UsagePayload]" = Queue( maxsize=self._settings.queue_size ) + self._api_keys_hashing_enabled = False else: try: self._queue = SQLiteQueue() + logger.debug("Persistence through SQLiteQueue") except Exception as exc: logger.debug("Unable to create instance of SQLiteQueue, %s", exc) + logger.debug("No persistence") self._queue: "Queue[UsagePayload]" = Queue( maxsize=self._settings.queue_size ) + self._api_keys_hashing_enabled = False self._queue_lock = Lock() - self._hashed_api_keys: Dict[APIKey, APIKeyHash] = {} - self._system_info_sent: bool = False self._resource_details_lock = Lock() self._resource_details: DefaultDict[APIKey, Dict[ResourceID, bool]] = ( @@ -276,7 +284,10 @@ def _calculate_api_key_hash(self, api_key: APIKey) -> APIKeyHash: if api_key: api_key_hash = self._hashed_api_keys.get(api_key) if not api_key_hash: - api_key_hash = UsageCollector._hash(api_key) + if self._api_keys_hashing_enabled: + api_key_hash = UsageCollector._hash(api_key) + else: + api_key_hash = api_key self._hashed_api_keys[api_key] = api_key_hash return api_key_hash From a7981a32fa5c8440e2e966e844ddd4fd2c096745 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Fri, 9 Aug 2024 23:59:48 +0200 Subject: [PATCH 17/33] use api.roboflow.com as default API server --- inference/usage_tracking/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inference/usage_tracking/config.py b/inference/usage_tracking/config.py index 108209088b..80f97df280 100644 --- a/inference/usage_tracking/config.py +++ b/inference/usage_tracking/config.py @@ -13,7 +13,7 @@ class TelemetrySettings(BaseSettings): model_config = SettingsConfigDict(env_prefix="telemetry_") - api_usage_endpoint_url: str = "https://api.roboflow.one/usage/inference" + api_usage_endpoint_url: str = "https://api.roboflow.com/usage/inference" flush_interval: int = Field(default=10, ge=10, le=300) opt_out: Optional[bool] = False queue_size: int = Field(default=10, ge=10, 
le=10000) From c5363dd55bb174ea65b2bcc9ee4a50545b7824fe Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Sat, 10 Aug 2024 00:00:09 +0200 Subject: [PATCH 18/33] store keys in redis with score --- inference/usage_tracking/redis_queue.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/inference/usage_tracking/redis_queue.py b/inference/usage_tracking/redis_queue.py index 6d781c22f9..a331edfcf7 100644 --- a/inference/usage_tracking/redis_queue.py +++ b/inference/usage_tracking/redis_queue.py @@ -27,8 +27,10 @@ def put(self, payload: Any): with self._lock: try: self._increment += 1 - self._redis_cache.set( - key=f"{self._prefix}:{self._increment}", value=payload + self._redis_cache.zadd( + key=f"{self._prefix}:{self._increment}", + value=payload, + score=time.time(), ) except Exception as exc: logger.error("Failed to store usage records '%s', %s", payload, exc) From 4e6e20d993e3abe2f12ea0e9fc7266211838ee87 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Sat, 10 Aug 2024 00:06:18 +0200 Subject: [PATCH 19/33] when using zadd, value is added under key, and then key is mapped for sorting --- inference/usage_tracking/redis_queue.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/inference/usage_tracking/redis_queue.py b/inference/usage_tracking/redis_queue.py index a331edfcf7..f39d2ec38a 100644 --- a/inference/usage_tracking/redis_queue.py +++ b/inference/usage_tracking/redis_queue.py @@ -27,10 +27,14 @@ def put(self, payload: Any): with self._lock: try: self._increment += 1 - self._redis_cache.zadd( - key=f"{self._prefix}:{self._increment}", + redis_key = f"{self._prefix}:{self._increment}" + self._redis_cache.client.set( + key=redis_key, value=payload, - score=time.time(), + ) + self._redis_cache.client.zadd( + name="UsageCollector", + mapping={redis_key: time.time()}, ) except Exception as exc: logger.error("Failed to store usage records '%s', %s", payload, exc) From 7a87f1d3244f8975b8314ed14afaf2b2aebeb334 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Sat, 10 Aug 2024 01:02:01 +0200 Subject: [PATCH 20/33] drop payloads if 'processed_frames' are not there --- inference/usage_tracking/collector.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 1cd0dfec6e..876cbc9063 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -594,22 +594,22 @@ def _offload_to_api(self, payloads: List[APIKeyUsage]): api_keys_hashes_failed.add(api_key_hash) continue api_key = hashes_to_api_keys[api_key_hash] - if any("processed_frames" not in w for w in workflow_payloads.values()): - api_keys_hashes_failed.add(api_key_hash) - continue + complete_workflow_payloads = [ + w for w in workflow_payloads.values() if "processed_frames" in w + ] try: - for workflow_payload in workflow_payloads.values(): - if api_key_hash in workflow_payload: + for workflow_payload in complete_workflow_payloads: + if "api_key_hash" in workflow_payload: del workflow_payload["api_key_hash"] workflow_payload["api_key"] = api_key logger.debug( "Offloading usage to %s, payload: %s", self._settings.api_usage_endpoint_url, - workflow_payloads, + complete_workflow_payloads, ) response = requests.post( self._settings.api_usage_endpoint_url, - 
json=list(workflow_payloads.values()), + json=complete_workflow_payloads, verify=ssl_verify, headers={"Authorization": f"Bearer {api_key}"}, timeout=1, From cf93bdee5552aa296dea086526e52e6d8798e7a8 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Mon, 19 Aug 2024 13:12:50 +0200 Subject: [PATCH 21/33] Extract logic responsible for zipping usage payloads into separate utils submodule; bump version --- inference/core/version.py | 2 +- inference/usage_tracking/collector.py | 125 +----------------- inference/usage_tracking/collector_utils.py | 119 +++++++++++++++++ .../usage_tracking/test_collector.py | 23 ++-- 4 files changed, 134 insertions(+), 135 deletions(-) create mode 100644 inference/usage_tracking/collector_utils.py diff --git a/inference/core/version.py b/inference/core/version.py index 8b9c9d1e0a..11b4081a90 100644 --- a/inference/core/version.py +++ b/inference/core/version.py @@ -1,4 +1,4 @@ -__version__ = "0.16.2" +__version__ = "0.16.3" if __name__ == "__main__": diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 876cbc9063..edac97ebd7 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -23,7 +23,6 @@ ParamSpec, Tuple, TypeVar, - Union, ) from inference.core.env import API_KEY, LAMBDA, REDIS_HOST @@ -34,6 +33,7 @@ ) from inference.usage_tracking.utils import collect_func_params +from .collector_utils import APIKey, APIKeyHash, APIKeyUsage, ResourceDetails, ResourceID, SystemDetails, UsagePayload, zip_usage_payloads from .config import TelemetrySettings, get_telemetry_settings from .redis_queue import RedisQueue from .sqlite_queue import SQLiteQueue @@ -41,16 +41,6 @@ T = TypeVar("T") P = ParamSpec("P") -ResourceID = str -Usage = Union[DefaultDict[str, Any], Dict[str, Any]] -ResourceUsage = Union[DefaultDict[ResourceID, Usage], Dict[ResourceID, Usage]] -APIKey = str -APIKeyHash = str -APIKeyUsage = Union[DefaultDict[APIKey, ResourceUsage], Dict[APIKey, ResourceUsage]] -ResourceDetails = Dict[str, Any] -SystemDetails = Dict[str, Any] -UsagePayload = Union[APIKeyUsage, ResourceDetails, SystemDetails] - class UsageCollector: _lock = Lock() @@ -143,23 +133,6 @@ def empty_usage_dict(exec_session_id: str) -> APIKeyUsage: ) ) - @staticmethod - def _merge_usage_dicts(d1: UsagePayload, d2: UsagePayload): - merged = {} - if d1 and d2 and d1.get("resource_id") != d2.get("resource_id"): - raise ValueError("Cannot merge usage for different resource IDs") - if "timestamp_start" in d1 and "timestamp_start" in d2: - merged["timestamp_start"] = min( - d1["timestamp_start"], d2["timestamp_start"] - ) - if "timestamp_stop" in d1 and "timestamp_stop" in d2: - merged["timestamp_stop"] = max(d1["timestamp_stop"], d2["timestamp_stop"]) - if "processed_frames" in d1 and "processed_frames" in d2: - merged["processed_frames"] = d1["processed_frames"] + d2["processed_frames"] - if "source_duration" in d1 and "source_duration" in d2: - merged["source_duration"] = d1["source_duration"] + d2["source_duration"] - return {**d1, **d2, **merged} - def _dump_usage_queue_no_lock(self) -> List[APIKeyUsage]: usage_payloads: List[APIKeyUsage] = [] while self._queue: @@ -178,100 +151,6 @@ def _dump_usage_queue_with_lock(self) -> List[APIKeyUsage]: usage_payloads = self._dump_usage_queue_no_lock() return usage_payloads - @staticmethod - def _get_api_key_usage_containing_resource( - api_key_hash: APIKey, usage_payloads: List[APIKeyUsage] - ) -> 
Optional[ResourceUsage]: - for usage_payload in usage_payloads: - for other_api_key_hash, resource_payloads in usage_payload.items(): - if api_key_hash and other_api_key_hash != api_key_hash: - continue - if other_api_key_hash == "": - continue - for resource_id, resource_usage in resource_payloads.items(): - if not resource_id: - continue - if not resource_usage or "resource_id" not in resource_usage: - continue - return resource_usage - return - - @staticmethod - def _zip_usage_payloads(usage_payloads: List[APIKeyUsage]) -> List[APIKeyUsage]: - merged_api_key_usage_payloads: APIKeyUsage = {} - system_info_payload = None - for usage_payload in usage_payloads: - for api_key_hash, resource_payloads in usage_payload.items(): - if api_key_hash == "": - if ( - resource_payloads - and len(resource_payloads) > 1 - or list(resource_payloads.keys()) != [""] - ): - logger.debug( - "Dropping usage payload %s due to missing API key", - resource_payloads, - ) - continue - api_key_usage_with_resource = ( - UsageCollector._get_api_key_usage_containing_resource( - api_key_hash=api_key_hash, - usage_payloads=usage_payloads, - ) - ) - if not api_key_usage_with_resource: - system_info_payload = resource_payloads - continue - api_key_hash = api_key_usage_with_resource["api_key_hash"] - resource_id = api_key_usage_with_resource["resource_id"] - category = api_key_usage_with_resource.get("category") - for v in resource_payloads.values(): - v["api_key_hash"] = api_key_hash - if "resource_id" not in v or not v["resource_id"]: - v["resource_id"] = resource_id - if "category" not in v or not v["category"]: - v["category"] = category - for ( - resource_usage_key, - resource_usage_payload, - ) in resource_payloads.items(): - if resource_usage_key == "": - api_key_usage_with_resource = ( - UsageCollector._get_api_key_usage_containing_resource( - api_key_hash=api_key_hash, - usage_payloads=usage_payloads, - ) - ) - if not api_key_usage_with_resource: - system_info_payload = {"": resource_usage_payload} - continue - resource_id = api_key_usage_with_resource["resource_id"] - category = api_key_usage_with_resource.get("category") - resource_usage_key = f"{category}:{resource_id}" - resource_usage_payload["api_key_hash"] = api_key_hash - resource_usage_payload["resource_id"] = resource_id - resource_usage_payload["category"] = category - merged_api_key_payload = merged_api_key_usage_payloads.setdefault( - api_key_hash, {} - ) - merged_resource_payload = merged_api_key_payload.setdefault( - resource_usage_key, {} - ) - merged_api_key_payload[resource_usage_key] = ( - UsageCollector._merge_usage_dicts( - merged_resource_payload, - resource_usage_payload, - ) - ) - - zipped_payloads = [merged_api_key_usage_payloads] - if system_info_payload: - system_info_api_key_hash = next(iter(system_info_payload.values()))[ - "api_key_hash" - ] - zipped_payloads.append({system_info_api_key_hash: system_info_payload}) - return zipped_payloads - @staticmethod def _hash(payload: str, length=5): payload_hash = hashlib.sha256(payload.encode()) @@ -305,7 +184,7 @@ def _enqueue_payload(self, payload: UsagePayload): else: usage_payloads = self._dump_usage_queue_no_lock() usage_payloads.append(payload) - merged_usage_payloads = self._zip_usage_payloads( + merged_usage_payloads = zip_usage_payloads( usage_payloads=usage_payloads, ) for usage_payload in merged_usage_payloads: diff --git a/inference/usage_tracking/collector_utils.py b/inference/usage_tracking/collector_utils.py new file mode 100644 index 0000000000..b008acecaa --- /dev/null +++ 
b/inference/usage_tracking/collector_utils.py @@ -0,0 +1,119 @@ +from typing import Any, DefaultDict, Dict, List, Optional, Union + + +ResourceID = str +Usage = Union[DefaultDict[str, Any], Dict[str, Any]] +ResourceUsage = Union[DefaultDict[ResourceID, Usage], Dict[ResourceID, Usage]] +APIKey = str +APIKeyHash = str +APIKeyUsage = Union[DefaultDict[APIKey, ResourceUsage], Dict[APIKey, ResourceUsage]] +ResourceDetails = Dict[str, Any] +SystemDetails = Dict[str, Any] +UsagePayload = Union[APIKeyUsage, ResourceDetails, SystemDetails] + + +def merge_usage_dicts(d1: UsagePayload, d2: UsagePayload): + merged = {} + if d1 and d2 and d1.get("resource_id") != d2.get("resource_id"): + raise ValueError("Cannot merge usage for different resource IDs") + if "timestamp_start" in d1 and "timestamp_start" in d2: + merged["timestamp_start"] = min( + d1["timestamp_start"], d2["timestamp_start"] + ) + if "timestamp_stop" in d1 and "timestamp_stop" in d2: + merged["timestamp_stop"] = max(d1["timestamp_stop"], d2["timestamp_stop"]) + if "processed_frames" in d1 and "processed_frames" in d2: + merged["processed_frames"] = d1["processed_frames"] + d2["processed_frames"] + if "source_duration" in d1 and "source_duration" in d2: + merged["source_duration"] = d1["source_duration"] + d2["source_duration"] + return {**d1, **d2, **merged} + + +def get_api_key_usage_containing_resource( + api_key_hash: APIKey, usage_payloads: List[APIKeyUsage] +) -> Optional[ResourceUsage]: + for usage_payload in usage_payloads: + for other_api_key_hash, resource_payloads in usage_payload.items(): + if api_key_hash and other_api_key_hash != api_key_hash: + continue + if other_api_key_hash == "": + continue + for resource_id, resource_usage in resource_payloads.items(): + if not resource_id: + continue + if not resource_usage or "resource_id" not in resource_usage: + continue + return resource_usage + return + + +def zip_usage_payloads(usage_payloads: List[APIKeyUsage]) -> List[APIKeyUsage]: + merged_api_key_usage_payloads: APIKeyUsage = {} + system_info_payload = None + for usage_payload in usage_payloads: + for api_key_hash, resource_payloads in usage_payload.items(): + if api_key_hash == "": + if ( + resource_payloads + and len(resource_payloads) > 1 + or list(resource_payloads.keys()) != [""] + ): + continue + api_key_usage_with_resource = ( + get_api_key_usage_containing_resource( + api_key_hash=api_key_hash, + usage_payloads=usage_payloads, + ) + ) + if not api_key_usage_with_resource: + system_info_payload = resource_payloads + continue + api_key_hash = api_key_usage_with_resource["api_key_hash"] + resource_id = api_key_usage_with_resource["resource_id"] + category = api_key_usage_with_resource.get("category") + for v in resource_payloads.values(): + v["api_key_hash"] = api_key_hash + if "resource_id" not in v or not v["resource_id"]: + v["resource_id"] = resource_id + if "category" not in v or not v["category"]: + v["category"] = category + for ( + resource_usage_key, + resource_usage_payload, + ) in resource_payloads.items(): + if resource_usage_key == "": + api_key_usage_with_resource = ( + get_api_key_usage_containing_resource( + api_key_hash=api_key_hash, + usage_payloads=usage_payloads, + ) + ) + if not api_key_usage_with_resource: + system_info_payload = {"": resource_usage_payload} + continue + resource_id = api_key_usage_with_resource["resource_id"] + category = api_key_usage_with_resource.get("category") + resource_usage_key = f"{category}:{resource_id}" + resource_usage_payload["api_key_hash"] = api_key_hash + 
resource_usage_payload["resource_id"] = resource_id + resource_usage_payload["category"] = category + merged_api_key_payload = merged_api_key_usage_payloads.setdefault( + api_key_hash, {} + ) + merged_resource_payload = merged_api_key_payload.setdefault( + resource_usage_key, {} + ) + merged_api_key_payload[resource_usage_key] = ( + merge_usage_dicts( + merged_resource_payload, + resource_usage_payload, + ) + ) + + zipped_payloads = [merged_api_key_usage_payloads] + if system_info_payload: + system_info_api_key_hash = next(iter(system_info_payload.values()))[ + "api_key_hash" + ] + zipped_payloads.append({system_info_api_key_hash: system_info_payload}) + return zipped_payloads diff --git a/tests/inference/unit_tests/usage_tracking/test_collector.py b/tests/inference/unit_tests/usage_tracking/test_collector.py index c19c143d5d..584b5f585b 100644 --- a/tests/inference/unit_tests/usage_tracking/test_collector.py +++ b/tests/inference/unit_tests/usage_tracking/test_collector.py @@ -5,6 +5,7 @@ from inference.core.env import LAMBDA from inference.usage_tracking.collector import UsageCollector +from inference.usage_tracking.collector_utils import get_api_key_usage_containing_resource, merge_usage_dicts, zip_usage_payloads def test_create_empty_usage_dict(): @@ -45,7 +46,7 @@ def test_merge_usage_dicts_raises_on_mismatched_resource_id(): usage_payload_2 = {"resource_id": "other"} with pytest.raises(ValueError): - UsageCollector._merge_usage_dicts(d1=usage_payload_1, d2=usage_payload_2) + merge_usage_dicts(d1=usage_payload_1, d2=usage_payload_2) def test_merge_usage_dicts_merge_with_empty(): @@ -61,11 +62,11 @@ def test_merge_usage_dicts_merge_with_empty(): usage_payload_2 = {"resource_id": "some", "api_key_hash": "some"} assert ( - UsageCollector._merge_usage_dicts(d1=usage_payload_1, d2=usage_payload_2) + merge_usage_dicts(d1=usage_payload_1, d2=usage_payload_2) == usage_payload_1 ) assert ( - UsageCollector._merge_usage_dicts(d1=usage_payload_2, d2=usage_payload_1) + merge_usage_dicts(d1=usage_payload_2, d2=usage_payload_1) == usage_payload_1 ) @@ -89,7 +90,7 @@ def test_merge_usage_dicts(): "source_duration": 1, } - assert UsageCollector._merge_usage_dicts( + assert merge_usage_dicts( d1=usage_payload_1, d2=usage_payload_2 ) == { "resource_id": "some", @@ -119,7 +120,7 @@ def test_get_api_key_usage_containing_resource_with_no_payload_containing_api_ke ] # when - api_key_usage_with_resource = UsageCollector._get_api_key_usage_containing_resource( + api_key_usage_with_resource = get_api_key_usage_containing_resource( api_key_hash="fake", usage_payloads=usage_payloads ) @@ -167,7 +168,7 @@ def test_get_api_key_usage_containing_resource_with_no_payload_containing_resour ] # when - api_key_usage_with_resource = UsageCollector._get_api_key_usage_containing_resource( + api_key_usage_with_resource = get_api_key_usage_containing_resource( api_key_hash="fake_api2_hash", usage_payloads=usage_payloads ) @@ -205,7 +206,7 @@ def test_get_api_key_usage_containing_resource(): ] # when - api_key_usage_with_resource = UsageCollector._get_api_key_usage_containing_resource( + api_key_usage_with_resource = get_api_key_usage_containing_resource( api_key_hash="fake_api2_hash", usage_payloads=usage_payloads ) @@ -303,7 +304,7 @@ def test_zip_usage_payloads(): ] # when - zipped_usage_payloads = UsageCollector._zip_usage_payloads( + zipped_usage_payloads = zip_usage_payloads( usage_payloads=dumped_usage_payloads ) @@ -395,7 +396,7 @@ def test_zip_usage_payloads_with_system_info_missing_resource_id_and_no_resource ] # 
when - zipped_usage_payloads = UsageCollector._zip_usage_payloads( + zipped_usage_payloads = zip_usage_payloads( usage_payloads=dumped_usage_payloads ) @@ -458,7 +459,7 @@ def test_zip_usage_payloads_with_system_info_missing_resource_id(): ] # when - zipped_usage_payloads = UsageCollector._zip_usage_payloads( + zipped_usage_payloads = zip_usage_payloads( usage_payloads=dumped_usage_payloads ) @@ -513,7 +514,7 @@ def test_zip_usage_payloads_with_system_info_missing_resource_id_and_api_key(): ] # when - zipped_usage_payloads = UsageCollector._zip_usage_payloads( + zipped_usage_payloads = zip_usage_payloads( usage_payloads=dumped_usage_payloads ) From db86686485c1ab646b653b93148d3011fed1991e Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Mon, 19 Aug 2024 13:15:14 +0200 Subject: [PATCH 22/33] formatting and sorting modules --- inference/usage_tracking/collector.py | 11 +++++++- inference/usage_tracking/collector_utils.py | 29 +++++++-------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index edac97ebd7..f786c1c012 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -33,7 +33,16 @@ ) from inference.usage_tracking.utils import collect_func_params -from .collector_utils import APIKey, APIKeyHash, APIKeyUsage, ResourceDetails, ResourceID, SystemDetails, UsagePayload, zip_usage_payloads +from .collector_utils import ( + APIKey, + APIKeyHash, + APIKeyUsage, + ResourceDetails, + ResourceID, + SystemDetails, + UsagePayload, + zip_usage_payloads, +) from .config import TelemetrySettings, get_telemetry_settings from .redis_queue import RedisQueue from .sqlite_queue import SQLiteQueue diff --git a/inference/usage_tracking/collector_utils.py b/inference/usage_tracking/collector_utils.py index b008acecaa..2295fab74b 100644 --- a/inference/usage_tracking/collector_utils.py +++ b/inference/usage_tracking/collector_utils.py @@ -1,6 +1,5 @@ from typing import Any, DefaultDict, Dict, List, Optional, Union - ResourceID = str Usage = Union[DefaultDict[str, Any], Dict[str, Any]] ResourceUsage = Union[DefaultDict[ResourceID, Usage], Dict[ResourceID, Usage]] @@ -17,9 +16,7 @@ def merge_usage_dicts(d1: UsagePayload, d2: UsagePayload): if d1 and d2 and d1.get("resource_id") != d2.get("resource_id"): raise ValueError("Cannot merge usage for different resource IDs") if "timestamp_start" in d1 and "timestamp_start" in d2: - merged["timestamp_start"] = min( - d1["timestamp_start"], d2["timestamp_start"] - ) + merged["timestamp_start"] = min(d1["timestamp_start"], d2["timestamp_start"]) if "timestamp_stop" in d1 and "timestamp_stop" in d2: merged["timestamp_stop"] = max(d1["timestamp_stop"], d2["timestamp_stop"]) if "processed_frames" in d1 and "processed_frames" in d2: @@ -59,11 +56,9 @@ def zip_usage_payloads(usage_payloads: List[APIKeyUsage]) -> List[APIKeyUsage]: or list(resource_payloads.keys()) != [""] ): continue - api_key_usage_with_resource = ( - get_api_key_usage_containing_resource( - api_key_hash=api_key_hash, - usage_payloads=usage_payloads, - ) + api_key_usage_with_resource = get_api_key_usage_containing_resource( + api_key_hash=api_key_hash, + usage_payloads=usage_payloads, ) if not api_key_usage_with_resource: system_info_payload = resource_payloads @@ -82,11 +77,9 @@ def zip_usage_payloads(usage_payloads: List[APIKeyUsage]) -> List[APIKeyUsage]: resource_usage_payload, ) in 
resource_payloads.items(): if resource_usage_key == "": - api_key_usage_with_resource = ( - get_api_key_usage_containing_resource( - api_key_hash=api_key_hash, - usage_payloads=usage_payloads, - ) + api_key_usage_with_resource = get_api_key_usage_containing_resource( + api_key_hash=api_key_hash, + usage_payloads=usage_payloads, ) if not api_key_usage_with_resource: system_info_payload = {"": resource_usage_payload} @@ -103,11 +96,9 @@ def zip_usage_payloads(usage_payloads: List[APIKeyUsage]) -> List[APIKeyUsage]: merged_resource_payload = merged_api_key_payload.setdefault( resource_usage_key, {} ) - merged_api_key_payload[resource_usage_key] = ( - merge_usage_dicts( - merged_resource_payload, - resource_usage_payload, - ) + merged_api_key_payload[resource_usage_key] = merge_usage_dicts( + merged_resource_payload, + resource_usage_payload, ) zipped_payloads = [merged_api_key_usage_payloads] From d9d405f1b8903792102fe7b05e2f4223cf524567 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Mon, 19 Aug 2024 13:22:22 +0200 Subject: [PATCH 23/33] Remove 'requirements/requirements.sam2.txt' --- setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.py b/setup.py index f2193dffd4..f14488f312 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,6 @@ def read_requirements(path): ), extras_require={ "sam": read_requirements("requirements/requirements.sam.txt"), - "sam2": read_requirements("requirements/requirements.sam2.txt"), }, classifiers=[ "Programming Language :: Python :: 3", From b79013e80fa82b5b9c51cb0c4b7194925e3b8caa Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Mon, 19 Aug 2024 16:38:53 +0200 Subject: [PATCH 24/33] Move send_usage_payload to helpers --- inference/usage_tracking/collector.py | 53 +++++-------------- ...{collector_utils.py => payload_helpers.py} | 41 +++++++++++++- .../usage_tracking/test_collector.py | 2 +- 3 files changed, 54 insertions(+), 42 deletions(-) rename inference/usage_tracking/{collector_utils.py => payload_helpers.py} (76%) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index f786c1c012..65930b1f08 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -12,7 +12,6 @@ from threading import Event, Lock, Thread from uuid import uuid4 -import requests from typing_extensions import ( Any, Callable, @@ -33,7 +32,7 @@ ) from inference.usage_tracking.utils import collect_func_params -from .collector_utils import ( +from .payload_helpers import ( APIKey, APIKeyHash, APIKeyUsage, @@ -41,6 +40,7 @@ ResourceID, SystemDetails, UsagePayload, + send_usage_payload, zip_usage_payloads, ) from .config import TelemetrySettings, get_telemetry_settings @@ -475,45 +475,18 @@ def _offload_to_api(self, payloads: List[APIKeyUsage]): hashes_to_api_keys = dict(a[::-1] for a in self._hashed_api_keys.items()) - api_keys_hashes_failed = set() for payload in payloads: - for api_key_hash, workflow_payloads in payload.items(): - if api_key_hash not in hashes_to_api_keys: - api_keys_hashes_failed.add(api_key_hash) - continue - api_key = hashes_to_api_keys[api_key_hash] - complete_workflow_payloads = [ - w for w in workflow_payloads.values() if "processed_frames" in w - ] - try: - for workflow_payload in complete_workflow_payloads: - if "api_key_hash" in workflow_payload: - del workflow_payload["api_key_hash"] - workflow_payload["api_key"] = api_key - logger.debug( - 
"Offloading usage to %s, payload: %s", - self._settings.api_usage_endpoint_url, - complete_workflow_payloads, - ) - response = requests.post( - self._settings.api_usage_endpoint_url, - json=complete_workflow_payloads, - verify=ssl_verify, - headers={"Authorization": f"Bearer {api_key}"}, - timeout=1, - ) - except Exception as exc: - logger.debug("Failed to send usage - %s", exc) - api_keys_hashes_failed.add(api_key_hash) - continue - if response.status_code != 200: - logger.debug( - "Failed to send usage - got %s status code (%s)", - response.status_code, - response.content, - ) - api_keys_hashes_failed.add(api_key_hash) - continue + api_keys_hashes_failed = send_usage_payload( + payloads=payloads, + api_usage_endpoint_url=self._settings.api_usage_endpoint_url, + hashes_to_api_keys=hashes_to_api_keys, + ssl_verify=ssl_verify, + ) + if api_keys_hashes_failed: + logger.debug( + "Failed to send usage following usage payloads: %s", + api_keys_hashes_failed, + ) for api_key_hash in list(payload.keys()): if api_key_hash not in api_keys_hashes_failed: del payload[api_key_hash] diff --git a/inference/usage_tracking/collector_utils.py b/inference/usage_tracking/payload_helpers.py similarity index 76% rename from inference/usage_tracking/collector_utils.py rename to inference/usage_tracking/payload_helpers.py index 2295fab74b..a8f416c961 100644 --- a/inference/usage_tracking/collector_utils.py +++ b/inference/usage_tracking/payload_helpers.py @@ -1,4 +1,6 @@ -from typing import Any, DefaultDict, Dict, List, Optional, Union +from typing import Any, DefaultDict, Dict, List, Optional, Set, Union + +import requests ResourceID = str Usage = Union[DefaultDict[str, Any], Dict[str, Any]] @@ -108,3 +110,40 @@ def zip_usage_payloads(usage_payloads: List[APIKeyUsage]) -> List[APIKeyUsage]: ] zipped_payloads.append({system_info_api_key_hash: system_info_payload}) return zipped_payloads + + +def send_usage_payload( + payload: UsagePayload, + api_usage_endpoint_url: str, + hashes_to_api_keys: Optional[Dict[APIKeyHash, APIKey]] = None, + ssl_verify: bool = False, +) -> Set[APIKeyHash]: + hashes_to_api_keys = hashes_to_api_keys or {} + api_keys_hashes_failed = set() + for api_key_hash, workflow_payloads in payload.items(): + if hashes_to_api_keys and api_key_hash not in hashes_to_api_keys: + api_keys_hashes_failed.add(api_key_hash) + continue + api_key = hashes_to_api_keys.get(api_key_hash) or api_key_hash + complete_workflow_payloads = [ + w for w in workflow_payloads.values() if "processed_frames" in w + ] + try: + for workflow_payload in complete_workflow_payloads: + if "api_key_hash" in workflow_payload: + del workflow_payload["api_key_hash"] + workflow_payload["api_key"] = api_key + response = requests.post( + api_usage_endpoint_url, + json=complete_workflow_payloads, + verify=ssl_verify, + headers={"Authorization": f"Bearer {api_key}"}, + timeout=1, + ) + except Exception: + api_keys_hashes_failed.add(api_key_hash) + continue + if response.status_code != 200: + api_keys_hashes_failed.add(api_key_hash) + continue + return api_keys_hashes_failed diff --git a/tests/inference/unit_tests/usage_tracking/test_collector.py b/tests/inference/unit_tests/usage_tracking/test_collector.py index 584b5f585b..f0a68a37ce 100644 --- a/tests/inference/unit_tests/usage_tracking/test_collector.py +++ b/tests/inference/unit_tests/usage_tracking/test_collector.py @@ -5,7 +5,7 @@ from inference.core.env import LAMBDA from inference.usage_tracking.collector import UsageCollector -from inference.usage_tracking.collector_utils import 
get_api_key_usage_containing_resource, merge_usage_dicts, zip_usage_payloads +from inference.usage_tracking.payload_helpers import get_api_key_usage_containing_resource, merge_usage_dicts, zip_usage_payloads def test_create_empty_usage_dict(): From ddf90dcf2b50bf20332e04afc5e27d8285cec72d Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Mon, 19 Aug 2024 17:12:25 +0200 Subject: [PATCH 25/33] sort imports --- inference/usage_tracking/collector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 65930b1f08..317ff02d15 100644 --- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -32,6 +32,7 @@ ) from inference.usage_tracking.utils import collect_func_params +from .config import TelemetrySettings, get_telemetry_settings from .payload_helpers import ( APIKey, APIKeyHash, @@ -43,7 +44,6 @@ send_usage_payload, zip_usage_payloads, ) -from .config import TelemetrySettings, get_telemetry_settings from .redis_queue import RedisQueue from .sqlite_queue import SQLiteQueue From 9bf9347563c35d7d31c8edf74e59aabfd0d31d3f Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Tue, 20 Aug 2024 09:54:04 +0200 Subject: [PATCH 26/33] Create cache dir if it does not exist yet --- inference/usage_tracking/sqlite_queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/inference/usage_tracking/sqlite_queue.py b/inference/usage_tracking/sqlite_queue.py index d44ca89e3d..f0f42657d6 100644 --- a/inference/usage_tracking/sqlite_queue.py +++ b/inference/usage_tracking/sqlite_queue.py @@ -19,6 +19,8 @@ def __init__( self._db_file_path: str = db_file_path if not connection: + if not os.path.exists(MODEL_CACHE_DIR): + os.makedirs(MODEL_CACHE_DIR) connection: sqlite3.Connection = sqlite3.connect(db_file_path, timeout=1) self._create_table(connection=connection) connection.close() From c7a78c5bca9fe5e9b0430a57a85f529b1d1a7db1 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 21 Aug 2024 09:19:28 +0200 Subject: [PATCH 27/33] fix redis queue --- inference/usage_tracking/redis_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inference/usage_tracking/redis_queue.py b/inference/usage_tracking/redis_queue.py index f39d2ec38a..6d9467db85 100644 --- a/inference/usage_tracking/redis_queue.py +++ b/inference/usage_tracking/redis_queue.py @@ -29,7 +29,7 @@ def put(self, payload: Any): self._increment += 1 redis_key = f"{self._prefix}:{self._increment}" self._redis_cache.client.set( - key=redis_key, + name=redis_key, value=payload, ) self._redis_cache.client.zadd( From 9b9cf218e032d61e5b21821e6ac1f5807d1234c5 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 21 Aug 2024 10:05:41 +0200 Subject: [PATCH 28/33] store bytes in redis --- inference/usage_tracking/redis_queue.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/inference/usage_tracking/redis_queue.py b/inference/usage_tracking/redis_queue.py index 6d9467db85..2dfac7aeda 100644 --- a/inference/usage_tracking/redis_queue.py +++ b/inference/usage_tracking/redis_queue.py @@ -1,3 +1,4 @@ +import json import time from threading import Lock @@ -24,6 +25,12 @@ def __init__( self._lock: Lock = Lock() def put(self, payload: Any): + if not isinstance(payload, 
str): + try: + payload = json.dumps(payload) + except Exception as exc: + logger.error("Failed to parse payload '%s' to JSON - %s", payload, exc) + return with self._lock: try: self._increment += 1 From f3c79d629526890ac1ea4d9520835db1656fc9ee Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 21 Aug 2024 11:39:59 +0200 Subject: [PATCH 29/33] add hash tag when setting redis keys --- inference/usage_tracking/redis_queue.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/inference/usage_tracking/redis_queue.py b/inference/usage_tracking/redis_queue.py index 2dfac7aeda..85524401b0 100644 --- a/inference/usage_tracking/redis_queue.py +++ b/inference/usage_tracking/redis_queue.py @@ -11,15 +11,15 @@ class RedisQueue: """ - Store and forget, keys with specified prefix are handled by external service + Store and forget, keys with specified hash tag are handled by external service """ def __init__( self, - prefix: str = f"UsageCollector:{time.time()}", + hash_tag: str = "UsageCollector", redis_cache: Optional[RedisCache] = None, ): - self._prefix: str = prefix + self._prefix: str = f"{{{hash_tag}}}:{time.time()}" self._redis_cache: RedisCache = redis_cache or cache self._increment: int = 0 self._lock: Lock = Lock() From 65242f6a301fc9acedbed942a08f99f1b250cd54 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 21 Aug 2024 11:42:44 +0200 Subject: [PATCH 30/33] decrease chance of key collision --- inference/usage_tracking/redis_queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/inference/usage_tracking/redis_queue.py b/inference/usage_tracking/redis_queue.py index 85524401b0..71d31ecb03 100644 --- a/inference/usage_tracking/redis_queue.py +++ b/inference/usage_tracking/redis_queue.py @@ -1,6 +1,7 @@ import json import time from threading import Lock +from uuid import uuid4 from typing_extensions import Any, Dict, List, Optional @@ -19,7 +20,7 @@ def __init__( hash_tag: str = "UsageCollector", redis_cache: Optional[RedisCache] = None, ): - self._prefix: str = f"{{{hash_tag}}}:{time.time()}" + self._prefix: str = f"{{{hash_tag}}}:{uuid4().hex[:5]}:{time.time()}" self._redis_cache: RedisCache = redis_cache or cache self._increment: int = 0 self._lock: Lock = Lock() From 97159cf1db50213625214481dc93a09e8796dd68 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Wed, 21 Aug 2024 11:59:29 +0200 Subject: [PATCH 31/33] Do not attempt to send payload if API key is empty --- inference/usage_tracking/payload_helpers.py | 3 +++ inference/usage_tracking/redis_queue.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/inference/usage_tracking/payload_helpers.py b/inference/usage_tracking/payload_helpers.py index a8f416c961..996424ac32 100644 --- a/inference/usage_tracking/payload_helpers.py +++ b/inference/usage_tracking/payload_helpers.py @@ -125,6 +125,9 @@ def send_usage_payload( api_keys_hashes_failed.add(api_key_hash) continue api_key = hashes_to_api_keys.get(api_key_hash) or api_key_hash + if not api_key: + api_keys_hashes_failed.add(api_key_hash) + continue complete_workflow_payloads = [ w for w in workflow_payloads.values() if "processed_frames" in w ] diff --git a/inference/usage_tracking/redis_queue.py b/inference/usage_tracking/redis_queue.py index 71d31ecb03..206d58c7ba 100644 --- a/inference/usage_tracking/redis_queue.py +++ 
b/inference/usage_tracking/redis_queue.py @@ -20,6 +20,9 @@ def __init__( hash_tag: str = "UsageCollector", redis_cache: Optional[RedisCache] = None, ): + # prefix must contain hash-tag to avoid CROSSLOT errors when using mget + # hash-tag is common part of the key wrapped within '{}' + # removing hash-tag will cause clients utilizing mget to fail self._prefix: str = f"{{{hash_tag}}}:{uuid4().hex[:5]}:{time.time()}" self._redis_cache: RedisCache = redis_cache or cache self._increment: int = 0 From 4f4ed77a629aa34abc954189e05468cd127a74c5 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Thu, 22 Aug 2024 12:01:42 +0200 Subject: [PATCH 32/33] Add is_preview to WorkflowSpecificationInferenceRequest (/workflows/run); pass is_preview to UsageCollector; store is_preview in resource details --- inference/core/entities/requests/workflows.py | 4 ++++ inference/core/workflows/execution_engine/core.py | 2 ++ .../core/workflows/execution_engine/entities/engine.py | 1 + inference/core/workflows/execution_engine/v1/core.py | 2 ++ inference/usage_tracking/collector.py | 8 +++++++- 5 files changed, 16 insertions(+), 1 deletion(-) diff --git a/inference/core/entities/requests/workflows.py b/inference/core/entities/requests/workflows.py index cb9db3eba3..d364ecf013 100644 --- a/inference/core/entities/requests/workflows.py +++ b/inference/core/entities/requests/workflows.py @@ -22,6 +22,10 @@ class WorkflowInferenceRequest(BaseModel): class WorkflowSpecificationInferenceRequest(WorkflowInferenceRequest): specification: dict + is_preview: bool = Field( + default=False, + description="Reserved, used internally by Roboflow to distinguish between preview and non-preview runs" + ) class DescribeBlocksRequest(BaseModel): diff --git a/inference/core/workflows/execution_engine/core.py b/inference/core/workflows/execution_engine/core.py index e9d9ad4010..7b7f78fbde 100644 --- a/inference/core/workflows/execution_engine/core.py +++ b/inference/core/workflows/execution_engine/core.py @@ -61,10 +61,12 @@ def run( self, runtime_parameters: Dict[str, Any], fps: float = 0, + _is_preview: bool = False, ) -> List[Dict[str, Any]]: return self._engine.run( runtime_parameters=runtime_parameters, fps=fps, + _is_preview=_is_preview, ) diff --git a/inference/core/workflows/execution_engine/entities/engine.py b/inference/core/workflows/execution_engine/entities/engine.py index 06a31880d2..021fba5b21 100644 --- a/inference/core/workflows/execution_engine/entities/engine.py +++ b/inference/core/workflows/execution_engine/entities/engine.py @@ -21,5 +21,6 @@ def run( self, runtime_parameters: Dict[str, Any], fps: float = 0, + _is_preview: bool = False, ) -> List[Dict[str, Any]]: pass diff --git a/inference/core/workflows/execution_engine/v1/core.py b/inference/core/workflows/execution_engine/v1/core.py index 379186f6cb..5a5a3d987b 100644 --- a/inference/core/workflows/execution_engine/v1/core.py +++ b/inference/core/workflows/execution_engine/v1/core.py @@ -61,6 +61,7 @@ def run( self, runtime_parameters: Dict[str, Any], fps: float = 0, + _is_preview: bool = False, ) -> List[Dict[str, Any]]: runtime_parameters = assembly_runtime_parameters( runtime_parameters=runtime_parameters, @@ -77,4 +78,5 @@ def run( max_concurrent_steps=self._max_concurrent_steps, usage_fps=fps, usage_workflow_id=self._workflow_id, + usage_workflow_preview=_is_preview, ) diff --git a/inference/usage_tracking/collector.py b/inference/usage_tracking/collector.py index 317ff02d15..6eac36af1f 100644 
--- a/inference/usage_tracking/collector.py +++ b/inference/usage_tracking/collector.py @@ -508,7 +508,7 @@ async def async_push_usage_payloads(self): @staticmethod def _resource_details_from_workflow_json( workflow_json: Dict[str, Any] - ) -> Tuple[ResourceID, ResourceDetails]: + ) -> ResourceDetails: if not isinstance(workflow_json, dict): raise ValueError("workflow_json must be dict") return { @@ -524,6 +524,7 @@ def _extract_usage_params_from_func_kwargs( usage_fps: float, usage_api_key: str, usage_workflow_id: str, + usage_workflow_preview: bool, usage_inference_test_run: bool, func: Callable[[Any], Any], args: List[Any], @@ -550,6 +551,7 @@ def _extract_usage_params_from_func_kwargs( resource_details = UsageCollector._resource_details_from_workflow_json( workflow_json=workflow_json, ) + resource_details["is_preview"] = usage_workflow_preview resource_id = usage_workflow_id if not resource_id and resource_details: usage_workflow_id = UsageCollector._calculate_resource_hash( @@ -609,6 +611,7 @@ def sync_wrapper( usage_fps: float = 0, usage_api_key: APIKey = "", usage_workflow_id: str = "", + usage_workflow_preview: bool = False, usage_inference_test_run: bool = False, **kwargs: P.kwargs, ) -> T: @@ -617,6 +620,7 @@ def sync_wrapper( usage_fps=usage_fps, usage_api_key=usage_api_key, usage_workflow_id=usage_workflow_id, + usage_workflow_preview=usage_workflow_preview, usage_inference_test_run=usage_inference_test_run, func=func, args=args, @@ -631,6 +635,7 @@ async def async_wrapper( usage_fps: float = 0, usage_api_key: APIKey = "", usage_workflow_id: str = "", + usage_workflow_preview: bool = False, usage_inference_test_run: bool = False, **kwargs: P.kwargs, ) -> T: @@ -639,6 +644,7 @@ async def async_wrapper( usage_fps=usage_fps, usage_api_key=usage_api_key, usage_workflow_id=usage_workflow_id, + usage_workflow_preview=usage_workflow_preview, usage_inference_test_run=usage_inference_test_run, func=func, args=args, From a5874c7623a1abaf8ee07e039af4bf9c37af1d24 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Thu, 22 Aug 2024 12:39:00 +0200 Subject: [PATCH 33/33] formatting --- inference/core/entities/requests/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inference/core/entities/requests/workflows.py b/inference/core/entities/requests/workflows.py index d364ecf013..0a94d612e6 100644 --- a/inference/core/entities/requests/workflows.py +++ b/inference/core/entities/requests/workflows.py @@ -24,7 +24,7 @@ class WorkflowSpecificationInferenceRequest(WorkflowInferenceRequest): specification: dict is_preview: bool = Field( default=False, - description="Reserved, used internally by Roboflow to distinguish between preview and non-preview runs" + description="Reserved, used internally by Roboflow to distinguish between preview and non-preview runs", )