
Multi-Node Run + Fault-Tolerant Training: 2 Node × 8 H100 NCCL Cluster

The reality of cluster training: nodes fail, NCCL hangs, checkpoints get corrupted. The cookbook's fault-tolerant recipe: NCCL_TIMEOUT, a watchdog, signal handling (SIGUSR1), an elastic launcher (torchrun --rdzv_backend=c10d), and graceful preemption resume. The survival kit for a two-day training run of a 70B model.

Şükrü Yusuf KAYA
32 minute read
Advanced

1. Cluster Failure Patterns

| Failure | Symptom | Mitigation |
| --- | --- | --- |
| Node OOM kill | Entire run hangs, NCCL timeout | Keep 20%+ memory headroom |
| GPU thermal throttle | Throughput drops suddenly | Cooling check, nvidia-smi loops |
| NCCL bandwidth degradation | Step time grows 2-3x | Check IB topology |
| Disk full (logs/ckpt) | Save errors | Monitor NFS quota |
| Network partition | Some ranks hang | Retry / elastic restart |
| Power outage | Whole cluster down | Cloud UPS / cold start |
| Spot preemption | ~30 s notice, then kill | Graceful checkpoint |
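For the OOM-kill, thermal-throttle, and disk-full rows above, a lightweight per-node health loop catches degradation before it turns into an NCCL timeout. A minimal sketch, not from the cookbook itself: the nvidia-smi query fields are standard, but the temperature, memory, and disk thresholds are illustrative assumptions.

```python
# Minimal per-node health probe: GPU temperature and memory via nvidia-smi,
# plus free space on the checkpoint filesystem. Thresholds are illustrative.
import shutil, subprocess, time

def gpu_stats():
    out = subprocess.check_output(
        ["nvidia-smi",
         "--query-gpu=index,temperature.gpu,memory.used,memory.total",
         "--format=csv,noheader,nounits"],
        text=True,
    )
    for line in out.strip().splitlines():
        idx, temp, used, total = [int(x) for x in line.split(", ")]
        yield idx, temp, used / total

def health_loop(ckpt_path=".", interval_s=60):
    while True:
        for idx, temp, mem_frac in gpu_stats():
            if temp >= 85:           # assumed threshold: thermal throttle territory
                print(f"[health] GPU{idx} hot: {temp}C")
            if mem_frac >= 0.80:     # less than the 20% headroom recommended above
                print(f"[health] GPU{idx} memory at {mem_frac:.0%}")
        disk = shutil.disk_usage(ckpt_path)
        if disk.free / disk.total < 0.10:   # logs/ckpt filesystem nearly full
            print("[health] checkpoint disk below 10% free")
        time.sleep(interval_s)
```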
The cookbook's rule: checkpoint every 100 steps + a graceful preemption handler + an NCCL watchdog.
```python
# === Fault-tolerant training framework ===
import os, shutil, signal, threading, time
import torch
import torch.distributed as dist
from pathlib import Path

class GracefulPreemption:
    """Turns preemption/kill signals into a flag the training loop can poll."""
    def __init__(self):
        self.requested = False
        signal.signal(signal.SIGUSR1, self._handler)  # Slurm preempt
        signal.signal(signal.SIGTERM, self._handler)  # Docker/RunPod kill
        signal.signal(signal.SIGINT, self._handler)   # Ctrl+C

    def _handler(self, signum, frame):
        print(f"[preempt] signal {signum} received, flushing")
        self.requested = True

class NCCLWatchdog:
    """Monitor NCCL activity; abort if no step completes for 10 minutes."""
    def __init__(self, threshold_min=10):
        self.last_step = time.time()
        self.threshold = threshold_min * 60
        # Poll from a background thread: if a collective hangs, the main
        # thread is blocked and cannot check itself.
        self.thread = threading.Thread(target=self._loop, daemon=True)
        self.thread.start()

    def heartbeat(self):
        self.last_step = time.time()

    def check(self):
        if time.time() - self.last_step > self.threshold:
            print(f"[watchdog] NCCL inactive {self.threshold / 60:.0f} min, aborting")
            os._exit(1)  # force exit, all ranks killed

    def _loop(self):
        while True:
            time.sleep(30)
            self.check()

# Setup
preempt = GracefulPreemption()
watchdog = NCCLWatchdog(threshold_min=10)

# NCCL config (must be set before the process group is initialized)
os.environ["NCCL_TIMEOUT"] = "3600"            # 1 hour timeout (default 30 min)
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
os.environ["TORCH_NCCL_BLOCKING_WAIT"] = "1"

# Training loop with fault tolerance.
# model, optimizer, loader_iter, train_step, total_steps come from the
# surrounding training script.
ckpt_dir = Path("ckpt")
last_ckpt = ckpt_dir / "latest"

if last_ckpt.exists():
    print(f"[resume] loading from {last_ckpt}")
    state = torch.load(last_ckpt / "state.pt", map_location="cpu")
    model.load_state_dict(state["model"])
    optimizer.load_state_dict(state["optim"])
    start_step = state["step"] + 1  # the saved step already ran
else:
    start_step = 0

for step in range(start_step, total_steps):
    try:
        batch = next(loader_iter)
        loss = train_step(model, batch, optimizer)
        watchdog.heartbeat()

        if step % 100 == 0 or preempt.requested:
            # Atomic checkpoint: write into a .partial dir, then rename
            staging = ckpt_dir / f"step-{step:08d}.partial"
            staging.mkdir(parents=True, exist_ok=True)

            if dist.get_rank() == 0:
                torch.save({
                    "step": step,
                    "model": model.state_dict(),
                    "optim": optimizer.state_dict(),
                }, staging / "state.pt")

            dist.barrier()
            if dist.get_rank() == 0:
                final = ckpt_dir / f"step-{step:08d}"
                if final.exists():
                    shutil.rmtree(final)
                os.rename(staging, final)
                # Update the "latest" symlink
                if last_ckpt.exists() or last_ckpt.is_symlink():
                    last_ckpt.unlink()
                last_ckpt.symlink_to(final.relative_to(ckpt_dir))

            dist.barrier()

            if preempt.requested:
                print(f"[preempt] saved at step {step}, exiting cleanly")
                break

    except Exception as e:
        print(f"[error] step {step}: {e}")
        # Cluster-wide abort + Slurm requeue
        os._exit(1)
```
fault-tolerant training framework — preempt + watchdog + atomic ckpt
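The framework above assumes torch.distributed is already initialized. A minimal init sketch for the 2 × 8 H100 layout with the torchrun c10d rendezvous mentioned in the intro; the endpoint, port, restart count, and 60-minute timeout are illustrative assumptions, and the timeout should stay consistent with NCCL_TIMEOUT above.

```python
# Process-group init to pair with the framework above; torchrun exports
# RANK / LOCAL_RANK / WORLD_SIZE. Example launch, run once per node
# (endpoint and restart count are placeholders):
#   torchrun --nnodes=2 --nproc_per_node=8 \
#            --rdzv_backend=c10d --rdzv_endpoint=<head-node-ip>:29500 \
#            --max_restarts=3 train.py
import os
from datetime import timedelta

import torch
import torch.distributed as dist

def init_distributed(timeout_min=60):
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    dist.init_process_group(
        backend="nccl",
        timeout=timedelta(minutes=timeout_min),  # keep in line with NCCL_TIMEOUT
    )
    return dist.get_rank(), local_rank, dist.get_world_size()
```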
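The except branch in the framework only hard-exits. On a Slurm cluster the failed run can also requeue itself so the next attempt resumes from ckpt/latest. A sketch under the assumption that the job was submitted with sbatch --requeue:

```python
# Optional: requeue the Slurm job before aborting so the next attempt
# resumes from ckpt/latest. Assumes the job was submitted with --requeue.
import os
import subprocess

def requeue_and_abort():
    job_id = os.environ.get("SLURM_JOB_ID")
    if job_id and os.environ.get("RANK", "0") == "0":
        # scontrol requeue puts the job back into the pending queue
        subprocess.run(["scontrol", "requeue", job_id], check=False)
    os._exit(1)  # hard exit so no rank stays stuck in a collective
```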
✅ Part IV complete
  1. Integrate the framework above into your own train.py.
  2. Test the graceful save path by manually simulating SIGUSR1 (see the sketch below).
  3. Next up: Part V, MoE Internals & Fine-Tuning (Mixtral, DeepSeek-V3, Qwen3-MoE).
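For step 2, the preemption path can be exercised without a real preemption by sending SIGUSR1 to the running training processes yourself. A minimal sketch; the PID source and the Slurm command shown in the comments are illustrative.

```python
# Simulate a preemption notice without Slurm: send SIGUSR1 to a training
# process and confirm a step-XXXXXXXX checkpoint plus "latest" appears.
import os
import signal

def simulate_preemption(pid=None):
    # pid defaults to the current process; pass a train.py PID otherwise
    os.kill(pid or os.getpid(), signal.SIGUSR1)

# Shell equivalents, roughly:
#   kill -USR1 <train.py PID>
#   scancel --signal=USR1 <jobid>   # on Slurm
```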
