Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve failure status reason #117

Merged
merged 14 commits into from
Oct 26, 2023
2 changes: 1 addition & 1 deletion base/jobs/docker/1.0/py3/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
amazon-braket-default-simulator==1.20.1
amazon-braket-schemas==1.19.1
amazon-braket-pennylane-plugin==1.21.0
amazon-braket-sdk==1.58.0
amazon-braket-sdk==1.59.1
awscli==1.29.53
botocore==1.31.53
boto3==1.28.53
Expand Down
2 changes: 1 addition & 1 deletion pytorch/jobs/docker/2.0/py3/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
amazon-braket-default-simulator==1.20.1
amazon-braket-schemas==1.19.1
amazon-braket-pennylane-plugin==1.21.0
amazon-braket-sdk==1.58.0
amazon-braket-sdk==1.59.1
awscli==1.29.53
botocore==1.31.53
boto3==1.28.53
Expand Down
154 changes: 84 additions & 70 deletions src/braket_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import contextlib
import errno
import importlib
import inspect
import os
import json
import runpy
import shutil
import subprocess
import sys
Expand All @@ -38,10 +39,10 @@
print("Boto3 Version: ", boto3.__version__)


def log_failure_and_exit(*args):
def _log_failure(*args, display=True):
"""
Log failures to a file so that it can be parsed by the backend service and included in
failure messages for a job. Exists with code 0.
failure messages for a job.

Args:
args: variable list of text to write to the file.
Expand All @@ -50,7 +51,19 @@ def log_failure_and_exit(*args):
with open(ERROR_LOG_FILE, 'a') as error_log:
for text in args:
error_log.write(text)
print(text)
if display:
print(text)


def log_failure_and_exit(*args):
"""
Log failures to a file so that it can be parsed by the backend service and included in
failure messages for a job. Exists with code 0.

Args:
args: variable list of text to write to the file.
"""
_log_failure(*args)
sys.exit(0)
krneta marked this conversation as resolved.
Show resolved Hide resolved


Expand Down Expand Up @@ -136,34 +149,6 @@ def unpack_code_and_add_to_path(local_s3_file: str, compression_type: str):
sys.path.append(EXTRACTED_CUSTOMER_CODE_PATH)


def kick_off_customer_script(entry_point: str) -> multiprocessing.Process:
"""
Runs the customer script as a separate process.

Args:
entry_point (str): the entry point to the customer code, represented as <module>:<method>.

Returns:
Process: the process handle to the running process.
"""
try:
str_module, _, str_method = entry_point.partition(":")
customer_module = importlib.import_module(str_module)
customer_method = getattr(customer_module, str_method)

process_kwargs = {"target": customer_method}

function_args = try_bind_hyperparameters_to_customer_method(customer_method)
if function_args is not None:
process_kwargs["kwargs"] = function_args

customer_code_process = multiprocessing.Process(**process_kwargs)
customer_code_process.start()
except Exception as e:
log_failure_and_exit(f"Unable to run job at entry point {entry_point}\nException: {e}")
return customer_code_process


def try_bind_hyperparameters_to_customer_method(customer_method: Callable):
hp_file = os.getenv("AMZN_BRAKET_HP_FILE")
if hp_file is None:
Expand All @@ -186,19 +171,6 @@ def try_bind_hyperparameters_to_customer_method(customer_method: Callable):
return function_args


def join_customer_script(customer_code_process: multiprocessing.Process):
"""
Joins the process running the customer code.

Args:
customer_code_process (Process): the process running the customer code.
"""
try:
customer_code_process.join()
except Exception as e:
log_failure_and_exit(f"Job did not exit gracefully.\nException: {e}")


def get_code_setup_parameters() -> Tuple[str, str, str]:
"""
Returns the code setup parameters:
Expand Down Expand Up @@ -254,41 +226,84 @@ def install_additional_requirements() -> None:
log_failure_and_exit(f"Unable to install requirements.\nException: {e}")


def run_customer_code_as_process(entry_point: str) -> int:
def extract_customer_code(entry_point: str) -> Callable:
"""
Converts entry point to a runnable function.
"""
if entry_point.find(":") >= 0:
str_module, _, str_method = entry_point.partition(":")
customer_module = importlib.import_module(str_module)
customer_code = getattr(customer_module, str_method)
else:
def customer_code():
# equivalent to `python -m entry_point`
return runpy.run_module(entry_point, run_name="__main__")
krneta marked this conversation as resolved.
Show resolved Hide resolved
return customer_code


