From a77525decdc4d3614069100949e43a63d279f081 Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Mon, 19 Aug 2024 23:36:46 +0800 Subject: [PATCH 1/8] add dynamic_dest_dir into pythonpath before loading modules Signed-off-by: Nelson Chen --- flytekit/bin/entrypoint.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index e13650ee63..5309e11d0e 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -6,6 +6,7 @@ import pathlib import signal import subprocess +import sys import tempfile import traceback from sys import exit @@ -376,6 +377,8 @@ def _execute_task( dynamic_addl_distro, dynamic_dest_dir, ) as ctx: + if all(os.path.realpath(path) != dynamic_dest_dir for path in sys.path): + sys.path.append(dynamic_dest_dir) resolver_obj = load_object_from_module(resolver) # Use the resolver to load the actual task object _task_def = resolver_obj.load_task(loader_args=resolver_args) From ed5cc27f32d8ddee6b05f1671edfbac19061d386 Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Fri, 23 Aug 2024 00:37:06 +0800 Subject: [PATCH 2/8] Add dest_dir to pythonpath before creating subprocess Signed-off-by: Nelson Chen --- flytekit/bin/entrypoint.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 5309e11d0e..87e87f60aa 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -377,8 +377,6 @@ def _execute_task( dynamic_addl_distro, dynamic_dest_dir, ) as ctx: - if all(os.path.realpath(path) != dynamic_dest_dir for path in sys.path): - sys.path.append(dynamic_dest_dir) resolver_obj = load_object_from_module(resolver) # Use the resolver to load the actual task object _task_def = resolver_obj.load_task(loader_args=resolver_args) @@ -545,7 +543,14 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec # Use the commandline to run the task execute command rather than calling it directly in python code # since the current runtime bytecode references the older user code, rather than the downloaded distribution. - p = subprocess.Popen(cmd) + env = os.environ.copy() + if dest_dir is not None: + if all(os.path.realpath(path) != dest_dir for path in sys.path): + if "PYTHONPATH" in env: + env["PYTHONPATH"] += os.pathsep + dest_dir + else: + env["PYTHONPATH"] = dest_dir + p = subprocess.Popen(cmd,env=env) def handle_sigterm(signum, frame): logger.info(f"passing signum {signum} [frame={frame}] to subprocess") From b62938df27ea41d16af14f69efe856f51deaaae4 Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Fri, 23 Aug 2024 01:13:30 +0800 Subject: [PATCH 3/8] improve code Signed-off-by: Nelson Chen --- flytekit/bin/entrypoint.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index d13ba93698..8b8ed3edc0 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -550,13 +550,12 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec # Use the commandline to run the task execute command rather than calling it directly in python code # since the current runtime bytecode references the older user code, rather than the downloaded distribution. env = os.environ.copy() - if dest_dir is not None: - if all(os.path.realpath(path) != dest_dir for path in sys.path): - if "PYTHONPATH" in env: - env["PYTHONPATH"] += os.pathsep + dest_dir - else: - env["PYTHONPATH"] = dest_dir - p = subprocess.Popen(cmd,env=env) + if all(os.path.realpath(path) != dest_dir for path in sys.path) and dest_dir is not None: + if "PYTHONPATH" in env: + env["PYTHONPATH"] += os.pathsep + dest_dir + else: + env["PYTHONPATH"] = dest_dir + p = subprocess.Popen(cmd, env=env) def handle_sigterm(signum, frame): logger.info(f"passing signum {signum} [frame={frame}] to subprocess") From 05ea7f8a031862f5aa409d35ce2b6e20fbe53141 Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Sat, 24 Aug 2024 00:38:37 +0800 Subject: [PATCH 4/8] resolve dest_dir Signed-off-by: Nelson Chen --- flytekit/bin/entrypoint.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 8b8ed3edc0..495512d23f 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -550,11 +550,11 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec # Use the commandline to run the task execute command rather than calling it directly in python code # since the current runtime bytecode references the older user code, rather than the downloaded distribution. env = os.environ.copy() - if all(os.path.realpath(path) != dest_dir for path in sys.path) and dest_dir is not None: + if dest_dir is not None: if "PYTHONPATH" in env: - env["PYTHONPATH"] += os.pathsep + dest_dir + env["PYTHONPATH"] += os.pathsep + os.path.realpath(dest_dir) else: - env["PYTHONPATH"] = dest_dir + env["PYTHONPATH"] = os.path.realpath(dest_dir) p = subprocess.Popen(cmd, env=env) def handle_sigterm(signum, frame): From 198294018e3ad315495292b632d2ec7ec7e08c4b Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Tue, 27 Aug 2024 08:08:23 +0800 Subject: [PATCH 5/8] add dest_dir to test Signed-off-by: Nelson Chen --- tests/flytekit/integration/remote/test_remote.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index ef47aa3529..1ac55fa2ad 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -31,6 +31,7 @@ PROJECT = "flytesnacks" DOMAIN = "development" VERSION = f"v{os.getpid()}" +DEST_DIR = "/tmp" @pytest.fixture(scope="session") @@ -65,6 +66,8 @@ def run(file_name, wf_name, *args): CONFIG, "run", "--remote", + "--destination-dir", + DEST_DIR, "--image", IMAGE, "--project", From 66ad2214840dd091e792a469dd38487c617857f8 Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Tue, 17 Sep 2024 20:50:16 +0800 Subject: [PATCH 6/8] consider ~ is in path Signed-off-by: Nelson Chen --- flytekit/bin/entrypoint.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 495512d23f..1ade6a5473 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -551,10 +551,11 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec # since the current runtime bytecode references the older user code, rather than the downloaded distribution. env = os.environ.copy() if dest_dir is not None: + dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir)) if "PYTHONPATH" in env: - env["PYTHONPATH"] += os.pathsep + os.path.realpath(dest_dir) + env["PYTHONPATH"] += os.pathsep + dest_dir_resolved else: - env["PYTHONPATH"] = os.path.realpath(dest_dir) + env["PYTHONPATH"] = dest_dir_resolved p = subprocess.Popen(cmd, env=env) def handle_sigterm(signum, frame): From 5557af743d4c7f7ef1bba6b485698ed479dd82d7 Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Tue, 17 Sep 2024 22:29:45 +0800 Subject: [PATCH 7/8] test Signed-off-by: Nelson Chen --- flytekit/bin/entrypoint.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 74069af0a0..345fd3a459 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -564,11 +564,10 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec # since the current runtime bytecode references the older user code, rather than the downloaded distribution. env = os.environ.copy() if dest_dir is not None: - dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir)) if "PYTHONPATH" in env: - env["PYTHONPATH"] += os.pathsep + dest_dir_resolved + env["PYTHONPATH"] += os.pathsep + os.path.realpath(dest_dir) else: - env["PYTHONPATH"] = dest_dir_resolved + env["PYTHONPATH"] = os.path.realpath(dest_dir) p = subprocess.Popen(cmd, env=env) def handle_sigterm(signum, frame): From 65ac03f86d393f208574b1e4972cb6ca28ae7f5d Mon Sep 17 00:00:00 2001 From: Nelson Chen Date: Tue, 17 Sep 2024 22:47:09 +0800 Subject: [PATCH 8/8] consider if ~ is in dest_dir Signed-off-by: Nelson Chen --- flytekit/bin/entrypoint.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 345fd3a459..74069af0a0 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -564,10 +564,11 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec # since the current runtime bytecode references the older user code, rather than the downloaded distribution. env = os.environ.copy() if dest_dir is not None: + dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir)) if "PYTHONPATH" in env: - env["PYTHONPATH"] += os.pathsep + os.path.realpath(dest_dir) + env["PYTHONPATH"] += os.pathsep + dest_dir_resolved else: - env["PYTHONPATH"] = os.path.realpath(dest_dir) + env["PYTHONPATH"] = dest_dir_resolved p = subprocess.Popen(cmd, env=env) def handle_sigterm(signum, frame):