diff --git a/eval/chat_benchmarks/HumanEval/eval_instruct.py b/eval/chat_benchmarks/HumanEval/eval_instruct.py
index ad705099..7ddc3470 100644
--- a/eval/chat_benchmarks/HumanEval/eval_instruct.py
+++ b/eval/chat_benchmarks/HumanEval/eval_instruct.py
@@ -112,7 +112,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
         self.logger.info("Generating responses for Human Eval...")
         outputs = self.compute(model, all_instances)
 
-        if model.rank != 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if not is_main_process:
             continue
 
         generated_examples = []
diff --git a/eval/chat_benchmarks/IFEval/eval_instruct.py b/eval/chat_benchmarks/IFEval/eval_instruct.py
index 7e500dbe..5c7b1d8e 100644
--- a/eval/chat_benchmarks/IFEval/eval_instruct.py
+++ b/eval/chat_benchmarks/IFEval/eval_instruct.py
@@ -115,7 +115,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
         self.logger.info("Generating responses...")
         outputs = self.compute(model, all_instances)
 
-        if model.rank != 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if not is_main_process:
             return None
 
         generated_examples = []
diff --git a/eval/chat_benchmarks/MBPP/eval_instruct.py b/eval/chat_benchmarks/MBPP/eval_instruct.py
index 875c8bac..6d589bf6 100644
--- a/eval/chat_benchmarks/MBPP/eval_instruct.py
+++ b/eval/chat_benchmarks/MBPP/eval_instruct.py
@@ -161,7 +161,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
         outputs = self.compute(model, all_instances)
 
         # Return None early for non-primary ranks
-        if model.rank != 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if not is_main_process:
             return None
 
         generated_examples = []
diff --git a/eval/chat_benchmarks/MTBench/eval_instruct.py b/eval/chat_benchmarks/MTBench/eval_instruct.py
index 202662ac..7a61f238 100644
--- a/eval/chat_benchmarks/MTBench/eval_instruct.py
+++ b/eval/chat_benchmarks/MTBench/eval_instruct.py
@@ -151,7 +151,8 @@ def get_model_answers(self, model: LM, model_id: str, questions: List[Dict[str,
                 all_convs[q_idx].append({"role": "assistant", "content": output})
                 all_choices[q_idx]["turns"].append(output)
 
-        if model.rank != 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if not is_main_process:
             continue
 
         # Save completed conversations
diff --git a/eval/chat_benchmarks/MTBench/fastchat/modules/xfastertransformer.py b/eval/chat_benchmarks/MTBench/fastchat/modules/xfastertransformer.py
index 8c95d4d7..557ec4c4 100644
--- a/eval/chat_benchmarks/MTBench/fastchat/modules/xfastertransformer.py
+++ b/eval/chat_benchmarks/MTBench/fastchat/modules/xfastertransformer.py
@@ -36,7 +36,7 @@ def load_xft_model(model_path, xft_config: XftConfig):
     tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=False, padding_side="left", trust_remote_code=True)
     xft_model = xfastertransformer.AutoModel.from_pretrained(model_path, dtype=data_type)
     model = XftModel(xft_model=xft_model, xft_config=xft_config)
-    if model.model.rank > 0:
+    if model.model.accelerator.process_index > 0:
         while True:
             model.model.generate()
     return model, tokenizer
diff --git a/eval/chat_benchmarks/MixEval/eval_instruct.py b/eval/chat_benchmarks/MixEval/eval_instruct.py
index d92c7e8e..f15f2b52 100644
--- a/eval/chat_benchmarks/MixEval/eval_instruct.py
+++ b/eval/chat_benchmarks/MixEval/eval_instruct.py
@@ -132,10 +132,12 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
         out_dict = {}
 
         self.logger.info("Generating responses for MixEval...")
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+
         for split in splits:
             self.args.split = split
             all_results = self._eval_split(model, split)
-            if model.rank == 0:
+            if is_main_process:
                 response_file = self._get_response_file()
                 with open(response_file, "w") as f:
                     for result in all_results:
@@ -143,7 +145,7 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
             out_dict[split] = all_results
 
         # Only return results on rank 0
-        if model.world_size > 1 and model.rank != 0:
+        if not is_main_process:
             return None
 
         return out_dict
@@ -192,7 +194,8 @@ def _eval_split(self, model: LM, split: str) -> List[Dict[str, Any]]:
         for idx in list(range(len(eval_dataset.raw_inputs))):
             eval_dataset.raw_inputs[idx]["response"] = all_responses[idx]
 
-        if model.rank == 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if is_main_process:
             with open(response_file, "w") as f:
                 for item in eval_dataset.raw_inputs:
                     json_line = json.dumps(item)
@@ -243,7 +246,8 @@ def run_benchmark(self, model: LM) -> Dict[str, Any]:
         generation_results = self.generate_responses(model)
 
         # Only evaluate on rank 0
-        if model.world_size > 1 and model.rank != 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if not is_main_process:
             return None
 
         evaluation_results = self.evaluate_responses(generation_results)
diff --git a/eval/chat_benchmarks/RepoBench/eval_instruct.py b/eval/chat_benchmarks/RepoBench/eval_instruct.py
index cb207c10..f6760ee6 100644
--- a/eval/chat_benchmarks/RepoBench/eval_instruct.py
+++ b/eval/chat_benchmarks/RepoBench/eval_instruct.py
@@ -59,7 +59,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
         if self.legacy_mode:
             return self._generate_responses_legacy(model)
 
-        if model.rank == 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if is_main_process:
             temp_dir_obj = tempfile.TemporaryDirectory()
             temp_dir = temp_dir_obj.name
 
@@ -76,8 +77,14 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
             all_instances = []
 
            # Split dataset across ranks for parallel construction
-            # Get subset of dataset for this rank using built-in slice functionality
-            rank_dataset = list(islice(dataset, model.rank, len(dataset), model.world_size))
+            # Get subset of dataset for this rank using the same slicing strategy as the compute function
+            if hasattr(model, 'accelerator'):
+                chunk_size = len(dataset) // model.world_size
+                start = model.accelerator.process_index * chunk_size
+                end = start + chunk_size if model.accelerator.process_index < model.world_size - 1 else len(dataset)
+                rank_dataset = dataset.select(range(start, end))
+            else:
+                rank_dataset = list(islice(dataset, model.rank, len(dataset), model.world_size))
 
            # Process examples for this rank's shard
            for idx, example in enumerate(rank_dataset):
@@ -100,7 +107,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
             outputs = self.compute(model, all_instances, do_slice=False)
 
            # Only rank 0 should save the results
-            if model.rank != 0:
+            is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+            if not is_main_process:
                 continue
 
            generated_examples = []
@@ -118,7 +126,7 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
                 for ex in generated_examples:
                     fw.write(json.dumps(ex) + "\n")
 
-        if model.rank == 0:
+        if is_main_process:
             return {"temp_dir_obj": temp_dir_obj}
 
     def _generate_responses_legacy(self, model: LM) -> Dict[str, Any]:
@@ -129,6 +137,7 @@ def _generate_responses_legacy(self, model: LM) -> Dict[str, Any]:
         temp_dir_obj = tempfile.TemporaryDirectory()
         temp_dir = temp_dir_obj.name
 
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
         for lang in self.languages:
             for subset in self.subsets:
                 dataset = load_data(split="test", task="completion", language=lang, length="2k", setting=subset)
@@ -155,8 +164,8 @@ def _generate_responses_legacy(self, model: LM) -> Dict[str, Any]:
                 )
 
                 outputs = self.compute(model, all_instances, do_slice=False)
-
-                if model.rank != 0:
+
+                if not is_main_process:
                     continue
 
                 generated_examples = []
diff --git a/eval/chat_benchmarks/WildBench/eval_instruct.py b/eval/chat_benchmarks/WildBench/eval_instruct.py
index 100e6d68..2ff69cea 100644
--- a/eval/chat_benchmarks/WildBench/eval_instruct.py
+++ b/eval/chat_benchmarks/WildBench/eval_instruct.py
@@ -196,7 +196,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
         outputs = self.compute(model, all_instances)
 
         # Return None early for non-primary ranks
-        if model.rank != 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if not is_main_process:
             return None
 
         outputs = [[output] for output in outputs]
diff --git a/eval/chat_benchmarks/alpaca_eval/eval_instruct.py b/eval/chat_benchmarks/alpaca_eval/eval_instruct.py
index 0a79b1e8..09f86da4 100644
--- a/eval/chat_benchmarks/alpaca_eval/eval_instruct.py
+++ b/eval/chat_benchmarks/alpaca_eval/eval_instruct.py
@@ -117,7 +117,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
         self.logger.info("Generating responses for Alpaca Eval...")
         outputs = self.compute(model, all_instances)
 
-        if model.rank != 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if not is_main_process:
             return None
 
         model_outputs = []
diff --git a/eval/chat_benchmarks/zeroeval/eval_instruct.py b/eval/chat_benchmarks/zeroeval/eval_instruct.py
index c68a4379..2186b79e 100644
--- a/eval/chat_benchmarks/zeroeval/eval_instruct.py
+++ b/eval/chat_benchmarks/zeroeval/eval_instruct.py
@@ -144,7 +144,8 @@ def generate_responses(self, model: LM) -> Dict[str, Any]:
 
         outputs = self.compute(model, all_instances)
 
-        if model.rank != 0:
+        is_main_process = model.accelerator.process_index == 0 if hasattr(model, 'accelerator') else model.world_size <= 1
+        if not is_main_process:
             continue
 
         outputs = [[output] for output in outputs]
diff --git a/eval/eval.py b/eval/eval.py
index 5bd177cf..6f5fad43 100644
--- a/eval/eval.py
+++ b/eval/eval.py
@@ -5,9 +5,11 @@
 import sys
 import time
 from typing import Optional, List, Dict, Union
+from pathlib import Path
 
 import concurrent.futures
 import torch.distributed as dist
+from huggingface_hub import snapshot_download
 
 from lm_eval import utils
 from lm_eval import evaluator as pretrain_evaluator
@@ -26,6 +28,103 @@
 from eval.eval_tracker import DCEvaluationTracker
 
+
+class ModelInitializer:
+    """Handles model initialization for distributed evaluations."""
+
+    def __init__(self, cache_dir: Optional[str] = None):
+        self.cache_dir = cache_dir or os.getenv('HF_HOME',
+                                                os.path.join(os.path.expanduser("~"), ".cache", "huggingface"))
+        self._ensure_directory(self.cache_dir)
+
+    def _ensure_directory(self, path: str) -> None:
+        """Safely create directory if it doesn't exist."""
+        Path(path).mkdir(parents=True, exist_ok=True)
+
+    def download_model(self, model_id: str) -> None:
+        """Download model files with proper error handling."""
+        try:
+            snapshot_download(
+                repo_id=model_id,
+                cache_dir=self.cache_dir,
+                local_files_only=False,
+                resume_download=True
+            )
+        except Exception as e:
+            raise RuntimeError(f"Failed to download model {model_id}: {str(e)}")
+
+
+def initialize_model_for_eval(
+    model: Union[str, LM],
+    model_args: Optional[str] = None,
+    batch_size: Optional[int] = None,
+    max_batch_size: Optional[int] = None,
+    device: Optional[str] = None,
+    cache_dir: Optional[str] = None
+) -> LM:
+    """
+    Initialize a model for distributed evaluation where each node runs independent evaluations.
+
+    Args:
+        model (Union[str, LM]):
+            Either a string identifier for the model to load from the registry,
+            or an already instantiated LM object.
+        model_args (Optional[str], optional):
+            Additional arguments for model initialization as a string.
+            Only used if model is provided as a string. Defaults to None.
+        batch_size (Optional[int], optional):
+            Batch size for model inference. Defaults to None.
+        max_batch_size (Optional[int], optional):
+            Maximum allowed batch size. Defaults to None.
+        device (Optional[str], optional):
+            Device to load the model on (e.g., 'cuda', 'cpu'). Defaults to None.
+        cache_dir (Optional[str], optional):
+            Directory used to cache downloaded model files. Defaults to None.
+
+    Returns:
+        LM:
+            Initialized language model instance with configured parameters
+            and a sanitized model identifier.
+    """
+    local_rank = int(os.getenv('LOCAL_RANK', '0'))
+
+    if isinstance(model, str):
+        initializer = ModelInitializer(cache_dir)
+
+        try:
+            initializer.download_model(model)
+        except Exception as e:
+            print(f"Rank {local_rank} failed to initialize model: {str(e)}")
+            if dist.is_initialized():
+                dist.barrier()  # Ensure all ranks fail together
+            raise e
+
+        if dist.is_initialized():
+            dist.barrier()
+
+        if model_args is None:
+            model_args = ""
+
+        config = {
+            "batch_size": batch_size,
+            "max_batch_size": max_batch_size,
+            "device": device,
+        }
+
+        try:
+            lm = lm_eval.api.registry.get_model(model).create_from_arg_string(
+                model_args,
+                config,
+            )
+        except Exception as e:
+            print(f"Rank {local_rank} failed to create model: {str(e)}")
+            if dist.is_initialized():
+                dist.barrier()
+            raise e
+    else:
+        lm = model
+
+    lm.model_identifier = sanitize_model_name(f"model_{model}_model_args_{model_args}")
+    return lm
+
+
 def setup_custom_parser():
     """
     Create a custom argument parser that extends lm-eval-harness parser.
@@ -148,7 +247,7 @@ def evaluate(
     cpu_count = os.cpu_count()
     max_workers = min(len(valid_tasks), cpu_count * 2)
 
-    if lm.world_size <= 1 or lm.rank == 0:
+    if (hasattr(lm, 'accelerator') and lm.accelerator.process_index == 0) or lm.world_size <= 1:
         with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
             evaluate_results = list(
                 executor.map(
@@ -302,7 +401,8 @@ def cli_evaluate(args: Optional[argparse.Namespace] = None) -> None:
         )
 
         # Add metadata to results
-        if lm.rank == 0:
+        is_main_process = lm.accelerator.process_index == 0 if hasattr(lm, 'accelerator') else lm.world_size <= 1
+        if is_main_process:
             add_results_metadata(results, args, lm)
             handle_evaluation_output(results, args, evaluation_tracker, wandb_logger)
 
diff --git a/eval/task.py b/eval/task.py
index aec96eca..337dd6d4 100644
--- a/eval/task.py
+++ b/eval/task.py
@@ -20,23 +20,23 @@ def __init__(self, logger: Optional[logging.Logger] = None):
 
     def compute(self, model: LM, inputs: List[Instance], do_slice: bool = True) -> List[str]:
         if model.world_size > 1 and do_slice:
-            prompts = list(islice(inputs, model.rank, len(inputs), model.world_size))
+            chunk_size = len(inputs) // model.world_size
+            start = model.accelerator.process_index * chunk_size
+            end = start + chunk_size if model.accelerator.process_index < model.world_size - 1 else len(inputs)
+            prompts = inputs[start:end]
         else:
             prompts = inputs
 
         results = model.generate_until(prompts)
 
         if model.world_size > 1:
             all_results = [None for _ in range(model.world_size)]
             dist.all_gather_object(all_results, results)
-
-            # Merge results from all ranks
-            length = sum(len(res) for res in all_results if res is not None)
-            merged = [None] * length
-            for rank, sub_results in enumerate(all_results):
+            # Simply concatenate results in rank order
+            merged = []
+            for sub_results in all_results:
                 if sub_results is not None:
-                    for i, item in enumerate(sub_results):
-                        merged[i * model.world_size + rank] = item
+                    merged.extend(sub_results)
             return merged
         else:
             return results
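
Note on the sharding change in eval/task.py: switching from strided islice() sharding to contiguous chunks is what makes it valid to merge the gathered per-rank results by plain concatenation, since rank order then equals input order. Below is a minimal standalone sketch of that invariant; it is not part of the patch, and the shard/merge helpers and toy inputs are illustrative only.

# sharding_sketch.py - illustrative only, not part of the patch
from itertools import islice


def shard(inputs, rank, world_size):
    """Contiguous chunking: the last rank absorbs any remainder."""
    chunk_size = len(inputs) // world_size
    start = rank * chunk_size
    end = start + chunk_size if rank < world_size - 1 else len(inputs)
    return inputs[start:end]


def merge(per_rank_results):
    """Concatenating chunks in rank order restores the original order."""
    merged = []
    for sub_results in per_rank_results:
        merged.extend(sub_results)
    return merged


if __name__ == "__main__":
    inputs = list(range(10))
    world_size = 3

    chunked = [shard(inputs, r, world_size) for r in range(world_size)]
    assert merge(chunked) == inputs  # order preserved by simple extend()

    # The old strided sharding interleaves items across ranks, so plain
    # concatenation of the gathered results would scramble the order.
    strided = [list(islice(inputs, r, len(inputs), world_size)) for r in range(world_size)]
    assert merge(strided) != inputs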