From c59b26e2fef84f061cca432cfc98c1b40d7993f0 Mon Sep 17 00:00:00 2001 From: Michal Kapinus Date: Fri, 19 Apr 2024 10:59:33 +0200 Subject: [PATCH] feat: added support for the edge switchover version bump --- era_5g_client/BUILD | 2 +- era_5g_client/client.py | 25 ++++++++-- era_5g_client/middleware_resource_checker.py | 21 ++++++--- tests/BUILD | 4 ++ tests/dummy_middleware.py | 48 ++++++++++++++++++++ 5 files changed, 87 insertions(+), 13 deletions(-) create mode 100644 tests/dummy_middleware.py diff --git a/era_5g_client/BUILD b/era_5g_client/BUILD index 19172d5..c3dae56 100644 --- a/era_5g_client/BUILD +++ b/era_5g_client/BUILD @@ -9,7 +9,7 @@ python_distribution( sdist=True, provides=setup_py( name="era_5g_client", - version="0.11.0", + version="1.0.0", description="A client for 5G-ERA Network Applications", author="Michal Kapinus", author_email="ikapinus@fit.vutbr.cz", diff --git a/era_5g_client/client.py b/era_5g_client/client.py index adfbde0..8ea711d 100644 --- a/era_5g_client/client.py +++ b/era_5g_client/client.py @@ -81,6 +81,8 @@ def __init__( self.resource_checker: Optional[MiddlewareResourceChecker] = None self.middleware_info: Optional[MiddlewareInfo] = None self.token: Optional[str] = None + self.args: Optional[Dict[str, Any]] = None + self.switching: bool = False def connect_to_middleware(self, middleware_info: MiddlewareInfo) -> None: """Authenticates with the Middleware and obtains a token for future calls. @@ -136,6 +138,7 @@ def run_task( str(self.token), self.action_plan_id, self.middleware_info.build_api_endpoint("orchestrate/orchestrate/plan"), + self.netapp_address_changed, daemon=True, ) @@ -180,16 +183,28 @@ def register( if not self.resource_checker.is_ready(): raise NetAppNotReady("Not ready.") - + self.args = args super().register(netapp_address, args, wait_until_available, wait_timeout) + def netapp_address_changed(self): + self.switching = True + self.load_netapp_address() + self.disconnect() + self.register(self.netapp_address, self.args, True) + self.switching = False + def disconnect(self) -> None: - """Disconnects the WebSocket connection, stop resource checker and delete resources.""" + """Disconnects the WebSocket connection, stop resource checker and delete resources. + + Args: + finish (bool, optional): If true, the resource checker is finished and all resources are deleted from the server + """ super().disconnect() - if self.resource_checker is not None: - self.resource_checker.stop() - self.delete_all_resources() + if not self.switching: + if self.resource_checker is not None: + self.resource_checker.stop() + self.delete_all_resources() def wait_until_netapp_ready(self) -> None: """Blocking wait until the 5G-ERA Network Application is running. diff --git a/era_5g_client/middleware_resource_checker.py b/era_5g_client/middleware_resource_checker.py index ac2c347..0e765cf 100644 --- a/era_5g_client/middleware_resource_checker.py +++ b/era_5g_client/middleware_resource_checker.py @@ -15,7 +15,12 @@ class MiddlewareResourceChecker(Thread): """Class for checking Middleware resources.""" def __init__( - self, token: str, action_plan_id: str, status_endpoint: str, state_callback: Optional[Callable] = None, **kw + self, + token: str, + action_plan_id: str, + status_endpoint: str, + url_changed_callback: Optional[Callable] = None, + **kw, ) -> None: """Constructor. @@ -23,7 +28,7 @@ def __init__( token (str): Login token. action_plan_id (str): Action plan ID. status_endpoint (str): Status endpoint. - state_callback (Callable, optional): Optional state callback. + url_changed_callback (Callable, optional): Trigered if the recieved url has changed. **kw: Thread arguments. """ @@ -32,7 +37,7 @@ def __init__( self.token = token self.action_plan_id = action_plan_id self.resource_state: Optional[Dict] = None - self.state_callback = state_callback + self.url_changed_callback = url_changed_callback self.status_endpoint = status_endpoint self.status: Optional[str] = None # TODO define as enum? self.url: Optional[str] = None @@ -50,7 +55,7 @@ def run(self) -> None: while not self.stop_event.is_set(): resource_state = self.get_resource_status() - + print(resource_state) seq = resource_state.get("actionSequence", []) if seq: services = seq[0].get("Services", []) @@ -58,10 +63,11 @@ def run(self) -> None: self.resource_state = services[0] assert isinstance(self.resource_state, dict) self.status = self.resource_state.get("serviceStatus", None) + old_url = self.url self.url = self.resource_state.get("serviceUrl", None) + if old_url and self.url_changed_callback and old_url != self.url: + self.url_changed_callback() logger.debug(f"{self.status=}, {self.url=}") - if self.state_callback: - self.state_callback(self.resource_state) time.sleep(0.5) # TODO: adjust or use something similar to rospy.rate.sleep() def get_resource_status(self) -> Dict: @@ -82,8 +88,9 @@ def get_resource_status(self) -> Dict: else: logger.debug(e) raise FailedToConnect(f"Could not get the resource status, revisit the log files for more details. {e}") - + print(response) resp = response.json() + print(resp) if isinstance(resp, dict): return resp else: diff --git a/tests/BUILD b/tests/BUILD index dabf212..4224d8a 100644 --- a/tests/BUILD +++ b/tests/BUILD @@ -1 +1,5 @@ python_tests() + +python_sources( + name="tests0", +) diff --git a/tests/dummy_middleware.py b/tests/dummy_middleware.py new file mode 100644 index 0000000..d824489 --- /dev/null +++ b/tests/dummy_middleware.py @@ -0,0 +1,48 @@ +import uuid +from typing import Tuple + +from flask import Flask, Response, jsonify + +app = Flask(__name__) + + +url = "http://localhost:5800" +counter = 1 + + +@app.route("/Login", methods=["POST"]) +def login() -> Tuple[Response, int]: + # Generate a random token + token = str(uuid.uuid4()) + return jsonify({"token": token}), 200 + + +@app.route("/Task/Plan", methods=["POST"]) +def create_task_plan() -> Response: + global counter + counter = 1 + action_plan_id = str(uuid.uuid4()) + response = {"ActionPlanId": action_plan_id} + return jsonify(response) + + +@app.route("/orchestrate/orchestrate/plan/", methods=["GET"]) +def get_orchestrated_plan(action_plan_id: str) -> Response: + global counter + url_local = url + counter += 1 + if counter >= 10: + url_local = "http://localhost:5801" + response = {"actionSequence": [{"Services": [{"serviceStatus": "Active", "serviceUrl": url_local}]}]} + return jsonify(response) + + +@app.route("/orchestrate/orchestrate/plan/", methods=["DELETE"]) +def delete_orchestrated_plan(action_plan_id: str) -> Response: + global counter + counter = 1 + return jsonify({}) + + +if __name__ == "__main__": + app.run(debug=True)