diff --git a/nextmv/cloud/__init__.py b/nextmv/cloud/__init__.py index 3bdb72f..47cf645 100644 --- a/nextmv/cloud/__init__.py +++ b/nextmv/cloud/__init__.py @@ -1,4 +1,22 @@ """Functionality for interacting with the Nextmv Cloud.""" +from .acceptance_test import AcceptanceTest as AcceptanceTest +from .acceptance_test import AcceptanceTestParams as AcceptanceTestParams +from .acceptance_test import Comparison as Comparison +from .acceptance_test import ComparisonInstance as ComparisonInstance +from .acceptance_test import Metric as Metric +from .acceptance_test import MetricType as MetricType from .application import Application as Application +from .application import DownloadURL as DownloadURL +from .application import ErrorLog as ErrorLog +from .application import Metadata as Metadata +from .application import PollingOptions as PollingOptions +from .application import RunInformation as RunInformation +from .application import RunResult as RunResult +from .application import UploadURL as UploadURL +from .batch_experiment import BatchExperiment as BatchExperiment +from .batch_experiment import BatchExperimentInformation as BatchExperimentInformation +from .batch_experiment import BatchExperimentMetadata as BatchExperimentMetadata +from .batch_experiment import BatchExperimentRun as BatchExperimentRun from .client import Client as Client +from .input_set import InputSet as InputSet diff --git a/nextmv/cloud/acceptance_test.py b/nextmv/cloud/acceptance_test.py new file mode 100644 index 0000000..c3b92c9 --- /dev/null +++ b/nextmv/cloud/acceptance_test.py @@ -0,0 +1,90 @@ +"""This module contains definitions for acceptance tests.""" + +from datetime import datetime +from enum import Enum + +from nextmv.base_model import BaseModel + + +class MetricType(str, Enum): + """Type of metric when doing a comparison.""" + + absolute_threshold = "absolute-threshold" + """Absolute threshold metric type.""" + difference_threshold = "difference-threshold" + """Difference threshold metric type.""" + direct_comparison = "direct-comparison" + """Direct comparison metric type.""" + + +class Comparison(str, Enum): + """Comparison to use for two metrics.""" + + equal_to = "eq" + """Equal to metric type.""" + greater_than = "gt" + """Greater than metric type.""" + greater_than_or_equal_to = "ge" + """Greater than or equal to metric type.""" + less_than = "lt" + """Less than metric type.""" + less_than_or_equal_to = "le" + """Less than or equal to metric type.""" + not_equal_to = "ne" + """Not equal to metric type.""" + + +class AcceptanceTestParams(BaseModel): + """Parameters of an acceptance test.""" + + operator: Comparison + """Operator used to compare two metrics.""" + + +class Metric(BaseModel): + """A metric is a key performance indicator that is used to evaluate the + performance of a test.""" + + field: str + """Field of the metric.""" + metric_type: MetricType + """Type of the metric.""" + params: AcceptanceTestParams + """Parameters of the metric.""" + statistic: str + """Statistic of the metric.""" + + +class ComparisonInstance(BaseModel): + """An instance used for a comparison.""" + + instance_id: str + """ID of the instance.""" + version_id: str + """ID of the version.""" + + +class AcceptanceTest(BaseModel): + """An acceptance test gives a go/no-go decision criteria for a set of + metrics. It relies on a batch experiment.""" + + id: str + """ID of the acceptance test.""" + name: str + """Name of the acceptance test.""" + description: str + """Description of the acceptance test.""" + app_id: str + """ID of the app that owns the acceptance test.""" + experiment_id: str + """ID of the batch experiment underlying in the acceptance test.""" + control: ComparisonInstance + """Control instance of the acceptance test.""" + candidate: ComparisonInstance + """Candidate instance of the acceptance test.""" + metrics: list[Metric] + """Metrics of the acceptance test.""" + created_at: datetime + """Creation date of the acceptance test.""" + updated_at: datetime + """Last update date of the acceptance test.""" diff --git a/nextmv/cloud/application.py b/nextmv/cloud/application.py index 77575bb..b2f664c 100644 --- a/nextmv/cloud/application.py +++ b/nextmv/cloud/application.py @@ -1,51 +1,105 @@ """This module contains the application class.""" +import sys +import time from dataclasses import dataclass from datetime import datetime from typing import Any from nextmv.base_model import BaseModel +from nextmv.cloud.acceptance_test import AcceptanceTest, Metric +from nextmv.cloud.batch_experiment import BatchExperiment, BatchExperimentMetadata, BatchExperimentRun from nextmv.cloud.client import Client +from nextmv.cloud.input_set import InputSet + + +class DownloadURL(BaseModel): + """Result of getting a download URL.""" + + url: str + """URL to use for downloading the file.""" + + +class ErrorLog(BaseModel): + """Error log of a run, when it was not successful.""" + + error: str | None = None + """Error message.""" + stdout: str | None = None + """Standard output.""" + stderr: str | None = None + """Standard error.""" class Metadata(BaseModel): """Metadata of a run, whether it was successful or not.""" - status: str - """Status of the run.""" + application_id: str + """ID of the application where the run was submitted to.""" + application_instance_id: str + """ID of the instance where the run was submitted to.""" + application_version_id: str + """ID of the version of the application where the run was submitted to.""" created_at: datetime """Date and time when the run was created.""" duration: float """Duration of the run in milliseconds.""" + error: str + """Error message if the run failed.""" input_size: float """Size of the input in bytes.""" output_size: float """Size of the output in bytes.""" - error: str - """Error message if the run failed.""" - application_id: str - """ID of the application where the run was submitted to.""" - application_instance_id: str - """ID of the instance where the run was submitted to.""" - application_version_id: str - """ID of the version of the application where the run was submitted to.""" + status: str + """Status of the run.""" -class RunResult(BaseModel): - """Result of a run, wheter it was successful or not.""" +class PollingOptions(BaseModel): + """Options to use when polling for a run result.""" + + backoff: float = 1.5 + """Backoff factor to use between polls.""" + delay: float = 1 + """Delay to use between polls, in seconds.""" + initial_delay: float = 1 + """Initial delay to use between polls, in seconds.""" + max_duration: float = 60 + """Maximum duration of the polling strategy, in seconds.""" + max_tries: int = 10 + """Maximum number of tries to use.""" + + +class RunInformation(BaseModel): + """Information of a run.""" - id: str - """ID of the run.""" - user_email: str - """Email of the user who submitted the run.""" - name: str - """Name of the run.""" description: str """Description of the run.""" + id: str + """ID of the run.""" metadata: Metadata """Metadata of the run.""" - output: dict[str, Any] - """Output of the run.""" + name: str + """Name of the run.""" + user_email: str + """Email of the user who submitted the run.""" + + +class RunResult(RunInformation): + """Result of a run, wheter it was successful or not.""" + + error_log: ErrorLog | None = None + """Error log of the run. Only available if the run failed.""" + output: dict[str, Any] | None = None + """Output of the run. Only available if the run succeeded.""" + + +class UploadURL(BaseModel): + """Result of getting an upload URL.""" + + upload_id: str + """ID of the upload.""" + upload_url: str + """URL to use for uploading the file.""" @dataclass @@ -56,15 +110,323 @@ class Application: """Client to use for interacting with the Nextmv Cloud API.""" id: str """ID of the application.""" - endpoint: str = "v1/applications/{id}" - """Base endpoint for the application.""" + default_instance_id: str = "devint" """Default instance ID to use for submitting runs.""" + endpoint: str = "v1/applications/{id}" + """Base endpoint for the application.""" + experiments_endpoint: str = "{base}/experiments" + """Base endpoint for the experiments in the application.""" def __post_init__(self): """Logic to run after the class is initialized.""" self.endpoint = self.endpoint.format(id=self.id) + self.experiments_endpoint = self.experiments_endpoint.format(base=self.endpoint) + + def acceptance_test(self, acceptance_test_id: str) -> AcceptanceTest: + """ + Get an acceptance test. + + Args: + acceptance_test_id: ID of the acceptance test. + + Returns: + Acceptance test. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + response = self.client.request( + method="GET", + endpoint=f"{self.experiments_endpoint}/acceptance/{acceptance_test_id}", + ) + + return AcceptanceTest.from_dict(response.json()) + + def batch_experiment(self, batch_id: str) -> BatchExperiment: + """ + Get a batch experiment. + + Args: + batch_id: ID of the batch experiment. + + Returns: + Batch experiment. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + response = self.client.request( + method="GET", + endpoint=f"{self.experiments_endpoint}/batch/{batch_id}", + ) + + return BatchExperiment.from_dict(response.json()) + + def input_set(self, input_set_id: str) -> InputSet: + """ + Get an input set. + + Args: + input_set_id: ID of the input set. + + Returns: + Input set. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + response = self.client.request( + method="GET", + endpoint=f"{self.experiments_endpoint}/inputsets/{input_set_id}", + ) + + return InputSet.from_dict(response.json()) + + def list_acceptance_tests(self) -> list[AcceptanceTest]: + """ + List all acceptance tests. + + Returns: + List of acceptance tests. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + response = self.client.request( + method="GET", + endpoint=f"{self.experiments_endpoint}/acceptance", + ) + + return [AcceptanceTest.from_dict(acceptance_test) for acceptance_test in response.json()] + + def list_batch_experiments(self) -> list[BatchExperimentMetadata]: + """ + List all batch experiments. + + Returns: + List of batch experiments. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + response = self.client.request( + method="GET", + endpoint=f"{self.experiments_endpoint}/batch", + ) + + return [BatchExperimentMetadata.from_dict(batch_experiment) for batch_experiment in response.json()] + + def list_input_sets(self) -> list[InputSet]: + """ + List all input sets. + + Returns: + List of input sets. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + response = self.client.request( + method="GET", + endpoint=f"{self.experiments_endpoint}/inputsets", + ) + + return [InputSet.from_dict(input_set) for input_set in response.json()] + + def new_acceptance_test( + self, + candidate_instance_id: str, + control_instance_id: str, + id: str, + metrics: list[Metric | dict[str, Any]], + name: str, + input_set_id: str | None = None, + description: str | None = None, + ) -> AcceptanceTest: + """ + Create a new acceptance test. The acceptance test is based on a batch + experiment. If you already started a batch experiment, you don't need + to provide the input_set_id parameter. In that case, the ID of the + acceptance test and the batch experiment must be the same. If the batch + experiment does not exist, you can provide the input_set_id parameter + and a new batch experiment will be created for you. + + Args: + candidate_instance_id: ID of the candidate instance. + control_instance_id: ID of the control instance. + id: ID of the acceptance test. + metrics: List of metrics to use for the acceptance test. + name: Name of the acceptance test. + input_set_id: ID of the input set to use for the underlying batch + experiment, in case it hasn't been started. + description: Description of the acceptance test. + + Returns: + Acceptance test. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + ValueError: If the batch experiment ID does not match the + acceptance test ID. + """ + + if input_set_id is None: + batch_experiment = self.batch_experiment(batch_id=id) + batch_experiment_id = batch_experiment.id + else: + batch_experiment_id = self.new_batch_experiment( + name=name, + input_set_id=input_set_id, + instance_ids=[candidate_instance_id, control_instance_id], + description=description, + id=id, + ) + + if batch_experiment_id != id: + raise ValueError(f"batch experiment_id ({batch_experiment_id}) does not match acceptance test id ({id})") + + payload_metrics = [{}] * len(metrics) + for i, metric in enumerate(metrics): + payload_metrics[i] = metric.to_dict() if isinstance(metric, Metric) else metric + + payload = { + "candidate": {"instance_id": candidate_instance_id}, + "control": {"instance_id": control_instance_id}, + "metrics": payload_metrics, + "experiment_id": batch_experiment_id, + "name": name, + } + if description is not None: + payload["description"] = description + if id is not None: + payload["id"] = id + + response = self.client.request( + method="POST", + endpoint=f"{self.experiments_endpoint}/acceptance", + payload=payload, + ) + + return AcceptanceTest.from_dict(response.json()) + + def new_batch_experiment( + self, + name: str, + input_set_id: str, + instance_ids: list[str], + description: str | None = None, + id: str | None = None, + option_sets: dict[str, Any] | None = None, + runs: list[BatchExperimentRun | dict[str, Any]] | None = None, + ) -> str: + """ + Create a new batch experiment. + + Args: + name: Name of the batch experiment. + input_set_id: ID of the input set to use for the experiment. + instance_ids: List of instance IDs to use for the experiment. + description: Description of the batch experiment. + id: ID of the batch experiment. + option_sets: Option sets to use for the experiment. + runs: Runs to use for the experiment. + + Returns: + ID of the batch experiment. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + payload = { + "name": name, + "input_set_id": input_set_id, + "instance_ids": instance_ids, + } + if description is not None: + payload["description"] = description + if id is not None: + payload["id"] = id + if option_sets is not None: + payload["option_sets"] = option_sets + if runs is not None: + payload_runs = [{}] * len(runs) + for i, run in enumerate(runs): + payload_runs[i] = run.to_dict() if isinstance(run, BatchExperimentRun) else run + payload["runs"] = payload_runs + + response = self.client.request( + method="POST", + endpoint=f"{self.experiments_endpoint}/batch", + payload=payload, + ) + + return response.json()["id"] + + def new_input_set( + self, + id: str, + name: str, + description: str | None = None, + end_time: datetime | None = None, + instance_id: str | None = None, + maximum_runs: int | None = None, + run_ids: list[str] | None = None, + start_time: datetime | None = None, + ) -> InputSet: + """ + Create a new input set. + + Args: + id: ID of the input set. + name: Name of the input set. + description: Description of the input set. + end_time: End time of the runs to construct the input set. + instance_id: ID of the instance to use for the input set. If not + provided, the default_instance_id will be used. + maximum_runs: Maximum number of runs to use for the input set. + run_ids: IDs of the runs to use for the input set. + start_time: Start time of the runs to construct the input set. + + Returns: + Input set. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + payload = { + "id": id, + "name": name, + } + if description is not None: + payload["description"] = description + if end_time is not None: + payload["end_time"] = end_time.isoformat() + if instance_id is not None: + payload["instance_id"] = instance_id + if maximum_runs is not None: + payload["maximum_runs"] = maximum_runs + if run_ids is not None: + payload["run_ids"] = run_ids + if start_time is not None: + payload["start_time"] = start_time.isoformat() + + response = self.client.request( + method="POST", + endpoint=f"{self.experiments_endpoint}/inputsets", + payload=payload, + ) + + return InputSet.from_dict(response.json()) def new_run( self, @@ -90,24 +452,40 @@ def new_run( Returns: ID of the submitted run. + + Raises: + requests.HTTPError: If the response status code is not 2xx. """ - payload = {} + input_size = 0 if input is not None: + input_size = _get_size(input) + + upload_id_used = upload_id is not None + if not upload_id_used and input_size > self.client.max_lambda_payload_size: + upload_url = self.upload_url() + self.upload_large_input(upload_url=upload_url, input=input) + upload_id = upload_url.upload_id + upload_id_used = True + + payload = {} + if upload_id_used: + payload["upload_id"] = upload_id + else: payload["input"] = input + if name is not None: payload["name"] = name if description is not None: payload["description"] = description - if upload_id is not None: - payload["upload_id"] = upload_id if options is not None: payload["options"] = options query_params = { "instance_id": instance_id if instance_id is not None else self.default_instance_id, } - response = self.client.post( + response = self.client.request( + method="POST", endpoint=f"{self.endpoint}/runs", payload=payload, query_params=query_params, @@ -115,22 +493,246 @@ def new_run( return response.json()["run_id"] - def run_result( + def new_run_with_result( + self, + input: dict[str, Any] = None, + instance_id: str | None = None, + name: str | None = None, + description: str | None = None, + upload_id: str | None = None, + run_options: dict[str, Any] | None = None, + polling_options: PollingOptions | None = None, + ) -> RunResult: + """ + Submit an input to start a new run of the application and poll for the + result. This is a convenience method that combines the new_run and + run_result_with_polling methods, applying polling logic to check when + the run succeeded. + + Args: + input: Input to use for the run. + instance_id: ID of the instance to use for the run. If not + provided, the default_instance_id will be used. + name: Name of the run. + description: Description of the run. + upload_id: ID to use when running a large input. + run_options: Options to use for the run. + polling_options: Options to use when polling for the run result. If + not provided, the default options will be used. + + Returns: + Result of the run. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + TimeoutError: If the run does not succeed after the polling + strategy is exhausted based on time duration. + RuntimeError: If the run does not succeed after the polling + strategy is exhausted based on number of tries. + """ + + run_id = self.new_run( + input=input, + instance_id=instance_id, + name=name, + description=description, + upload_id=upload_id, + options=run_options, + ) + + return self.run_result_with_polling( + run_id=run_id, + polling_options=polling_options, + ) + + def run_metadata(self, run_id: str) -> RunInformation: + """ + Get the metadata of a run. The result does not include the run output. + + Args: + run_id: ID of the run. + + Returns: + Metadata of the run (Run result with no output). + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + response = self.client.request( + method="GET", + endpoint=f"{self.endpoint}/runs/{run_id}/metadata", + ) + + return RunInformation.from_dict(response.json()) + + def run_result(self, run_id: str) -> RunResult: + """ + Get the result of a run. The result includes the run output. + + Args: + run_id: ID of the run. + + Returns: + Result of the run. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + run_information = self.run_metadata(run_id=run_id) + + return self._run_result(run_id=run_id, run_information=run_information) + + def run_result_with_polling( + self, + run_id: str, + polling_options: PollingOptions, + ) -> RunResult: + """ + Get the result of a run. The result includes the run output. This + method polls for the result until the run finishes executing or the + polling strategy is exhausted. + + Args: + run_id: ID of the run. + + Returns: + Result of the run. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + if polling_options is None: + polling_options = PollingOptions() + + time.sleep(polling_options.initial_delay) + delay = polling_options.delay + polling_ok = False + for _ in range(polling_options.max_tries): + run_information = self.run_metadata(run_id=run_id) + if run_information.metadata.status in ["succeeded", "failed"]: + polling_ok = True + break + + if delay > polling_options.max_duration: + raise TimeoutError( + f"run {run_id} did not succeed after {delay} seconds", + ) + + time.sleep(delay) + delay *= polling_options.backoff + + if not polling_ok: + raise RuntimeError( + f"run {run_id} did not succeed after {polling_options.max_tries} tries", + ) + + return self._run_result(run_id=run_id, run_information=run_information) + + def upload_large_input( + self, + upload_url: UploadURL, + input: dict[str, Any], + ) -> None: + """ + Upload the file located at the given path to the provided upload URL. + + Args: + upload_url: Upload URL to use for uploading the file. + input_path: Path to the input file. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + _ = self.client.request( + method="PUT", + endpoint=upload_url.upload_url, + payload=input, + headers={"Content-Type": "application/json"}, + ) + + def upload_url(self) -> UploadURL: + """ + Get an upload URL to use for uploading a file. + + Returns: + Result of getting an upload URL. + + Raises: + requests.HTTPError: If the response status code is not 2xx. + """ + + response = self.client.request( + method="POST", + endpoint=f"{self.endpoint}/runs/uploadurl", + ) + + return UploadURL.from_dict(response.json()) + + def _run_result( self, run_id: str, + run_information: RunInformation, ) -> RunResult: """ - Get the result of a run. + Get the result of a run. The result includes the run output. This is a + private method that is the base for retrieving a run result, regardless + of polling. Args: run_id: ID of the run. + run_information: Information of the run. Returns: Result of the run. + + Raises: + requests.HTTPError: If the response status code is not 2xx. """ + query_params = None + large_output = False + if run_information.metadata.output_size > self.client.max_lambda_payload_size: + query_params = {"format": "url"} + large_output = True - response = self.client.get( + response = self.client.request( + method="GET", endpoint=f"{self.endpoint}/runs/{run_id}", + query_params=query_params, + ) + if not large_output: + return RunResult.from_dict(response.json()) + + download_url = DownloadURL.from_dict(response.json()) + response2 = self.client.request( + method="GET", + endpoint=download_url.url, ) - return RunResult.from_dict(response.json()) + return RunResult.from_dict(response2.json()) + + +def _get_size(obj: dict | Any, seen: set | Any = None) -> int: + """Recursively finds size of objects""" + + size = sys.getsizeof(obj) + if seen is None: + seen = set() + obj_id = id(obj) + if obj_id in seen: + return 0 + + # Important mark as seen *before* entering recursion to gracefully handle + # self-referential objects + seen.add(obj_id) + if isinstance(obj, dict): + size += sum([_get_size(v, seen) for v in obj.values()]) + size += sum([_get_size(k, seen) for k in obj.keys()]) + elif hasattr(obj, "__dict__"): + size += _get_size(obj.__dict__, seen) + elif hasattr(obj, "__iter__") and not isinstance(obj, str | bytes | bytearray): + size += sum([_get_size(i, seen) for i in obj]) + return size diff --git a/nextmv/cloud/batch_experiment.py b/nextmv/cloud/batch_experiment.py new file mode 100644 index 0000000..7723ab4 --- /dev/null +++ b/nextmv/cloud/batch_experiment.py @@ -0,0 +1,63 @@ +"""This module contains definitios for batch experiments.""" + +from datetime import datetime +from typing import Any + +from nextmv.base_model import BaseModel + + +class BatchExperimentInformation(BaseModel): + """Information about a batch experiment. This serves as a base for all the + other batch experiment models.""" + + name: str + """Name of the batch experiment.""" + input_set_id: str + """ID of the input set used for the experiment.""" + instance_ids: list[str] + """List of instance IDs used for the experiment.""" + + description: str | None = None + """Description of the batch experiment.""" + id: str | None = None + """ID of the batch experiment.""" + + +class BatchExperiment(BatchExperimentInformation): + """A batch experiment compares two or more instances by executing all the + inputs contained in the input set.""" + + created_at: datetime + """Creation date of the batch experiment.""" + status: str + """Status of the batch experiment.""" + + grouped_distributional_summaries: list[dict[str, Any]] | None = None + """Grouped distributional summaries of the batch experiment.""" + option_sets: dict[str, Any] | None = None + """Option sets used for the experiment.""" + + +class BatchExperimentRun(BaseModel): + """A batch experiment run is a single execution of a batch experiment.""" + + option_set: str + """Option set used for the experiment.""" + input_id: str + """ID of the input used for the experiment.""" + + instance_id: str | None = None + """ID of the instance used for the experiment.""" + version_id: str | None = None + """ID of the version used for the experiment.""" + + +class BatchExperimentMetadata(BatchExperimentInformation): + """Metadata of a batch experiment.""" + + status: str + """Status of the batch experiment.""" + created_at: datetime + """Creation date of the batch experiment.""" + number_of_runs: int + """Number of runs of the batch experiment.""" diff --git a/nextmv/cloud/client.py b/nextmv/cloud/client.py index e0eec8c..005faf4 100644 --- a/nextmv/cloud/client.py +++ b/nextmv/cloud/client.py @@ -1,10 +1,14 @@ """Module with the client class.""" import os -from dataclasses import dataclass +import random +import time +from dataclasses import dataclass, field from typing import Any +from urllib.parse import urljoin import requests +from requests.adapters import HTTPAdapter, Retry @dataclass @@ -19,10 +23,35 @@ class Client: """API key to use for authenticating with the Nextmv Cloud API. If not provided, the client will look for the NEXTMV_API_KEY environment variable.""" - url: str = "https://api.cloud.nextmv.io" - """URL of the Nextmv Cloud API.""" + backoff_factor: float = 0.8 + """Exponential backoff factor to use for requests to the Nextmv Cloud + API.""" + backoff_jitter: float = 0.1 + """Jitter to use for requests to the Nextmv Cloud API when backing off.""" headers: dict[str, str] | None = None """Headers to use for requests to the Nextmv Cloud API.""" + initial_delay: float = 0.1 + """Initial delay to use for requests to the Nextmv Cloud API when backing + off.""" + max_lambda_payload_size: int = 5 * 1024 * 1024 + """Maximum size of the payload handled by the Nextmv Cloud API. This + value is used to determine whether to use the large input upload and/or + result download endpoints.""" + max_retries: int = 10 + """Maximum number of retries to use for requests to the Nextmv Cloud + API.""" + max_wait: int = 60 + """Maximum number of seconds that a request will wait for when retrying. If + exponential backoff is used, this is the maximum value of the backoff. + After this value is achieved, the backof stops increasing.""" + status_forcelist: list[int] = field( + default_factory=lambda: [500, 502, 503, 504, 507, 509], + ) + """Status codes to retry for requests to the Nextmv Cloud API.""" + timeout: float = 20 + """Timeout to use for requests to the Nextmv Cloud API.""" + url: str = "https://api.cloud.nextmv.io" + """URL of the Nextmv Cloud API.""" def __post_init__(self): """Logic to run after the class is initialized.""" @@ -41,55 +70,72 @@ def __post_init__(self): "Content-Type": "application/json", } - def post( + def request( self, + method: str, endpoint: str, - payload: dict[str, Any], + data: Any | None = None, + headers: dict[str, str] | None = None, + payload: dict[str, Any] | None = None, query_params: dict[str, Any] | None = None, ) -> requests.Response: """ - Send a POST request to the Nextmv Cloud API. + Method to make a request to the Nextmv Cloud API. Args: + method: HTTP method to use. Valid methods include: GET, POST. endpoint: Endpoint to send the request to. - payload: Payload to send with the request. + data: Data to send with the request. + headers: Headers to send with the request. + payload: Payload to send with the request. Prefer using this over + data. query_params: Query parameters to send with the request. Returns: Response from the Nextmv Cloud API. - """ - - response = requests.post( - url=f"{self.url}/{endpoint}", - json=payload, - headers=self.headers, - params=query_params, - ) - response.raise_for_status() - - return response - def get( - self, - endpoint: str, - query_params: dict[str, Any] | None = None, - ) -> requests.Response: + Raises: + requests.HTTPError: If the response status code is not 2xx. """ - Send a GET request to the Nextmv Cloud API. - - Args: - endpoint: Endpoint to send the request to. - query_params: Query parameters to send with the request. - Returns: - Response from the Nextmv Cloud API. - """ - - response = requests.get( - url=f"{self.url}/{endpoint}", - headers=self.headers, - params=query_params, + session = requests.Session() + retries = Retry( + total=self.max_retries, + backoff_factor=self.backoff_factor, + backoff_jitter=self.backoff_jitter, + status_forcelist=self.status_forcelist, ) - response.raise_for_status() + adapter = HTTPAdapter(max_retries=retries) + session.mount("https://", adapter) + + kwargs = { + "url": urljoin(self.url, endpoint), + "timeout": self.timeout, + } + kwargs["headers"] = headers if headers is not None else self.headers + if data is not None: + kwargs["data"] = data + if payload is not None: + kwargs["json"] = payload + if query_params is not None: + kwargs["params"] = query_params + + # Backoff logic for 429 responses. + delay = self.initial_delay + for n in range(1, self.max_retries + 1): + response = session.request(method=method, **kwargs) + if response.status_code == 429: + time.sleep(min(delay, self.max_wait)) + delay = self.backoff_factor * 2**n + random.uniform(0, self.backoff_jitter) + continue + + break + + try: + response.raise_for_status() + except requests.HTTPError as e: + raise requests.HTTPError( + f"request to {endpoint} failed with status code {response.status_code} and message: {response.text}" + ) from e return response diff --git a/nextmv/cloud/input_set.py b/nextmv/cloud/input_set.py new file mode 100644 index 0000000..5c179e3 --- /dev/null +++ b/nextmv/cloud/input_set.py @@ -0,0 +1,24 @@ +"""This module contains definitions for input sets.""" + +from datetime import datetime + +from nextmv.base_model import BaseModel + + +class InputSet(BaseModel): + """An input set is the collection of inputs from the associated runs.""" + + app_id: str + """ID of the application that the input set belongs to.""" + created_at: datetime + """Creation time of the input set.""" + description: str + """Description of the input set.""" + id: str + """ID of the input set.""" + input_ids: list[str] + """IDs of the inputs in the input set.""" + name: str + """Name of the input set.""" + updated_at: datetime + """Last update time of the input set.""" diff --git a/nextmv/nextroute/schema/location.py b/nextmv/nextroute/schema/location.py index 015c389..73ba560 100644 --- a/nextmv/nextroute/schema/location.py +++ b/nextmv/nextroute/schema/location.py @@ -7,7 +7,7 @@ class Location(BaseModel): """Location represents a geographical location.""" - lon: float - """Longitude of the location.""" lat: float """Latitude of the location.""" + lon: float + """Longitude of the location."""