diff --git a/terraform/ec2-examples/distributed-ml-training/README.md b/terraform/ec2-examples/distributed-ml-training/README.md
index 28eab556..92e567e5 100644
--- a/terraform/ec2-examples/distributed-ml-training/README.md
+++ b/terraform/ec2-examples/distributed-ml-training/README.md
@@ -144,9 +144,7 @@ Wrapping provided model in DistributedDataParallel.
 
 Result(
   metrics={'loss': 0.4192830347106792, 'accuracy': 0.8852},
-  path='dt-results-EXAMPLE/ecs_dt_results/TorchTrainer_d1824_00000_0_(...)',
-  filesystem='s3',
-  checkpoint=None
+  (...)
 )
 ```
 
@@ -166,6 +164,11 @@ terraform destroy
 ```
 
+## Troubleshooting
+
+* Error: `creating ECS Service (...): InvalidParameterException: The specified capacity provider (...) was not found`: in some cases the capacity provider is still being created and is not yet ready to be used by a service. Run `terraform apply` again to resolve the issue.
+
+
 ## Support
 
 Please open an issue for questions or unexpected behavior
 
diff --git a/terraform/ec2-examples/distributed-ml-training/training_example.py b/terraform/ec2-examples/distributed-ml-training/training_example.py
index c9f435bd..e7f5b1c8 100644
--- a/terraform/ec2-examples/distributed-ml-training/training_example.py
+++ b/terraform/ec2-examples/distributed-ml-training/training_example.py
@@ -13,7 +13,7 @@
 import ray
 import time
 import argparse
-
+import tempfile
 
 # Get arguments
 parser = argparse.ArgumentParser()
@@ -24,12 +24,6 @@
 # Connect to the Ray cluster
 ray.init()
 
-# Download the data in the shared storage
-transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
-train_data = FashionMNIST(root='./data',
-                          train=True, download=True,
-                          transform=transform)
-
 # Define the training function that the distributed processes will run
 def train_func(config):
     import os
@@ -50,11 +44,11 @@ def train_func(config):
     # Setup loss and optimizer
     criterion = CrossEntropyLoss()
     optimizer = Adam(model.parameters(), lr=0.001)
-    # Retrieve the data from the shared storage.
+
+    # Prepare data
     transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
     with FileLock(os.path.expanduser("./data.lock")):
         train_data = FashionMNIST(root='./data', train=True, download=True, transform=transform)
-    # Download test data from open datasets
     test_data = FashionMNIST(root="./data",train=False,download=True,transform=transform)
     batch_size=128
     train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
@@ -89,8 +83,20 @@ def train_func(config):
         test_loss /= len(test_loader)
         accuracy = num_correct / num_total
-        # Report metrics and checkpoint to Ray.
-        ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy})
+
+        # Save the checkpoint
+        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
+            checkpoint = None
+            # Only the global rank 0 worker saves the checkpoint
+            if ray.train.get_context().get_world_rank() == 0:
+                torch.save(
+                    model.module.state_dict(),
+                    os.path.join(temp_checkpoint_dir, "model.pt"),
+                )
+                checkpoint = ray.train.Checkpoint.from_directory(temp_checkpoint_dir)
+
+            # Report metrics and checkpoint to Ray
+            ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy}, checkpoint=checkpoint)
 
 
 # The scaling config defines how many worker processes to use for the training. Usually equals to the number of GPUs
 scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
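
For context, the checkpoint that the new code reports can later be restored through Ray's `Checkpoint` API. The sketch below is illustrative only and is not part of the patch: it assumes `trainer` is the `TorchTrainer` configured in `training_example.py` and `Model` is the same architecture trained in `train_func`; both names are placeholders.

```python
# Illustrative sketch, not part of the commit above. `trainer` and `Model`
# are placeholders for the TorchTrainer and model class used in train_func.
import os
import torch

result = trainer.fit()

# result.checkpoint is the ray.train.Checkpoint reported by the rank 0 worker;
# as_directory() materializes it as a local directory for reading.
with result.checkpoint.as_directory() as checkpoint_dir:
    state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))

# Rebuild the model and load the trained weights for evaluation
model = Model()
model.load_state_dict(state_dict)
model.eval()
```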