Skip to content

Commit

Permalink
feat: added support for the edge switchover
Browse files Browse the repository at this point in the history
  version bump
  • Loading branch information
Kapim committed Apr 24, 2024
1 parent 374591d commit c59b26e
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 13 deletions.
2 changes: 1 addition & 1 deletion era_5g_client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ python_distribution(
sdist=True,
provides=setup_py(
name="era_5g_client",
version="0.11.0",
version="1.0.0",
description="A client for 5G-ERA Network Applications",
author="Michal Kapinus",
author_email="[email protected]",
Expand Down
25 changes: 20 additions & 5 deletions era_5g_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def __init__(
self.resource_checker: Optional[MiddlewareResourceChecker] = None
self.middleware_info: Optional[MiddlewareInfo] = None
self.token: Optional[str] = None
self.args: Optional[Dict[str, Any]] = None
self.switching: bool = False

def connect_to_middleware(self, middleware_info: MiddlewareInfo) -> None:
"""Authenticates with the Middleware and obtains a token for future calls.
Expand Down Expand Up @@ -136,6 +138,7 @@ def run_task(
str(self.token),
self.action_plan_id,
self.middleware_info.build_api_endpoint("orchestrate/orchestrate/plan"),
self.netapp_address_changed,
daemon=True,
)

Expand Down Expand Up @@ -180,16 +183,28 @@ def register(

if not self.resource_checker.is_ready():
raise NetAppNotReady("Not ready.")

self.args = args
super().register(netapp_address, args, wait_until_available, wait_timeout)

def netapp_address_changed(self):
self.switching = True
self.load_netapp_address()
self.disconnect()
self.register(self.netapp_address, self.args, True)
self.switching = False

def disconnect(self) -> None:
"""Disconnects the WebSocket connection, stop resource checker and delete resources."""
"""Disconnects the WebSocket connection, stop resource checker and delete resources.
Args:
finish (bool, optional): If true, the resource checker is finished and all resources are deleted from the server
"""

super().disconnect()
if self.resource_checker is not None:
self.resource_checker.stop()
self.delete_all_resources()
if not self.switching:
if self.resource_checker is not None:
self.resource_checker.stop()
self.delete_all_resources()

def wait_until_netapp_ready(self) -> None:
"""Blocking wait until the 5G-ERA Network Application is running.
Expand Down
21 changes: 14 additions & 7 deletions era_5g_client/middleware_resource_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@ class MiddlewareResourceChecker(Thread):
"""Class for checking Middleware resources."""

def __init__(
self, token: str, action_plan_id: str, status_endpoint: str, state_callback: Optional[Callable] = None, **kw
self,
token: str,
action_plan_id: str,
status_endpoint: str,
url_changed_callback: Optional[Callable] = None,
**kw,
) -> None:
"""Constructor.
Args:
token (str): Login token.
action_plan_id (str): Action plan ID.
status_endpoint (str): Status endpoint.
state_callback (Callable, optional): Optional state callback.
url_changed_callback (Callable, optional): Trigered if the recieved url has changed.
**kw: Thread arguments.
"""

Expand All @@ -32,7 +37,7 @@ def __init__(
self.token = token
self.action_plan_id = action_plan_id
self.resource_state: Optional[Dict] = None
self.state_callback = state_callback
self.url_changed_callback = url_changed_callback
self.status_endpoint = status_endpoint
self.status: Optional[str] = None # TODO define as enum?
self.url: Optional[str] = None
Expand All @@ -50,18 +55,19 @@ def run(self) -> None:

while not self.stop_event.is_set():
resource_state = self.get_resource_status()

print(resource_state)
seq = resource_state.get("actionSequence", [])
if seq:
services = seq[0].get("Services", [])
if services:
self.resource_state = services[0]
assert isinstance(self.resource_state, dict)
self.status = self.resource_state.get("serviceStatus", None)
old_url = self.url
self.url = self.resource_state.get("serviceUrl", None)
if old_url and self.url_changed_callback and old_url != self.url:
self.url_changed_callback()
logger.debug(f"{self.status=}, {self.url=}")
if self.state_callback:
self.state_callback(self.resource_state)
time.sleep(0.5) # TODO: adjust or use something similar to rospy.rate.sleep()

def get_resource_status(self) -> Dict:
Expand All @@ -82,8 +88,9 @@ def get_resource_status(self) -> Dict:
else:
logger.debug(e)
raise FailedToConnect(f"Could not get the resource status, revisit the log files for more details. {e}")

print(response)
resp = response.json()
print(resp)
if isinstance(resp, dict):
return resp
else:
Expand Down
4 changes: 4 additions & 0 deletions tests/BUILD
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
python_tests()

python_sources(
name="tests0",
)
48 changes: 48 additions & 0 deletions tests/dummy_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import uuid
from typing import Tuple

from flask import Flask, Response, jsonify

app = Flask(__name__)


url = "http://localhost:5800"
counter = 1


@app.route("/Login", methods=["POST"])
def login() -> Tuple[Response, int]:
# Generate a random token
token = str(uuid.uuid4())
return jsonify({"token": token}), 200


@app.route("/Task/Plan", methods=["POST"])
def create_task_plan() -> Response:
global counter
counter = 1
action_plan_id = str(uuid.uuid4())
response = {"ActionPlanId": action_plan_id}
return jsonify(response)


@app.route("/orchestrate/orchestrate/plan/<string:action_plan_id>", methods=["GET"])
def get_orchestrated_plan(action_plan_id: str) -> Response:
global counter
url_local = url
counter += 1
if counter >= 10:
url_local = "http://localhost:5801"
response = {"actionSequence": [{"Services": [{"serviceStatus": "Active", "serviceUrl": url_local}]}]}
return jsonify(response)


@app.route("/orchestrate/orchestrate/plan/<string:action_plan_id>", methods=["DELETE"])
def delete_orchestrated_plan(action_plan_id: str) -> Response:
global counter
counter = 1
return jsonify({})


if __name__ == "__main__":
app.run(debug=True)

0 comments on commit c59b26e

Please sign in to comment.