Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TypeError in Ray TorchTrainer with StreamSplitDataIterator #271

Open
AliHaiderAhmad001 opened this issue Oct 10, 2024 · 0 comments
Open

Comments

@AliHaiderAhmad001
Copy link

What happened + What you expected to happen

Description:

I encountered a TypeError when running TorchTrainer in a Ray Tune experiment. The error occurs due to an issue with StreamSplitDataIterator, which does not have a defined len() method. This issue causes the trial to fail and the training process to stop.

Error Traceback:

---------------------------------------------------------------------------
RayTaskError(TypeError)                   Traceback (most recent call last)
RayTaskError(TypeError): ray::_Inner.train() (pid=545925, ip=127.0.1.1, actor_id=8df214bd0f57efa0f244450001000000, repr=TorchTrainer)
 File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
   raise skipped from exception_cause(skipped)
 File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 57, in check_for_failure
   ray.get(object_ref)
ray.exceptions.RayTaskError(TypeError): ray::_RayTrainWorker__execute.get_next() (pid=546125, ip=127.0.1.1, actor_id=2c8fbaded7de91dfce4cb83501000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7099dcd57c70>)
 File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in __execute
   raise skipped from exception_cause(skipped)
 File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 176, in discard_return_wrapper
   train_func(*args, **kwargs)
 File "/tmp/ipykernel_542780/2278135304.py", line 29, in train_loop_per_worker
TypeError: object of type 'StreamSplitDataIterator' has no len()

Expected Behavior:

I expected the training loop to handle the StreamSplitDataIterator properly without raising the TypeError.

Actual Behavior:

The TypeError prevents the training process from completing, as it attempts to calculate the length of an object that lacks a __len__ method.

Detailed Traceback

2024-10-10 16:42:48,349	DEBUG resource_updater.py:258 -- Checking Ray cluster resources.
2024-10-10 16:42:48,673	DEBUG tune_controller.py:1240 -- Future TRAIN FAILED for trial TorchTrainer_ccdf1_00000: ray::_Inner.train() (pid=545925, ip=127.0.1.1, actor_id=8df214bd0f57efa0f244450001000000, repr=TorchTrainer)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 57, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(TypeError): ray::_RayTrainWorker__execute.get_next() (pid=546125, ip=127.0.1.1, actor_id=2c8fbaded7de91dfce4cb83501000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7099dcd57c70>)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 176, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "/tmp/ipykernel_542780/2278135304.py", line 29, in train_loop_per_worker
TypeError: object of type 'StreamSplitDataIterator' has no len()
2024-10-10 16:42:48,674	ERROR tune_controller.py:1331 -- Trial task failed for trial TorchTrainer_ccdf1_00000
Traceback (most recent call last):
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/_private/worker.py", line 2691, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): ray::_Inner.train() (pid=545925, ip=127.0.1.1, actor_id=8df214bd0f57efa0f244450001000000, repr=TorchTrainer)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 57, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(TypeError): ray::_RayTrainWorker__execute.get_next() (pid=546125, ip=127.0.1.1, actor_id=2c8fbaded7de91dfce4cb83501000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7099dcd57c70>)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 176, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "/tmp/ipykernel_542780/2278135304.py", line 29, in train_loop_per_worker
TypeError: object of type 'StreamSplitDataIterator' has no len()
2024-10-10 16:42:48,843	DEBUG tune_controller.py:1367 -- Requesting to STOP actor for trial TorchTrainer_ccdf1_00000
2024-10-10 16:42:48,845	DEBUG tune_controller.py:735 -- Setting status for trial TorchTrainer_ccdf1_00000 from RUNNING to ERROR
2024-10-10 16:42:48,845	DEBUG tune_controller.py:1396 -- Terminating actor for trial TorchTrainer_ccdf1_00000: <TrackedActor 55643445138953405172927010354651748243>
2024-10-10 16:42:48,852	DEBUG experiment_state.py:122 -- Experiment state snapshotting took 0.00 seconds. Adjusting snapshotting period to 10.00 seconds.
2024-10-10 16:42:48,941	DEBUG experiment_state.py:122 -- Experiment state snapshotting took 0.08 seconds. Adjusting snapshotting period to 10.00 seconds.
2024-10-10 16:42:48,942	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/home/ali/Desktop/projects/AIUpdateHub/efs/llm' in 0.0838s.
2024-10-10 16:42:48,949	DEBUG tune_controller.py:784 -- CLEANING UP all trials
2024-10-10 16:42:48,952	DEBUG tune_controller.py:800 -- Waiting for actor manager to clean up final state [dedup]
== Status ==
Current time: 2024-10-10 16:42:48 (running for 00:00:50.81)
Using FIFO scheduling algorithm.
Logical resource usage: 2.0/4 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2024-10-10_16-36-38_318744_542780/artifacts/2024-10-10_16-41-57/llm/driver_artifacts
Number of trials: 1/1 (1 ERROR)
Number of errored trials: 1
+--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Trial name               |   # failures | error file                                                                                                                                                     |
|--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| TorchTrainer_ccdf1_00000 |            1 | /tmp/ray/session_2024-10-10_16-36-38_318744_542780/artifacts/2024-10-10_16-41-57/llm/driver_artifacts/TorchTrainer_ccdf1_00000_0_2024-10-10_16-41-58/error.txt |
+--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

