From eeb5ee04f6293387bfdea7f298ef224587178839 Mon Sep 17 00:00:00 2001 From: Raphael Jin Date: Thu, 3 Oct 2024 11:58:47 -0700 Subject: [PATCH 1/2] Support zip file without .zip suffix --- .../scheduler_core/scheduler_base_job_runner.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner.py b/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner.py index 6e0010f55..5451656b3 100755 --- a/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner.py +++ b/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner.py @@ -205,6 +205,16 @@ def retrieve_and_unzip_package(self, package_name, package_url): os.remove(local_package_file) ssl._create_default_https_context = ssl._create_unverified_context + # HOT FIX: + print("HOT FIX: package_url: ", package_url) + + # we detect a substring called: "?X-Amz-Algorithm=" + # we remove this substring and everything after it + if "?X-Amz-Algorithm=" in package_url: + package_url = package_url.split("?X-Amz-Algorithm=")[0] + + print("AFTER HOT FIX: package_url: ", package_url) + # Open a process to download the package so that we can avoid the request is blocked and check the timeout. from multiprocessing import Process completed_event = multiprocessing.Event() From fe1bd5242dde72cfb83accf967b7ceb35ea8c507 Mon Sep 17 00:00:00 2001 From: Raphael Jin Date: Tue, 8 Oct 2024 15:11:43 -0700 Subject: [PATCH 2/2] Allow user indicated public ip. 
--- python/fedml/api/__init__.py | 8 ++++---- python/fedml/api/modules/device.py | 8 +++++--- python/fedml/cli/modules/login.py | 12 ++++++++++-- .../model_scheduler/device_server_constants.py | 1 + .../model_scheduler/master_job_runner.py | 4 +++- .../scheduler_core/general_constants.py | 16 ++++++++++++++++ 6 files changed, 39 insertions(+), 10 deletions(-) diff --git a/python/fedml/api/__init__.py b/python/fedml/api/__init__.py index 6c82c9b9b..4256d8c21 100755 --- a/python/fedml/api/__init__.py +++ b/python/fedml/api/__init__.py @@ -215,9 +215,9 @@ def login(api_key, computing, server, supplier, master_inference_gateway_port: int = ServerConstants.MODEL_INFERENCE_DEFAULT_PORT, worker_inference_proxy_port: int = ClientConstants.LOCAL_CLIENT_API_PORT, worker_connection_type: str = ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT, - marketplace_type: str = MarketplaceType.SECURE.name, price_per_hour: float = 0.0, name=""): + marketplace_type: str = MarketplaceType.SECURE.name, price_per_hour: float = 0.0, name="", node_ip=""): device_bind(api_key, computing, server, supplier, master_inference_gateway_port, worker_inference_proxy_port, - worker_connection_type, marketplace_type, price_per_hour, name) + worker_connection_type, marketplace_type, price_per_hour, name, node_ip) def logout(computing, server): @@ -225,11 +225,11 @@ def logout(computing, server): def device_bind(api_key, computing, server, supplier, master_inference_gateway_port, worker_inference_proxy_port, - worker_connection_type, marketplace_type, price_per_hour, name): + worker_connection_type, marketplace_type, price_per_hour, name, node_ip): device.bind(api_key=api_key, computing=computing, server=server, supplier=supplier, master_inference_gateway_port=master_inference_gateway_port, worker_inference_proxy_port=worker_inference_proxy_port, worker_connection_type=worker_connection_type, - marketplace_type=marketplace_type, price_per_hour=price_per_hour, name=name) + 
marketplace_type=marketplace_type, price_per_hour=price_per_hour, name=name, node_ip=node_ip) def device_unbind(computing, server): diff --git a/python/fedml/api/modules/device.py b/python/fedml/api/modules/device.py index 27b2d0d19..2d347047b 100644 --- a/python/fedml/api/modules/device.py +++ b/python/fedml/api/modules/device.py @@ -22,7 +22,8 @@ def bind( api_key, computing, server, supplier, marketplace_type, price_per_hour, name, master_inference_gateway_port=DeviceServerConstants.MODEL_INFERENCE_DEFAULT_PORT, worker_inference_proxy_port=DeviceClientConstants.LOCAL_CLIENT_API_PORT, - worker_connection_type=DeviceClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT): + worker_connection_type=DeviceClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT, + node_ip=""): userid = api_key runner_cmd = "{}" device_id = "0" @@ -48,12 +49,12 @@ def bind( userid, computing, server, api_key, role, runner_cmd, device_id, os_name, docker, master_inference_gateway_port, worker_inference_proxy_port, worker_connection_type, marketplace_type, - price_per_hour, name) + price_per_hour, name, node_ip) def _bind( userid, computing, server, api_key, role, runner_cmd, device_id, os_name, docker, master_inference_gateway_port, - worker_inference_proxy_port, worker_connection_type, marketplace_type, price_per_hour, name): + worker_inference_proxy_port, worker_connection_type, marketplace_type, price_per_hour, name, node_ip): fedml.load_env() if os.getenv(ModuleConstants.ENV_FEDML_INFER_HOST) is None: fedml.set_env_kv(ModuleConstants.ENV_FEDML_INFER_HOST, SchedulerConstants.REDIS_INFER_HOST) @@ -67,6 +68,7 @@ def _bind( fedml.set_env_kv(DeviceServerConstants.ENV_MASTER_INFERENCE_PORT_KEY, str(master_inference_gateway_port)) fedml.set_env_kv(DeviceClientConstants.ENV_CLIENT_PROXY_PORT_KEY, str(worker_inference_proxy_port)) fedml.set_env_kv(DeviceClientConstants.ENV_CONNECTION_TYPE_KEY, worker_connection_type) + fedml.set_env_kv(DeviceServerConstants.ENV_NODE_PUBLIC_IP_KEY, node_ip) url = 
fedml._get_backend_service() platform_name = platform.system() diff --git a/python/fedml/cli/modules/login.py b/python/fedml/cli/modules/login.py index b76346ec1..5ab1395f1 100644 --- a/python/fedml/cli/modules/login.py +++ b/python/fedml/cli/modules/login.py @@ -55,6 +55,13 @@ default=80, help="The port for local on-premise Nexus AI Platform.", ) +@click.option( + "--node_ip", + "-nip", + type=str, + default="", + help="The IP address for the node.", +) @click.option( "--master_inference_gateway_port", "-mgp", @@ -107,7 +114,7 @@ def fedml_login( api_key, version, compute_node, server, provider, deploy_worker_num, local_on_premise_platform, local_on_premise_platform_port, master_inference_gateway_port, worker_inference_proxy_port, worker_connection_type, marketplace_type, - price_per_hour, name + price_per_hour, name, node_ip ): fedml.set_env_version(version) fedml.set_local_on_premise_platform_host(local_on_premise_platform) @@ -129,7 +136,8 @@ def fedml_login( pass os.environ["FEDML_MODEL_WORKER_NUM"] = str(deploy_worker_num) fedml.api.login(api_key, compute_node, server, provider, master_inference_gateway_port, - worker_inference_proxy_port, worker_connection_type, marketplace_type, price_per_hour, name) + worker_inference_proxy_port, worker_connection_type, marketplace_type, price_per_hour, name, + node_ip) def __validate_mpt_pph(marketplace_type, price_per_hour): diff --git a/python/fedml/computing/scheduler/model_scheduler/device_server_constants.py b/python/fedml/computing/scheduler/model_scheduler/device_server_constants.py index 44eaeb937..b0fbcd729 100644 --- a/python/fedml/computing/scheduler/model_scheduler/device_server_constants.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_server_constants.py @@ -102,6 +102,7 @@ class ServerConstants(object): FEDML_RUNNING_SOURCE_ENV_VALUE_K8S = "k8s" AUTO_DETECT_PUBLIC_IP = "auto_detect_public_ip" + ENV_NODE_PUBLIC_IP_KEY = "FEDML_NODE_PUBLIC_IP" MODEL_INFERENCE_DEFAULT_PORT = 2203 
ENV_MASTER_INFERENCE_PORT_KEY = "FEDML_MASTER_INFERENCE_GATEWAY_PORT" MODEL_CACHE_KEY_EXPIRE_TIME = 1 * 10 diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index 00b08acfb..0326c5e26 100755 --- a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -757,7 +757,9 @@ def build_dynamic_constrain_variables(self, run_id, run_config): def construct_final_gateway_url(self, end_point_id): inference_port_external = ServerConstants.get_inference_master_gateway_port() - ip = GeneralConstants.get_ip_address(self.request_json) + + # TODO(Raphael): Check whether get_ingress_address can replace get_ip_address + ip = GeneralConstants.get_ingress_address() identifier = "inference" if self.deployed_replica_payload is not None: diff --git a/python/fedml/computing/scheduler/scheduler_core/general_constants.py b/python/fedml/computing/scheduler/scheduler_core/general_constants.py index 0fbd4881d..26a3d2d75 100755 --- a/python/fedml/computing/scheduler/scheduler_core/general_constants.py +++ b/python/fedml/computing/scheduler/scheduler_core/general_constants.py @@ -224,6 +224,22 @@ def get_ip_address(request_json, infer_host=None): return ip + @staticmethod + def get_ingress_address(): + # This is only for a virtual ip that differs from the result of https://checkip.amazonaws.com + # Fallback: auto-detect the public ip + logging.info("Checking public ip for master") + ip = GeneralConstants.get_public_ip() + + # Override: a user-indicated ip from the environment variable takes precedence over the auto-detected one + infer_host = os.getenv(device_server_constants.ServerConstants.ENV_NODE_PUBLIC_IP_KEY, "") + if (infer_host is not None and infer_host != "" and + infer_host != "127.0.0.1" and infer_host != "localhost"): + logging.info("Use user indicated (Env Var) ip for master: " + infer_host) + ip = infer_host + + 
return ip + @staticmethod def get_topic_complete_job(server_id): topic_complete_job = f"status_center/master_agent_{server_id}/complete_job"