Multi-node #29
Open
wants to merge 3 commits into main
3 changes: 2 additions & 1 deletion eval/chat_benchmarks/HumanEval/eval_instruct.py
@@ -112,7 +112,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
self.logger.info("Generating responses for Human Eval...")
outputs = self.compute(model, all_instances)

if model.rank != 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if not is_main_process:
continue

generated_examples = []
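Note: the same main-process check recurs in every benchmark file touched by this PR. A minimal sketch of the pattern, assuming the model may expose a Hugging Face Accelerate `accelerator` and that single-process runs (world_size <= 1) count as the main process:

```python
# Sketch only: mirrors the check added in each benchmark's generate_responses.
def is_main_process(model) -> bool:
    if hasattr(model, "accelerator"):
        # Multi-process run driven by HF Accelerate: process_index 0 is main.
        return model.accelerator.process_index == 0
    # No accelerator attached: only a single-process run counts as main.
    return model.world_size <= 1
```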
3 changes: 2 additions & 1 deletion eval/chat_benchmarks/IFEval/eval_instruct.py
@@ -115,7 +115,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
self.logger.info("Generating responses...")
outputs = self.compute(model, all_instances)

if model.rank != 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if not is_main_process:
return None

generated_examples = []
3 changes: 2 additions & 1 deletion eval/chat_benchmarks/MBPP/eval_instruct.py
@@ -161,7 +161,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
outputs = self.compute(model, all_instances)

# Return None early for non-primary ranks
if model.rank != 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if not is_main_process:
return None

generated_examples = []
3 changes: 2 additions & 1 deletion eval/chat_benchmarks/MTBench/eval_instruct.py
@@ -151,7 +151,8 @@ def get_model_answers(self, model: LM, model_id: str, questions: List[Dict[str,
all_convs[q_idx].append({"role": "assistant", "content": output})
all_choices[q_idx]["turns"].append(output)

if model.rank != 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if not is_main_process:
continue

# Save completed conversations
@@ -36,7 +36,7 @@ def load_xft_model(model_path, xft_config: XftConfig):
tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=False, padding_side="left", trust_remote_code=True)
xft_model = xfastertransformer.AutoModel.from_pretrained(model_path, dtype=data_type)
model = XftModel(xft_model=xft_model, xft_config=xft_config)
if model.model.rank > 0:
if model.model.accelerator.process_index > 0:
while True:
model.model.generate()
return model, tokenizer
12 changes: 8 additions & 4 deletions eval/chat_benchmarks/MixEval/eval_instruct.py
@@ -132,18 +132,20 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
out_dict = {}

self.logger.info("Generating responses for MixEval...")
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1

for split in splits:
self.args.split = split
all_results = self._eval_split(model, split)
if model.rank == 0:
if is_main_process:
response_file = self._get_response_file()
with open(response_file, "w") as f:
for result in all_results:
f.write(json.dumps(result) + "\n")
out_dict[split] = all_results

# Only return results on rank 0
if model.world_size > 1 and model.rank != 0:
if not is_main_process:
return None
return out_dict

@@ -192,7 +194,8 @@ def _eval_split(self, model: LM, split: str) -> List[Dict[str, Any]]:
for idx in list(range(len(eval_dataset.raw_inputs))):
eval_dataset.raw_inputs[idx]["response"] = all_responses[idx]

if model.rank == 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if is_main_process:
with open(response_file, "w") as f:
for item in eval_dataset.raw_inputs:
json_line = json.dumps(item)
@@ -243,7 +246,8 @@ def run_benchmark(self, model: LM) -> Dict[str, Any]:
generation_results = self.generate_responses(model)

# Only evaluate on rank 0
if model.world_size > 1 and model.rank != 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if not is_main_process:
return None

evaluation_results = self.evaluate_responses(generation_results)
23 changes: 16 additions & 7 deletions eval/chat_benchmarks/RepoBench/eval_instruct.py
@@ -59,7 +59,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
if self.legacy_mode:
return self._generate_responses_legacy(model)

if model.rank == 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if is_main_process:
temp_dir_obj = tempfile.TemporaryDirectory()
temp_dir = temp_dir_obj.name

@@ -76,8 +77,14 @@

all_instances = []
# Split dataset across ranks for parallel construction
# Get subset of dataset for this rank using built-in slice functionality
rank_dataset = list(islice(dataset, model.rank, len(dataset), model.world_size))
# Get subset of dataset for this rank using the same slicing strategy as the compute function
if hasattr(model, 'accelerator'):
chunk_size = len(dataset) // model.world_size
start = model.accelerator.process_index * chunk_size
end = start + chunk_size if model.accelerator.process_index < model.world_size - 1 else len(dataset)
rank_dataset = dataset.select(range(start, end))
else:
rank_dataset = list(islice(dataset, model.rank, len(dataset), model.world_size))

# Process examples for this rank's shard
for idx, example in enumerate(rank_dataset):
@@ -100,7 +107,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
outputs = self.compute(model, all_instances, do_slice=False)

# Only rank 0 should save the results
if model.rank != 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if not is_main_process:
continue

generated_examples = []
@@ -118,7 +126,7 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
for ex in generated_examples:
fw.write(json.dumps(ex) + "\n")

if model.rank == 0:
if is_main_process:
return {"temp_dir_obj": temp_dir_obj}

def _generate_responses_legacy(self, model: LM) -> Dict[str, Any]:
@@ -129,6 +137,7 @@ def _generate_responses_legacy(self, model: LM) -> Dict[str, Any]:
temp_dir_obj = tempfile.TemporaryDirectory()
temp_dir = temp_dir_obj.name

is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
for lang in self.languages:
for subset in self.subsets:
dataset = load_data(split="test", task="completion", language=lang, length="2k", setting=subset)
@@ -155,8 +164,8 @@ def _generate_responses_legacy(self, model: LM) -> Dict[str, Any]:
)

outputs = self.compute(model, all_instances, do_slice=False)

if model.rank != 0:
if not is_main_process:
continue

generated_examples = []
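Note: the shard selection above matches the contiguous chunking that `compute` now uses, so each rank builds instances only for its own slice and `compute(..., do_slice=False)` skips re-slicing. A hedged sketch, assuming `dataset` is a Hugging Face `datasets.Dataset` and the rank comes from `accelerator.process_index`:

```python
# Sketch of contiguous per-rank shard selection; the last rank absorbs the
# remainder so no example is dropped.
from datasets import Dataset

def shard_for_rank(dataset: Dataset, rank: int, world_size: int) -> Dataset:
    chunk = len(dataset) // world_size
    start = rank * chunk
    end = start + chunk if rank < world_size - 1 else len(dataset)
    return dataset.select(range(start, end))
```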
3 changes: 2 additions & 1 deletion eval/chat_benchmarks/WildBench/eval_instruct.py
@@ -196,7 +196,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
outputs = self.compute(model, all_instances)

# Return None early for non-primary ranks
if model.rank != 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if not is_main_process:
return None

outputs = [[output] for output in outputs]
3 changes: 2 additions & 1 deletion eval/chat_benchmarks/alpaca_eval/eval_instruct.py
@@ -117,7 +117,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
self.logger.info("Generating responses for Alpaca Eval...")
outputs = self.compute(model, all_instances)

if model.rank != 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if not is_main_process:
return None

model_outputs = []
3 changes: 2 additions & 1 deletion eval/chat_benchmarks/zeroeval/eval_instruct.py
@@ -144,7 +144,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:

outputs = self.compute(model, all_instances)

if model.rank != 0:
is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
if not is_main_process:
continue

outputs = [[output] for output in outputs]
104 changes: 102 additions & 2 deletions eval/eval.py
@@ -5,9 +5,11 @@
import sys
import time
from typing import Optional, List, Dict, Union
from pathlib import Path

import concurrent.futures
import torch.distributed as dist
from huggingface_hub import snapshot_download

from lm_eval import utils
from lm_eval import evaluator as pretrain_evaluator
@@ -26,6 +28,103 @@
from eval.eval_tracker import DCEvaluationTracker


class ModelInitializer:
"""Handles model initialization for distributed evaluations."""

def __init__(self, cache_dir: Optional[str] = None):
self.cache_dir = cache_dir or os.getenv('HF_HOME',
os.path.join(os.path.expanduser("~"), ".cache", "huggingface"))
self._ensure_directory(self.cache_dir)

def _ensure_directory(self, path: str) -> None:
"""Safely create directory if it doesn't exist."""
Path(path).mkdir(parents=True, exist_ok=True)

def download_model(self, model_id: str) -> None:
"""Download model files with proper error handling."""
try:
snapshot_download(
repo_id=model_id,
cache_dir=self.cache_dir,
local_files_only=False,
resume_download=True
)
except Exception as e:
raise RuntimeError(f"Failed to download model {model_id}: {str(e)}")


def initialize_model_for_eval(
model: Union[str, LM],
model_args: Optional[str] = None,
batch_size: Optional[int] = None,
max_batch_size: Optional[int] = None,
device: Optional[str] = None,
cache_dir: Optional[str] = None
) -> LM:
"""
Initialize model for distributed evaluation where each node runs independent evaluations.

Args:
model (Union[str, LM]):
Either a string identifier for the model to load from registry,
or an already instantiated LM object.
model_args (Optional[str], optional):
Additional arguments for model initialization as a string.
Only used if model is provided as a string. Defaults to None.
batch_size (Optional[int], optional):
Batch size for model inference. Defaults to None.
max_batch_size (Optional[int], optional):
Maximum allowed batch size. Defaults to None.
device (Optional[str], optional):
Device to load the model on (e.g., 'cuda', 'cpu'). Defaults to None.
cache_dir (Optional[str], optional):
Directory used to cache downloaded model files. Defaults to HF_HOME
or ~/.cache/huggingface when not provided.

Returns:
LM:
Initialized language model instance with configured parameters
and a sanitized model identifier.
"""
local_rank = int(os.getenv('LOCAL_RANK', '0'))

if isinstance(model, str):
initializer = ModelInitializer(cache_dir)

try:
initializer.download_model(model)
except Exception as e:
print(f"Rank {local_rank} failed to initialize model: {str(e)}")
if dist.is_initialized():
dist.barrier() # Ensure all ranks fail together
raise e

if dist.is_initialized():
dist.barrier()

if model_args is None:
model_args = ""

config = {
"batch_size": batch_size,
"max_batch_size": max_batch_size,
"device": device,
}

try:
lm = lm_eval.api.registry.get_model(model).create_from_arg_string(
model_args,
config,
)
except Exception as e:
print(f"Rank {local_rank} failed to create model: {str(e)}")
if dist.is_initialized():
dist.barrier()
raise e
else:
lm = model

lm.model_identifier = sanitize_model_name(f"model_{model}_model_args_{model_args}")
return lm


def setup_custom_parser():
"""
Create a custom argument parser that extends lm-eval-harness parser.
@@ -148,7 +247,7 @@ def evaluate(
cpu_count = os.cpu_count()

max_workers = min(len(valid_tasks), cpu_count * 2)
if lm.world_size <= 1 or lm.rank == 0:
if (hasattr(lm, 'accelerator') and lm.accelerator.process_index == 0) or lm.world_size <= 1:
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
evaluate_results = list(
executor.map(
@@ -302,7 +401,8 @@ def cli_evaluate(args: Optional[argparse.Namespace] = None) -> None:
)

# Add metadata to results
if lm.rank == 0:
is_main_process = lm.accelerator.process_index == 0 if hasattr(lm, 'accelerator') else lm.world_size <= 1
if is_main_process:
add_results_metadata(results, args, lm)
handle_evaluation_output(results, args, evaluation_tracker, wandb_logger)

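Note: a minimal sketch of the rank-synchronized download flow that `ModelInitializer` and `initialize_model_for_eval` implement above, assuming all ranks share the Hugging Face cache directory; the helper name below is illustrative, not part of the PR:

```python
# Sketch only: pre-download the checkpoint, then hold every rank at a barrier
# so no rank starts constructing the model before the weights are on disk.
from typing import Optional

import torch.distributed as dist
from huggingface_hub import snapshot_download

def download_then_sync(repo_id: str, cache_dir: Optional[str] = None) -> None:
    try:
        snapshot_download(repo_id=repo_id, cache_dir=cache_dir)
    except Exception:
        if dist.is_initialized():
            dist.barrier()  # let the other ranks reach the barrier before raising
        raise
    if dist.is_initialized():
        dist.barrier()  # wait until every rank has finished downloading
```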
16 changes: 8 additions & 8 deletions eval/task.py
@@ -20,23 +20,23 @@ def __init__(self, logger: Optional[logging.Logger] = None):

def compute(self, model: LM, inputs: List[Instance], do_slice: bool = True) -> List[str]:
if model.world_size > 1 and do_slice:
prompts = list(islice(inputs, model.rank, len(inputs), model.world_size))
chunk_size = len(inputs) // model.world_size
start = model.accelerator.process_index * chunk_size
end = start + chunk_size if model.accelerator.process_index < model.world_size - 1 else len(inputs)
prompts = inputs[start:end]
else:
prompts = inputs

results = model.generate_until(prompts)
if model.world_size > 1:
all_results = [None for _ in range(model.world_size)]

dist.all_gather_object(all_results, results)

# Merge results from all ranks
length = sum(len(res) for res in all_results if res is not None)
merged = [None] * length
for rank, sub_results in enumerate(all_results):
# Simply concatenate results in rank order
merged = []
for sub_results in all_results:
if sub_results is not None:
for i, item in enumerate(sub_results):
merged[i * model.world_size + rank] = item
merged.extend(sub_results)
return merged
else:
return results
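Note: the new `compute` path shards inputs into contiguous chunks and simply concatenates the gathered per-rank results, which preserves the original input order (the old strided `islice` split required interleaving on merge). A sketch of the scheme, assuming every rank participates in `all_gather_object`:

```python
# Sketch of the contiguous-shard + concatenate-in-rank-order merge.
from typing import Any, List

import torch.distributed as dist

def shard(inputs: List[Any], rank: int, world_size: int) -> List[Any]:
    chunk = len(inputs) // world_size
    start = rank * chunk
    # The last rank takes any remainder so nothing is dropped.
    end = start + chunk if rank < world_size - 1 else len(inputs)
    return inputs[start:end]

def gather_in_order(local_results: List[Any], world_size: int) -> List[Any]:
    gathered: List[Any] = [None] * world_size
    dist.all_gather_object(gathered, local_results)
    merged: List[Any] = []
    for sub in gathered:  # rank order equals input order for contiguous shards
        if sub is not None:
            merged.extend(sub)
    return merged
```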