== Status ==
Current time: 2024-10-10 16:42:48 (running for 00:00:50.90)
Using FIFO scheduling algorithm.
Logical resource usage: 2.0/4 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2024-10-10_16-36-38_318744_542780/artifacts/2024-10-10_16-41-57/llm/driver_artifacts
Number of trials: 1/1 (1 ERROR)
Number of errored trials: 1
+--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Trial name               |   # failures | error file                                                                                                                                                     |
|--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| TorchTrainer_ccdf1_00000 |            1 | /tmp/ray/session_2024-10-10_16-36-38_318744_542780/artifacts/2024-10-10_16-41-57/llm/driver_artifacts/TorchTrainer_ccdf1_00000_0_2024-10-10_16-41-58/error.txt |
+--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

2024-10-10 16:42:49,091	DEBUG tune_controller.py:1149 -- Actor STOPPED: <TrackedActor 55643445138953405172927010354651748243>
2024-10-10 16:42:49,094	DEBUG tune_controller.py:805 -- Force cleanup of remaining actors
2024-10-10 16:42:49,099	ERROR tune.py:1037 -- Trials did not complete: [TorchTrainer_ccdf1_00000]
2024-10-10 16:42:49,103	INFO tune.py:1041 -- Total run time: 51.81 seconds (50.81 seconds for the tuning loop).
---------------------------------------------------------------------------
RayTaskError(TypeError)                   Traceback (most recent call last)
RayTaskError(TypeError): ray::_Inner.train() (pid=545925, ip=127.0.1.1, actor_id=8df214bd0f57efa0f244450001000000, repr=TorchTrainer)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 57, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(TypeError): ray::_RayTrainWorker__execute.get_next() (pid=546125, ip=127.0.1.1, actor_id=2c8fbaded7de91dfce4cb83501000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7099dcd57c70>)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 176, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "/tmp/ipykernel_542780/2278135304.py", line 29, in train_loop_per_worker
TypeError: object of type 'StreamSplitDataIterator' has no len()

The above exception was the direct cause of the following exception:

TrainingFailedError                       Traceback (most recent call last)
File <timed exec>:2

File ~/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/base_trainer.py:638, in BaseTrainer.fit(self)
    634 result = result_grid[0]
    635 if result.error:
    636     # Raise trainable errors to the user with a message to restore
    637     # or configure FailureConfig in a new run.
--> 638     raise TrainingFailedError(
    639         "\n".join([restore_msg, TrainingFailedError._FAILURE_CONFIG_MSG])
    640     ) from result.error
    641 return result

TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: trainer = TorchTrainer.restore("/home/ali/Desktop/projects/AIUpdateHub/efs/llm").
To start a new run that will retry on training failures, set train.RunConfig(failure_config=train.FailureConfig(max_failures)) in the Trainer's run_config with max_failures > 0, or max_failures = -1 for unlimited retries

Versions / Dependencies

Environment:

Ray version: 2.37.0
Python version: 3.10
OS: Ubuntu
Hardware: CPU-based training on local laptop

Reproduction script

Related codes

----------- Trainer -----------------

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=train_loop_config,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets={"train": train_ds, "val": val_ds},
    dataset_config=dataset_config,
    metadata={"class_to_index": preprocessor.class_to_index}
)

