#!/usr/bin/env python3
"""
Multi-GPU training launcher for progressive reasoning model
Supports DDP, FSDP, and DeepSpeed strategies
"""

import os
import sys
import argparse
import subprocess
import torch
from pathlib import Path

# Add src to path
sys.path.append(str(Path(__file__).parent.parent))


def get_gpu_count():
    """Get the number of available GPUs"""
    if torch.cuda.is_available():
        return torch.cuda.device_count()
    return 0


def setup_environment_for_strategy(strategy):
    """Set up environment variables for different strategies"""
    if strategy == "deepspeed":
        # DeepSpeed specific environment
        os.environ["MASTER_ADDR"] = "localhost"
        os.environ["MASTER_PORT"] = "12355"
        os.environ["RANK"] = "0"
        os.environ["LOCAL_RANK"] = "0"
        os.environ["WORLD_SIZE"] = str(get_gpu_count())
    elif strategy in ["ddp", "fsdp"]:
        # Standard distributed training environment
        os.environ["MASTER_ADDR"] = "localhost"
        os.environ["MASTER_PORT"] = "12355"
        # Let torchrun handle the rest

    # General optimizations
    os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
    os.environ["NCCL_DEBUG"] = "INFO"
    os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"


def launch_ddp_training(config_path, num_gpus):
    """Launch DDP training using torchrun"""
    print(f"Launching DDP training on {num_gpus} GPUs...")
    setup_environment_for_strategy("ddp")

    # Use torchrun for DDP
    cmd = [
        "torchrun",
        "--nproc_per_node", str(num_gpus),
        "--master_port", "12355",
        "scripts/train_progressive.py",
        "--config", config_path,
        "--distributed",
    ]

    print(f"Running command: {' '.join(cmd)}")
    return subprocess.run(cmd, cwd=Path(__file__).parent.parent)


def launch_fsdp_training(config_path, num_gpus):
    """Launch FSDP training using accelerate"""
    print(f"Launching FSDP training on {num_gpus} GPUs...")
    setup_environment_for_strategy("fsdp")

    # Create accelerate config for FSDP
    accelerate_config = f"""
compute_environment: LOCAL_MACHINE
distributed_type: FSDP
fsdp_config:
  fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP
  fsdp_backward_prefetch: BACKWARD_PRE
  fsdp_cpu_ram_efficient_loading: true
  fsdp_forward_prefetch: false
  fsdp_offload_params: false
  fsdp_sharding_strategy: FULL_SHARD
  fsdp_state_dict_type: SHARDED_STATE_DICT
  fsdp_sync_module_states: true
  fsdp_transformer_layer_cls_to_wrap: Gemma2DecoderLayer
  fsdp_use_orig_params: true
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: {num_gpus}
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
"""

    # Save config temporarily
    config_file = Path(__file__).parent.parent / "accelerate_config.yaml"
    with open(config_file, "w") as f:
        f.write(accelerate_config)

    cmd = [
        "accelerate", "launch",
        "--config_file", str(config_file),
        "scripts/train_progressive.py",
        "--config", config_path,
    ]

    print(f"Running command: {' '.join(cmd)}")
    result = subprocess.run(cmd, cwd=Path(__file__).parent.parent)

    # Clean up config file
    config_file.unlink(missing_ok=True)

    return result


def launch_deepspeed_training(config_path, num_gpus):
    """Launch DeepSpeed training"""
    print(f"Launching DeepSpeed training on {num_gpus} GPUs...")
    setup_environment_for_strategy("deepspeed")

    # Use --num_gpus without hostfile for single node
    cmd = [
        "deepspeed",
        "--num_gpus", str(num_gpus),
        "scripts/train_progressive.py",
        "--config", config_path,
        "--deepspeed",
    ]

    print(f"Running command: {' '.join(cmd)}")
    result = subprocess.run(cmd, cwd=Path(__file__).parent.parent)
    return result


def main():
    parser = argparse.ArgumentParser(description="Multi-GPU Progressive LLM Training")
    parser.add_argument("--config", type=str, required=True,
                        help="Path to training configuration file")
    parser.add_argument("--strategy", type=str, default="ddp",
                        choices=["ddp", "fsdp", "deepspeed"],
                        help="Multi-GPU strategy to use")
    parser.add_argument("--num_gpus", type=int, default=None,
                        help="Number of GPUs to use (default: all available)")
    parser.add_argument("--dry_run", action="store_true",
                        help="Print commands without executing")

    args = parser.parse_args()

    # Get GPU count
    available_gpus = get_gpu_count()
    if available_gpus == 0:
        print("❌ No GPUs available!")
        sys.exit(1)

    num_gpus = args.num_gpus or available_gpus
    if num_gpus > available_gpus:
        print(f"❌ Requested {num_gpus} GPUs but only {available_gpus} available")
        sys.exit(1)

    # Check config file exists
    if not Path(args.config).exists():
        print(f"❌ Config file not found: {args.config}")
        sys.exit(1)

    print("Progressive LLM Training - Multi-GPU Launcher")
    print("=" * 60)
    print(f"Strategy: {args.strategy}")
    print(f"GPUs: {num_gpus} / {available_gpus}")
    print(f"Config: {args.config}")
    print("=" * 60)

    if args.dry_run:
        print("DRY RUN - Commands that would be executed:")
        # Show what would be run
        if args.strategy == "ddp":
            print("torchrun --nproc_per_node", num_gpus, "scripts/train_progressive.py")
        elif args.strategy == "fsdp":
            print("accelerate launch --config_file accelerate_config.yaml scripts/train_progressive.py")
        elif args.strategy == "deepspeed":
            print("deepspeed --num_gpus", num_gpus, "scripts/train_progressive.py")
        return

    # Launch training
    if args.strategy == "ddp":
        result = launch_ddp_training(args.config, num_gpus)
    elif args.strategy == "fsdp":
        result = launch_fsdp_training(args.config, num_gpus)
    elif args.strategy == "deepspeed":
        result = launch_deepspeed_training(args.config, num_gpus)

    if result.returncode == 0:
        print("✅ Training completed successfully!")
    else:
        print("❌ Training failed!")
        sys.exit(result.returncode)


if __name__ == "__main__":
    main()
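
# ---------------------------------------------------------------------------
# Example invocations (a sketch only; the launcher and config paths below are
# hypothetical placeholders and should be adjusted to this repository's layout):
#
#   python scripts/launch_multi_gpu.py --config configs/progressive.yaml --strategy ddp
#   python scripts/launch_multi_gpu.py --config configs/progressive.yaml --strategy fsdp --num_gpus 4
#   python scripts/launch_multi_gpu.py --config configs/progressive.yaml --strategy deepspeed --dry_run
# ---------------------------------------------------------------------------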