progressive-llm/scripts/train_multi_gpu.py
#!/usr/bin/env python3
"""
Multi-GPU training launcher for progressive reasoning model
Supports DDP, FSDP, and DeepSpeed strategies
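
Usage (config path is illustrative; substitute your own):
    python scripts/train_multi_gpu.py --config configs/train.yaml --strategy ddp
    python scripts/train_multi_gpu.py --config configs/train.yaml --strategy fsdp --num_gpus 4
    python scripts/train_multi_gpu.py --config configs/train.yaml --strategy deepspeed --dry_run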
"""
import os
import sys
import argparse
import subprocess
import torch
from pathlib import Path
# Add the project root to sys.path so the src package is importable
sys.path.append(str(Path(__file__).parent.parent))


def get_gpu_count():
    """Get the number of available GPUs"""
    if torch.cuda.is_available():
        return torch.cuda.device_count()
    return 0
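
# Note: torch.cuda.device_count() respects CUDA_VISIBLE_DEVICES, so exporting
# e.g. CUDA_VISIBLE_DEVICES=0,1 before launching is one way to restrict which
# GPUs this script sees.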


def setup_environment_for_strategy(strategy):
    """Set up environment variables for different strategies"""
    if strategy == "deepspeed":
        # Rendezvous placeholders; the deepspeed launcher overrides RANK and
        # LOCAL_RANK per worker process, so these only serve as defaults
        os.environ["MASTER_ADDR"] = "localhost"
        os.environ["MASTER_PORT"] = "12355"
        os.environ["RANK"] = "0"
        os.environ["LOCAL_RANK"] = "0"
        os.environ["WORLD_SIZE"] = str(get_gpu_count())
    elif strategy in ["ddp", "fsdp"]:
        # Standard distributed training environment
        os.environ["MASTER_ADDR"] = "localhost"
        os.environ["MASTER_PORT"] = "12355"
        # Let torchrun handle the rest

    # Async error handling plus verbose distributed logging (useful while
    # debugging; consider silencing the INFO-level settings for production)
    os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
    os.environ["NCCL_DEBUG"] = "INFO"
    os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"


def launch_ddp_training(config_path, num_gpus):
    """Launch DDP training using torchrun"""
    print(f"Launching DDP training on {num_gpus} GPUs...")
    setup_environment_for_strategy("ddp")

    # torchrun takes the training script directly (no leading "python";
    # torchrun prepends the interpreter itself)
    cmd = [
        "torchrun",
        "--nproc_per_node", str(num_gpus),
        "--master_port", "12355",
        "scripts/train_progressive.py",
        "--config", config_path,
        "--distributed",
    ]
    print(f"Running command: {' '.join(cmd)}")
    return subprocess.run(cmd, cwd=Path(__file__).parent.parent)
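
# Note: the master port is fixed at 12355 above, so two launches on the same
# host will collide; torchrun can pick a free port instead via
# "--rdzv-backend c10d --rdzv-endpoint localhost:0" if that becomes an issue.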


def launch_fsdp_training(config_path, num_gpus):
    """Launch FSDP training using accelerate"""
    print(f"Launching FSDP training on {num_gpus} GPUs...")
    setup_environment_for_strategy("fsdp")

    # Create accelerate config for FSDP; the fsdp_* keys must be nested under
    # fsdp_config for the YAML to parse correctly
    accelerate_config = f"""
compute_environment: LOCAL_MACHINE
distributed_type: FSDP
fsdp_config:
  fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP
  fsdp_backward_prefetch: BACKWARD_PRE
  fsdp_cpu_ram_efficient_loading: true
  fsdp_forward_prefetch: false
  fsdp_offload_params: false
  fsdp_sharding_strategy: FULL_SHARD
  fsdp_state_dict_type: SHARDED_STATE_DICT
  fsdp_sync_module_states: true
  fsdp_transformer_layer_cls_to_wrap: Gemma2DecoderLayer
  fsdp_use_orig_params: true
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: {num_gpus}
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
"""
    # Save config temporarily
    config_file = Path(__file__).parent.parent / "accelerate_config.yaml"
    with open(config_file, "w") as f:
        f.write(accelerate_config)

    # accelerate launch takes the training script directly (no leading "python")
    cmd = [
        "accelerate", "launch",
        "--config_file", str(config_file),
        "scripts/train_progressive.py",
        "--config", config_path,
    ]
    print(f"Running command: {' '.join(cmd)}")
    result = subprocess.run(cmd, cwd=Path(__file__).parent.parent)

    # Clean up config file
    config_file.unlink(missing_ok=True)
    return result
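
# Note: fsdp_transformer_layer_cls_to_wrap is model-specific; Gemma2DecoderLayer
# matches Gemma 2 checkpoints. Substitute the decoder layer class of your model
# (e.g. LlamaDecoderLayer) when training a different architecture.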


def launch_deepspeed_training(config_path, num_gpus):
    """Launch DeepSpeed training"""
    print(f"Launching DeepSpeed training on {num_gpus} GPUs...")
    setup_environment_for_strategy("deepspeed")

    # The deepspeed launcher takes the training script directly (no leading
    # "python"); use --num_gpus without a hostfile for single node
    cmd = [
        "deepspeed",
        "--num_gpus", str(num_gpus),
        "scripts/train_progressive.py",
        "--config", config_path,
        "--deepspeed",
    ]
    print(f"Running command: {' '.join(cmd)}")
    result = subprocess.run(cmd, cwd=Path(__file__).parent.parent)
    return result
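
# Note: the bare --deepspeed flag is passed through to train_progressive.py,
# which is assumed to handle its own ZeRO/DeepSpeed configuration; the
# deepspeed launcher itself does not consume it here.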


def main():
    parser = argparse.ArgumentParser(description="Multi-GPU Progressive LLM Training")
    parser.add_argument("--config", type=str, required=True,
                        help="Path to training configuration file")
    parser.add_argument("--strategy", type=str, default="ddp",
                        choices=["ddp", "fsdp", "deepspeed"],
                        help="Multi-GPU strategy to use")
    parser.add_argument("--num_gpus", type=int, default=None,
                        help="Number of GPUs to use (default: all available)")
    parser.add_argument("--dry_run", action="store_true",
                        help="Print commands without executing")
    args = parser.parse_args()

    # Get GPU count
    available_gpus = get_gpu_count()
    if available_gpus == 0:
        print("❌ No GPUs available!")
        sys.exit(1)

    num_gpus = args.num_gpus or available_gpus
    if num_gpus > available_gpus:
        print(f"❌ Requested {num_gpus} GPUs but only {available_gpus} available")
        sys.exit(1)

    # Check config file exists
    if not Path(args.config).exists():
        print(f"❌ Config file not found: {args.config}")
        sys.exit(1)

    print("Progressive LLM Training - Multi-GPU Launcher")
    print("=" * 60)
    print(f"Strategy: {args.strategy}")
    print(f"GPUs: {num_gpus} / {available_gpus}")
    print(f"Config: {args.config}")
    print("=" * 60)

    if args.dry_run:
        print("DRY RUN - Commands that would be executed:")
        # Show what would be run
        if args.strategy == "ddp":
            print("torchrun --nproc_per_node", num_gpus, "scripts/train_progressive.py")
        elif args.strategy == "fsdp":
            print("accelerate launch --config_file accelerate_config.yaml scripts/train_progressive.py")
        elif args.strategy == "deepspeed":
            print("deepspeed --num_gpus", num_gpus, "scripts/train_progressive.py")
        return

    # Launch training
    if args.strategy == "ddp":
        result = launch_ddp_training(args.config, num_gpus)
    elif args.strategy == "fsdp":
        result = launch_fsdp_training(args.config, num_gpus)
    elif args.strategy == "deepspeed":
        result = launch_deepspeed_training(args.config, num_gpus)

    if result.returncode == 0:
        print("✅ Training completed successfully!")
    else:
        print("❌ Training failed!")
        sys.exit(result.returncode)


if __name__ == "__main__":
    main()