------------------------ train_step -------------------------

def train_step(ds, batch_size, model, num_classes, loss_fn, optimizer):
    """Train step."""
    model.train()  # Set model to training mode
    cumulative_loss = 0.0  # Initialize cumulative loss
    #ds_generator = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)  # Batch-wise generator
    ds_generator = ds.iter_batches(batch_size=batch_size, batch_format="torch", collate_fn=collate_fn)
    # Loop over batches
    for i, batch in enumerate(ds_generator):
        optimizer.zero_grad()  # Reset gradients before each batch
        z = model(batch)  # Forward pass
        
        # Ensure that targets are one-hot encoded properly
        targets = F.one_hot(batch["targets"], num_classes=num_classes).float()
        
        # Calculate loss
        loss = loss_fn(z, targets)
        
        # Backpropagation
        loss.backward()  # Backward pass
        optimizer.step()  # Update model weights
        
        # Calculate cumulative loss
        cumulative_loss += loss.detach().item()
    
    # Return the average loss over all batches
    return cumulative_loss / (i + 1)

----------------- train_loop_per_worker -------------------------

# Set up logging
logging.basicConfig(level=logging.INFO)  # You can set this to DEBUG for more detail
logger = logging.getLogger(__name__)

# Training loop
def train_loop_per_worker(config):
    # Hyperparameters
    dropout_p = config["dropout_p"]
    lr = config["lr"]
    lr_factor = config["lr_factor"]
    lr_patience = config["lr_patience"]
    num_epochs = config["num_epochs"]
    batch_size = config["batch_size"]
    num_classes = config["num_classes"]

    # Get datasets
    set_seeds()
    logger.info("Loading dataset shards...")
    train_ds = train.get_dataset_shard("train")
    val_ds = train.get_dataset_shard("val")
    logger.info(f"Dataset shards loaded. Training data size: {len(train_ds)}, Validation data size: {len(val_ds)}")

    # Model
    llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
    model = FinetunedLLM(llm=llm, dropout_p=dropout_p, embedding_dim=llm.config.hidden_size, num_classes=num_classes)
    model = train.torch.prepare_model(model)
    logger.info("Model initialized.")

    # Training components
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=lr_factor, patience=lr_patience)
    logger.info("Optimizer and scheduler initialized.")

    # Training
    num_workers = train.get_context().get_world_size()
    batch_size_per_worker = batch_size // num_workers
    logger.info(f"Training started with {num_workers} workers and batch size per worker: {batch_size_per_worker}")

    for epoch in range(num_epochs):
        logger.info(f"Epoch {epoch + 1}/{num_epochs}")
        train_loss = train_step(train_ds, batch_size_per_worker, model, num_classes, loss_fn, optimizer)
        logger.info(f"Training loss for epoch {epoch + 1}: {train_loss}")

        val_loss, _, _ = eval_step(val_ds, batch_size_per_worker, model, num_classes, loss_fn)
        logger.info(f"Validation loss for epoch {epoch + 1}: {val_loss}")
        scheduler.step(val_loss)

        # Checkpoint
        with tempfile.TemporaryDirectory() as dp:
            if isinstance(model, torch.nn.parallel.DistributedDataParallel):  # CPU case
                model.module.save(dp=dp)
            else:
                model.save(dp=dp)
            metrics = dict(epoch=epoch, lr=optimizer.param_groups[0]["lr"], train_loss=train_loss, val_loss=val_loss)
            checkpoint = Checkpoint.from_directory(dp)
            train.report(metrics, checkpoint=checkpoint)
            logger.info(f"Epoch {epoch + 1} completed and checkpoint saved.")

--------- load_data ----------

def load_data(dataset_loc: str, num_samples: int = None) -> Dataset:
    """Load data from source into a Ray Dataset.

    Args:
        dataset_loc (str): Location of the dataset.
        num_samples (int, optional): The number of samples to load. Defaults to None.

    Returns:
        Dataset: Our dataset represented by a Ray Dataset.
    """
    ds = ray.data.read_csv(dataset_loc)
    ds = ds.random_shuffle(seed=1234)
    ds = ray.data.from_items(ds.take(num_samples)) if num_samples else ds
    return ds
    ```

### Issue Severity

High: It blocks me from completing my task.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant