Add troubleshooting section to README. Add checkpoint and remove shared storage refs in training script
sfloresk committed Feb 8, 2024
1 parent 7d13157 commit e7f9bb4
Showing 2 changed files with 23 additions and 14 deletions.
9 changes: 6 additions & 3 deletions terraform/ec2-examples/distributed-ml-training/README.md
@@ -144,9 +144,7 @@ Wrapping provided model in DistributedDataParallel.
Result(
metrics={'loss': 0.4192830347106792, 'accuracy': 0.8852},
path='dt-results-EXAMPLE/ecs_dt_results/TorchTrainer_d1824_00000_0_(...)',
filesystem='s3',
checkpoint=None
(...)
)
```

@@ -166,6 +164,11 @@ terraform destroy

```

## Troubleshooting

* Error: creating ECS Service (...): InvalidParameterException: The specified capacity provider (...) was not found: in some cases the capacity provider is still being created and is not yet ready to be used by a service. Execute "terraform apply" again to resolve the issue.


## Support

Please open an issue for questions or unexpected behavior.
28 changes: 17 additions & 11 deletions terraform/ec2-examples/distributed-ml-training/training_example.py
@@ -13,7 +13,7 @@
import ray
import time
import argparse

import tempfile
# Get arguments

parser = argparse.ArgumentParser()
@@ -24,12 +24,6 @@
# Connect to the Ray cluster
ray.init()

# Download the data in the shared storage
transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
train_data = FashionMNIST(root='./data',
                          train=True, download=True,
                          transform=transform)

# Define the training function that the distributed processes will run
def train_func(config):
    import os
@@ -50,11 +44,11 @@ def train_func(config):
    # Setup loss and optimizer
    criterion = CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.001)
    # Retrieve the data from the shared storage.

    # Prepare data
    transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
    with FileLock(os.path.expanduser("./data.lock")):
        train_data = FashionMNIST(root='./data', train=True, download=True, transform=transform)
    # Download test data from open datasets
    test_data = FashionMNIST(root="./data",train=False,download=True,transform=transform)
    batch_size=128
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
@@ -89,8 +83,20 @@ def train_func(config):

    test_loss /= len(test_loader)
    accuracy = num_correct / num_total
    # Report metrics and checkpoint to Ray.
    ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy})

    # Save the checkpoint
    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
        checkpoint = None
        # Only the global rank 0 worker saves the checkpoint
        if ray.train.get_context().get_world_rank() == 0:
            torch.save(
                model.module.state_dict(),
                os.path.join(temp_checkpoint_dir, "model.pt"),
            )
            # Build the checkpoint from the temporary directory that holds model.pt
            checkpoint = ray.train.Checkpoint.from_directory(temp_checkpoint_dir)

        # Report metrics and checkpoint to Ray
        ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy}, checkpoint=checkpoint)

# The scaling config defines how many worker processes to use for the training, usually equal to the number of GPUs
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
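
For reference only (not part of this commit): once training completes, the checkpoint reported above can be read back from the `Result` object that `TorchTrainer.fit()` returns. The sketch below is a minimal, hypothetical example; `trainer` and `model` are assumed to be the trainer and network instances defined elsewhere in training_example.py, and `model.pt` is the filename saved above.

```python
import os
import torch

# Hypothetical post-training usage: `trainer` is assumed to be the TorchTrainer
# configured elsewhere in training_example.py, and `model` an instance of the
# same network architecture used in train_func.
result = trainer.fit()

if result.checkpoint is not None:
    # Materialize the reported checkpoint locally and load the saved weights
    with result.checkpoint.as_directory() as checkpoint_dir:
        state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"), map_location="cpu")
        model.load_state_dict(state_dict)
    model.eval()
```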
