
CUDA Streams, Events, and NCCL Fundamentals: The Foundation Layer of Multi-GPU Communication

Concurrency on GPU: parallel kernel execution with streams, fine-grained synchronization with events, NCCL collective operations (allreduce, broadcast, all-gather, reduce-scatter). The infrastructure layer of distributed training. Preparation for Module 17 (Distributed Training).

Şükrü Yusuf KAYA
55 min read
Advanced
🌊 The GPU's parallel dance floor
Even a single GPU runs hundreds of kernels in parallel. Multi-GPU is more complex: kernels run in parallel, but so does communication — they dance around each other. This lesson is the infrastructure layer: streams, events, NCCL. Module 17 (FSDP, DeepSpeed) is built on top of this layer. 55 minutes from now you'll have a production-grade understanding of why `torch.distributed` works the way it does and how allreduce is accelerated.

Lesson Map#

  1. CUDA streams — parallel execution
  2. Default stream vs explicit streams
  3. CUDA events — fine-grained sync
  4. Async memcpy — overlapping data transfers
  5. What is NCCL, and why do we need it?
  6. Collective operations: allreduce, broadcast, all-gather, reduce-scatter
  7. Ring allreduce algorithm
  8. Overlapping communication with computation
  9. The torch.distributed API
  10. Production patterns: gradient bucketing, NVLink optimization

1. CUDA Streams — Parallel Execution#

A CUDA stream is an ordered sequence of kernel executions on the GPU.

Default behavior#

By default, PyTorch uses a single stream (stream 0). Kernels execute in order:

Kernel A → Kernel B → Kernel C → ...

Multiple streams = parallelism#

With multiple streams, kernels can run in parallel (if the GPU has enough free resources).

```python
import torch

stream1 = torch.cuda.Stream()
stream2 = torch.cuda.Stream()

with torch.cuda.stream(stream1):
    a = torch.matmul(x1, y1)  # on stream1

with torch.cuda.stream(stream2):
    b = torch.matmul(x2, y2)  # on stream2, potentially in parallel

torch.cuda.synchronize()  # wait until both streams finish
```

Why does this improve performance?#

A GPU has many SMs (Streaming Multiprocessors). If a single kernel doesn't fill all of them, another kernel can run alongside it.
Practical impact: a 5-30% throughput gain, depending on the workload.

Applications in LLMs#

  • Run the forward pass in parallel with the previous step's gradient communication
  • Put different layers on different streams
  • Overlap data loading with compute (async CPU→GPU copies)
FSDP (Module 17) uses this pattern heavily.

2. Default Stream vs Explicit Streams#

Things to watch out for in PyTorch:

Default stream (stream 0)#

  • Synchronous by default
  • Kernel B does not start before Kernel A finishes
  • Simple in practice, but leaves performance on the table

Explicit streams#

```python
torch.cuda.current_stream()    # the current stream
torch.cuda.set_stream(stream)  # change the current stream
```

Stream dependency#

Dependencies between streams are set up manually:

```python
event = torch.cuda.Event()

with torch.cuda.stream(stream1):
    a = compute_something()
    event.record(stream1)  # mark this point on stream1

with torch.cuda.stream(stream2):
    event.wait(stream2)    # stream2 waits until the event fires
    b = use_a_result(a)
```

Common pattern: pipelining#

```python
# Layer-wise pipelining
for layer in model.layers:
    with torch.cuda.stream(compute_stream):
        output = layer(input)
    with torch.cuda.stream(comm_stream):
        # sync gradients asynchronously
        all_reduce(gradient)
```

Limitations#

  • Memory is shared across streams → risk of race conditions
  • Incorrect synchronization causes data corruption
  • Hard to debug — stream interleavings are nondeterministic

3. CUDA Events — Fine-Grained Synchronization#

An event is a point on the GPU timeline, enabling async signaling independent of individual kernels.

```python
event_a = torch.cuda.Event()
event_b = torch.cuda.Event()

event_a.record()         # record the current point
a = compute_something()  # the stream keeps running
event_b.record()         # record the completion point

event_b.synchronize()    # CPU blocks until this event is reached
```

Timing#

Events give precise timing:

```python
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)

start.record()
# ... workload ...
end.record()

torch.cuda.synchronize()
ms = start.elapsed_time(end)
print(f"Took {ms:.2f} ms")
```

More accurate than `time.perf_counter()` for GPU operations, since it measures on the GPU timeline rather than the CPU's.

Cross-stream synchronization#

With events, one stream can wait on another:

```python
producer_stream = torch.cuda.Stream()
consumer_stream = torch.cuda.Stream()

with torch.cuda.stream(producer_stream):
    a = produce_data()
    event = torch.cuda.Event()
    event.record(producer_stream)

with torch.cuda.stream(consumer_stream):
    event.wait(consumer_stream)  # wait until the producer finishes
    b = consume_data(a)
```

Use cases#

  1. Pipelining: dependencies between streams
  2. Timing: precise kernel durations
  3. Overlapping async copies with compute
  4. Multi-GPU sync: cross-device events

4. Async Memcpy — Data Transfer Overlap#

CPU↔GPU data transfers can be made async and overlapped with compute.

Sync copy (slow)#

```python
data_cpu = torch.randn(1000, 1000)
data_gpu = data_cpu.cuda()  # synchronous: the CPU waits for the transfer
result = compute(data_gpu)
```

Async copy (fast)#

```python
import torch

data_cpu = torch.randn(1000, 1000).pin_memory()  # pinned memory
data_gpu = torch.empty_like(data_cpu, device="cuda")
stream = torch.cuda.Stream()

with torch.cuda.stream(stream):
    data_gpu.copy_(data_cpu, non_blocking=True)  # async copy

# While the copy runs, the CPU can do other work
do_cpu_work()

# Wait for the copy to finish
stream.synchronize()
result = compute(data_gpu)
```

Why pinned memory?#

CPU memory is pageable by default. Async DMA requires pinned (page-locked) memory: call `.pin_memory()`, or pass `pin_memory=True` to the DataLoader.

In LLM training#

  • Data loading: the next batch loads while the GPU is training on the current one
  • A PyTorch DataLoader with `num_workers > 0` + `pin_memory=True` is the standard setup
  • CPU offload: ZeRO Stage 3 keeps optimizer states on the CPU with async transfers

Pattern#

```python
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,    # parallel data loading
    pin_memory=True,  # pinned CPU memory
)

for batch in train_loader:
    batch = batch.cuda(non_blocking=True)  # async transfer
    output = model(batch)
    loss = criterion(output, target)
    loss.backward()
```

5. What Is NCCL, and Why Do We Need It?#

NCCL = NVIDIA Collective Communications Library.

Problem#

In multi-GPU training the GPUs have to share data: syncing gradients, broadcasting weights, and so on. The naive approach:

Each GPU sends to the CPU → the CPU sums → the CPU distributes back to the GPUs

Far too slow: limited CPU bandwidth, PCIe latency, no GPU-direct transfers.

The NCCL solution#

GPUs communicate with each other directly:
  • NVLink (within a node): direct GPU-GPU, ~600 GB/s
  • InfiniBand / RDMA (across nodes): direct access to GPU memory over the network
  • PCIe (fallback): GPU-CPU-GPU, slower

NCCL operations#

NCCL provides collective communication primitives:
  1. all_reduce: sum every GPU's values and give the result to all
  2. broadcast: from one GPU to the others
  3. all_gather: concatenate each GPU's shard and give the whole to everyone
  4. reduce_scatter: sum, then split into shards and distribute
  5. send/recv: point-to-point

NCCL vs alternatives#

  • NCCL: NVIDIA GPUs, fastest
  • MPI: CPU + GPU, more generic, somewhat slower
  • Gloo: CPU-first, multi-platform, slow
  • OpenSHMEM: scientific-computing legacy
PyTorch distributed defaults: NCCL (NVIDIA GPUs) or Gloo (CPU).

6. Collective Operations in Detail#

All-reduce#

The most frequent op in distributed training, used to sync gradients.

GPU 0: [1, 2, 3]
GPU 1: [4, 5, 6]
GPU 2: [7, 8, 9]
GPU 3: [10, 11, 12]

After all_reduce (sum):

GPU 0: [22, 26, 30]
GPU 1: [22, 26, 30]
GPU 2: [22, 26, 30]
GPU 3: [22, 26, 30]

Each GPU computes its local gradients, obtains the global average via all_reduce, and parameter updates stay in sync.
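The arithmetic can be sketched in plain Python. `all_reduce_sum` below is a stand-in for `dist.all_reduce`, not the real NCCL call:

```python
def all_reduce_sum(shards):
    """Simulated all_reduce(SUM): every rank ends up with the elementwise sum."""
    total = [sum(vals) for vals in zip(*shards)]
    return [list(total) for _ in shards]

# Local gradients on 4 simulated GPUs (the example above)
grads = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
summed = all_reduce_sum(grads)
print(summed[0])  # [22, 26, 30], identical on every rank

# Data-parallel training then divides by the world size to get the average
world_size = len(grads)
avg = [g / world_size for g in summed[0]]
print(avg)  # [5.5, 6.5, 7.5]
```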

Broadcast#

Copy data from one GPU to the others. Used to distribute initial weights from the master.

GPU 0 (master): [1, 2, 3]
GPU 1: [0, 0, 0]
GPU 2: [0, 0, 0]

After broadcast from GPU 0:

GPU 0: [1, 2, 3]
GPU 1: [1, 2, 3]
GPU 2: [1, 2, 3]

All-gather#

Concatenate each GPU's shard and give the whole to everyone.

GPU 0: [1]
GPU 1: [2]
GPU 2: [3]
GPU 3: [4]

After all_gather:

GPU 0: [1, 2, 3, 4]
GPU 1: [1, 2, 3, 4]
GPU 2: [1, 2, 3, 4]
GPU 3: [1, 2, 3, 4]

Used in FSDP's forward pass.

Reduce-scatter#

Reduce-scatter sums, then scatters the shards; composed with all-gather it forms an all-reduce.

GPU 0: [1, 2, 3, 4]
GPU 1: [5, 6, 7, 8]
GPU 2: [9, 10, 11, 12]
GPU 3: [13, 14, 15, 16]

After reduce_scatter (sum):

GPU 0: [28]  (1+5+9+13)
GPU 1: [32]  (2+6+10+14)
GPU 2: [36]
GPU 3: [40]

Used for gradients in FSDP's backward pass.
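A plain-Python sketch of the same numbers, also showing the identity FSDP relies on: reduce_scatter followed by all_gather reproduces all_reduce. The function names here are illustrative, not the torch.distributed API:

```python
def reduce_scatter_sum(shards):
    """Simulated reduce_scatter(SUM): rank i keeps shard i of the elementwise sum."""
    n = len(shards)
    total = [sum(vals) for vals in zip(*shards)]
    chunk = len(total) // n
    return [total[i * chunk:(i + 1) * chunk] for i in range(n)]

def all_gather(pieces):
    """Simulated all_gather: every rank gets the concatenation of all shards."""
    full = [x for piece in pieces for x in piece]
    return [list(full) for _ in pieces]

ranks = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]
scattered = reduce_scatter_sum(ranks)
print(scattered)                 # [[28], [32], [36], [40]]
print(all_gather(scattered)[0])  # [28, 32, 36, 40], same result as all_reduce
```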

Complexity#

Operation | Bytes communicated (per GPU) | Time
--- | --- | ---
All-reduce | 2(N-1)/N × M | O(M)
Broadcast | M | O(log N × M)
All-gather | (N-1)/N × M | O(M)
Reduce-scatter | (N-1)/N × M | O(M)

M: message size, N: number of GPUs.
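The table maps directly onto a small helper, a sketch of the standard bandwidth-optimal costs rather than any NCCL API:

```python
def comm_bytes(op, M, N):
    """Per-GPU bytes communicated for bandwidth-optimal collectives.
    M = message size in bytes, N = number of GPUs."""
    if op == "all_reduce":
        return 2 * (N - 1) / N * M
    if op in ("all_gather", "reduce_scatter"):
        return (N - 1) / N * M
    if op == "broadcast":
        return M
    raise ValueError(op)

# 100 MB of gradients across 8 GPUs:
print(comm_bytes("all_reduce", 100e6, 8) / 1e6)  # 175.0 (MB per GPU)
```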

7. The Ring All-Reduce Algorithm#

NCCL's most frequently used algorithm, and near-optimal for multi-GPU gradient sync.

Naive (bad)#

The master-worker pattern:

All GPUs → GPU 0 (master) → all GPUs

The master's bandwidth becomes the bottleneck; its traffic grows as O(N × M).

Ring all-reduce#

The GPUs are organized in a ring topology:

GPU 0 ↔ GPU 1 ↔ GPU 2 ↔ GPU 3 ↔ GPU 0

Two phases:

Phase 1: scatter-reduce

In each of N-1 steps, every GPU sends one 1/N chunk to its neighbor and adds the chunk it receives into its own copy.

Phase 2: all-gather

Each GPU circulates its fully reduced chunk around the ring. Another N-1 steps.

Total communication#

2 × (N-1)/N × M bytes per GPU

N=8 GPUs, M=100MB of gradients: ~175MB per GPU. At NVLink's ~600 GB/s, roughly 0.3 ms.
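The two phases can be simulated in plain Python, with a list of lists standing in for per-GPU buffers. The real NCCL implementation pipelines these steps over the interconnect; this is just the data movement:

```python
def ring_all_reduce(bufs):
    """Simulated ring all-reduce (sum). bufs[r] is rank r's buffer;
    the buffer length must be divisible by the number of ranks."""
    n = len(bufs)
    chunk = len(bufs[0]) // n
    bufs = [list(b) for b in bufs]

    def idx(c):  # indices covered by chunk c
        return range(c * chunk, (c + 1) * chunk)

    # Phase 1: scatter-reduce (N-1 steps). At step t, rank r receives
    # chunk (r - t - 1) mod n from its left neighbor and adds it in.
    for t in range(n - 1):
        for r in range(n):
            src, c = (r - 1) % n, (r - t - 1) % n
            for i in idx(c):
                bufs[r][i] += bufs[src][i]

    # Phase 2: all-gather (N-1 steps). At step t, rank r receives the
    # fully reduced chunk (r - t) mod n and overwrites its own copy.
    for t in range(n - 1):
        for r in range(n):
            src, c = (r - 1) % n, (r - t) % n
            for i in idx(c):
                bufs[r][i] = bufs[src][i]
    return bufs

ranks = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]
out = ring_all_reduce(ranks)
print(out[0])  # [28, 32, 36, 40], identical on every rank
```

After phase 1, rank r holds the complete sum for chunk (r+1) mod n; phase 2 is exactly the all-gather from section 6.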

Why near-optimal?#

  • No single GPU is a bottleneck — all of them send and receive in parallel
  • Bandwidth-optimal: 2(N-1)/N × M is the tight lower bound for ring algorithms
  • Scalable: per-GPU traffic stays near-constant as N grows (the 2(N-1)-step latency does grow, which is why tree algorithms win for small messages at large N)

Modern alternative: tree all-reduce#

On some topologies (especially InfiniBand) a tree is faster. NCCL chooses automatically.

NVIDIA InfiniBand SHARP#

The InfiniBand switches themselves can perform the reduction (in-network reduction). The fastest option at large scale.

8. Communication Overlap with Computation#

The critical optimization in multi-GPU training.

Naive (slow)#

Step 1: forward pass (1 sec)
Step 2: backward pass (2 sec)
Step 3: all_reduce gradients (0.5 sec)
Step 4: optimizer step (0.1 sec)

Total: 3.6 sec per iteration

Overlap (fast)#

Run the gradient all_reduce in parallel with the remaining backward computation, launching it as each gradient becomes ready. If communication hides fully behind compute:

Total: ~3.1 sec per iteration (~14% faster)
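The arithmetic above can be captured in a tiny time model. This is a sketch; real overlap depends on bucket readiness and available bandwidth:

```python
def iteration_time(fwd, bwd, comm, opt, overlap):
    """Rough per-iteration time model. With overlap, communication hides
    behind backward; only the part that doesn't fit would add to the total."""
    if overlap:
        return fwd + max(bwd, comm) + opt
    return fwd + bwd + comm + opt

base = iteration_time(1.0, 2.0, 0.5, 0.1, overlap=False)  # 3.6 s
fast = iteration_time(1.0, 2.0, 0.5, 0.1, overlap=True)   # 3.1 s
print(f"{(1 - fast / base) * 100:.0f}% faster")           # prints "14% faster"
```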

Implementation#

PyTorch DDP and FSDP handle this automatically: as gradients become ready, allreduces are launched bucket by bucket.

```python
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

model = DDP(model, device_ids=[local_rank])
# During backward, DDP allreduces gradients in buckets:
# compute continues while communication runs
```

Gradient bucketing#

Instead of putting all gradients into a single allreduce, use chunks:

Bucket 1 (25MB): allreduce starts
Bucket 2 (25MB): backward continuing
Bucket 3 (25MB): backward continuing
...

As each bucket fills, its allreduce launches while backward continues, maximizing overlap.

Bucket size#

PyTorch DDP default: 25MB. If tuning is needed:

```python
DDP(model, device_ids=[local_rank], bucket_cap_mb=50)
```

Large buckets: fewer NCCL calls but less overlap. Small buckets: more overlap but more per-call overhead.
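The trade-off can be sketched with a toy cost model. The `launch_us` and `bw_gbps` values are illustrative assumptions, not measured NCCL numbers:

```python
import math

def bucketed_comm_time(total_mb, bucket_mb, bw_gbps=600, launch_us=20):
    """Toy cost model for bucketed allreduce: each bucket pays a fixed
    launch latency, plus the payload's bandwidth time."""
    n_buckets = math.ceil(total_mb / bucket_mb)
    bandwidth_s = (total_mb / 1000) / bw_gbps  # GB divided by GB/s
    launch_s = n_buckets * launch_us * 1e-6
    return n_buckets, launch_s + bandwidth_s

small = bucketed_comm_time(500, 5)   # 100 NCCL calls, overhead-heavy
large = bucketed_comm_time(500, 50)  # 10 calls, less overhead but less overlap
print(small, large)
```

The model only captures the overhead side; the overlap benefit of smaller buckets is what the profiler (below) makes visible.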

Profiling#

With Nsight Systems you can view NCCL ops and computation on one timeline and inspect the overlap visually. The optimal pattern: solid GPU utilization with minimal gaps.

9. The torch.distributed API#

PyTorch's multi-GPU/multi-node API. Uses the NCCL backend.

Initialization#

```python
import os

import torch
import torch.distributed as dist

# Environment variables (torchrun sets these automatically):
# MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE, LOCAL_RANK
dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)

# ... training ...

# Cleanup
dist.destroy_process_group()
```

Launch#

```
# Single node, 8 GPUs
torchrun --nproc_per_node=8 train.py

# Multi-node (e.g. 2 nodes × 8 GPUs)
torchrun --nnodes=2 --nproc_per_node=8 \
    --rdzv_id=foo --rdzv_backend=c10d \
    --rdzv_endpoint=master_addr:29500 \
    train.py
```

Basic ops#

```python
# All-reduce
tensor = torch.tensor([1.0, 2.0]).cuda()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
# Now every rank holds the same sum

# Broadcast
if dist.get_rank() == 0:
    tensor = torch.tensor([3.0, 4.0]).cuda()
else:
    tensor = torch.empty(2).cuda()
dist.broadcast(tensor, src=0)

# All-gather (dtypes must match between the output list and the input)
tensor = torch.tensor([float(dist.get_rank())]).cuda()
gathered = [torch.empty(1).cuda() for _ in range(dist.get_world_size())]
dist.all_gather(gathered, tensor)
```

DistributedSampler#

Each GPU gets a different shard of the data:

```python
from torch.utils.data.distributed import DistributedSampler

sampler = DistributedSampler(dataset, shuffle=True)
loader = DataLoader(dataset, batch_size=32, sampler=sampler)

for epoch in range(epochs):
    sampler.set_epoch(epoch)  # different shuffle each epoch
    for batch in loader:
        ...
```

10. Production Patterns#

DDP, FSDP, ZeRO — all built on top of torch.distributed. Module 17 covers them in detail.

Pattern 1: Gradient bucketing#

Default PyTorch DDP bucketing is sufficient up to roughly Llama 3 scale. Custom tuning:

```python
# Llama 3 8B on 8x A100
DDP(model, device_ids=[local_rank],
    bucket_cap_mb=50,
    gradient_as_bucket_view=True)
```
  • gradient_as_bucket_view=True: saves memory and is slightly faster
  • bucket_cap_mb=50: medium-large buckets (balanced overlap)
Pattern 2: NVLink topology check#

On an 8x A100/H100 node, check the NVLink topology:

```
nvidia-smi topo -m
```

Output:

```
      GPU0  GPU1  GPU2  ...
GPU0   X    NV12  NV12  ...
GPU1  NV12   X    NV12  ...
...
```

NV12 = 12 NVLink connections (fastest). PIX = PCIe (slow). NCCL picks the fastest path automatically, but knowing the topology helps when debugging.

Pattern 3: Cross-node InfiniBand#

In multi-node training, verify that InfiniBand is actually being used:

```
ibstat       # InfiniBand status
ibping       # latency check
nccl-tests   # NCCL benchmarks
```

InfiniBand: 200-400 Gbps. Ethernet: typically 10-100 Gbps. Up to a ~10x difference.
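The gap is easy to quantify with a back-of-the-envelope helper (link speeds are the nominal figures above):

```python
def transfer_time_ms(size_gb, link_gbps):
    """Time to move size_gb gigabytes over a link rated in gigabits per second."""
    return size_gb * 8 / link_gbps * 1000

# 1 GB of gradients, cross-node:
print(f"InfiniBand 400 Gbps: {transfer_time_ms(1, 400):.0f} ms")  # 20 ms
print(f"Ethernet    40 Gbps: {transfer_time_ms(1, 40):.0f} ms")   # 200 ms
```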

Pattern 4: NCCL env tuning#

```
export NCCL_DEBUG=INFO          # debug logs
export NCCL_IB_DISABLE=0        # enable InfiniBand
export NCCL_NET_GDR_LEVEL=PHB   # GPU Direct RDMA tuning
export NCCL_TREE_THRESHOLD=0    # tree-vs-ring threshold (deprecated; prefer NCCL_ALGO)
export NCCL_BUFFSIZE=8388608    # 8MB buffer
```

Tuning is cluster-specific; NVIDIA's documentation covers the details.

Pattern 5: Topology-aware sharding#

In FSDP/ZeRO, intra-node sharding uses NVLink (fast); cross-node sharding uses InfiniBand (slower). The smart strategy: shard large tensors within a node and keep cross-node communication to a minimum. Module 17 covers this in detail.

11. Mini Exercises#

  1. Stream pattern: 2 GPUs, a 4-layer model. How would you pipeline it with streams?
  2. All-reduce bandwidth: 8 GPUs, 1GB of gradients. Communication volume for naive vs ring all-reduce?
  3. Communication overlap: 8 GPUs, allreduce 0.5s, backward 1.5s. Total time without overlap? Maximum saving with overlap?
  4. NCCL debug: NCCL fails during multi-node training. What are your investigation steps?
  5. NVLink vs PCIe: NVLink 600 GB/s, PCIe 64 GB/s. What is the difference for a 1GB allreduce?

What We Learned in This Lesson#

CUDA streams — parallel kernel execution ✓
Default vs explicit streams — patterns ✓
CUDA events — fine-grained sync + precise timing ✓
Async memcpy — data transfer overlap (pinned memory + non_blocking) ✓
NCCL — NVIDIA's collective communication library ✓
Collective ops: all_reduce, broadcast, all_gather, reduce_scatter ✓
Ring all-reduce — near-optimal multi-GPU gradient sync ✓
Overlapping communication with computation — production pattern ✓
torch.distributed API — the foundation of DDP and FSDP ✓
NVLink, InfiniBand, GPU Direct — hardware layers ✓

Next Lesson#

5.5 — Custom GPU Kernels with Triton: Softmax, Matmul, FlashAttention Mini
Writing GPU kernels in Python syntax instead of NVIDIA CUDA C++. Triton's secret: pedagogically clean, with performance close to hand-tuned CUDA. Includes a mini-implementation of FlashAttention.

Frequently Asked Questions

**Automatic**. If you wrap the model with `DDP(model)`, gradient buckets are all_reduced in the background during backward. You just write a normal training loop; no manual intervention in 95% of cases. Edge cases: (1) custom autograd Functions that make dist calls in backward can misbehave; (2) multiple losses → multiple backward calls can be problematic. In gradient-accumulation scenarios this is usually handled with the DDP module's `no_sync()` context manager.