@contextlib.contextmanager
def in_extracted_code_dir():
current_dir = os.getcwd()
try:
os.chdir(EXTRACTED_CUSTOMER_CODE_PATH)
yield
finally:
os.chdir(current_dir)


def wrap_customer_code(customer_method: Callable) -> Callable:
def wrapped_customer_code(**kwargs):
try:
with in_extracted_code_dir():
return customer_method(**kwargs)
except Exception as e:
exception_type = type(e).__name__
krneta marked this conversation as resolved.
Show resolved Hide resolved
exception_string = (
exception_type
krneta marked this conversation as resolved.
Show resolved Hide resolved
if not str(e)
else f"{exception_type}: {e}"
)
_log_failure(exception_string, display=False)
krneta marked this conversation as resolved.
Show resolved Hide resolved
raise e
return wrapped_customer_code


def kick_off_customer_script(customer_code: Callable) -> multiprocessing.Process:
"""
When provided the name of the package and the method to run, we run them as a process.
Runs the customer script as a separate process.

Args:
entry_point (str): the code to run in the format <package>:<method>.
customer_code (Callable): The customer method to be run.

Returns:
int: The exit code of the customer code run.
Process: the process handle to the running process.
"""
print("Running Code As Process")
customer_code_process = kick_off_customer_script(entry_point)
join_customer_script(customer_code_process)
print("Code Run Finished")
return customer_code_process.exitcode
wrapped_customer_code = wrap_customer_code(customer_code)
process_kwargs = {"target": wrapped_customer_code}

function_args = try_bind_hyperparameters_to_customer_method(customer_code)
if function_args is not None:
process_kwargs["kwargs"] = function_args

customer_code_process = multiprocessing.Process(**process_kwargs)
customer_code_process.start()
return customer_code_process

def run_customer_code_as_subprocess(entry_point: str) -> int:

def join_customer_script(customer_code_process: multiprocessing.Process):
"""
When provided just the name of the module to run, we run it as a subprocess.
Joins the process running the customer code.

Args:
entry_point (str): the name of the module to run.

Returns:
int: The exit code of the customer code run.
customer_code_process (Process): the process running the customer code.
"""
print("Running Code As Subprocess")
try:
result = subprocess.run(["python", "-m", entry_point], cwd=EXTRACTED_CUSTOMER_CODE_PATH)
customer_code_process.join()
except Exception as e:
log_failure_and_exit(f"Unable to run job at entry point {entry_point}\nException: {e}")
log_failure_and_exit(f"Job did not exit gracefully.\nException: {e}")
krneta marked this conversation as resolved.
Show resolved Hide resolved
print("Code Run Finished")
return_code = result.returncode
return return_code
return customer_code_process.exitcode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're actually not using this returned value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated



def run_customer_code() -> None:
Expand All @@ -301,12 +316,11 @@ def run_customer_code() -> None:
local_s3_file = download_customer_code(s3_uri)
unpack_code_and_add_to_path(local_s3_file, compression_type)
install_additional_requirements()
if entry_point.find(":") >= 0:
exit_code = run_customer_code_as_process(entry_point)
else:
exit_code = run_customer_code_as_subprocess(entry_point)
if exit_code != 0:
log_failure_and_exit(f"Job at {entry_point} exited with exit code: {exit_code}")
customer_executable = extract_customer_code(entry_point)
customer_process = kick_off_customer_script(customer_executable)
join_customer_script(customer_process)
if customer_process.exitcode != 0:
sys.exit(customer_process.exitcode)


def setup_and_run():
Expand Down
2 changes: 1 addition & 1 deletion tensorflow/jobs/docker/2.13/py3/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
amazon-braket-default-simulator==1.20.1
amazon-braket-schemas==1.19.1
amazon-braket-pennylane-plugin==1.21.0
amazon-braket-sdk==1.58.0
amazon-braket-sdk==1.59.1
awscli==1.29.53
botocore==1.31.53
boto3==1.28.53
Expand Down
Loading