CUDA Streams, Events, and NCCL Fundamentals: The Foundation Layer of Multi-GPU Communication
Concurrency on GPU: parallel kernel execution with streams, fine-grained synchronization with events, NCCL collective operations (allreduce, broadcast, all-gather, reduce-scatter). The infrastructure layer of distributed training. Preparation for Module 17 (Distributed Training).
Şükrü Yusuf KAYA
55 min read
Advanced
🌊 The parallelism dance floor on the GPU
Even a single GPU runs hundreds of kernels in parallel. Multi-GPU is more complex: kernels run in parallel, but so does communication — they dance with each other. This lesson is the infrastructure layer: streams, events, NCCL. Module 17 (FSDP, DeepSpeed) is built on top of it. After 55 minutes you will understand why `torch.distributed` works the way it does and how allreduce gets accelerated — a production-grade understanding.
Lesson Map#
- CUDA streams — parallel execution
- Default stream vs explicit streams
- CUDA events — fine-grained sync
- Async memcpy — overlap data transfer
- What is NCCL and why do we need it?
- Collective operations: allreduce, broadcast, all-gather, reduce-scatter
- Ring allreduce algorithm
- Communication overlap with computation
- torch.distributed API
- Production patterns: gradient bucketing, NVLink optimization
1. CUDA Streams — Parallel Execution#
A CUDA stream is a sequence of kernel executions on the GPU: work submitted to the same stream runs in order.
Default behavior#
By default, PyTorch uses a single stream (stream 0). Kernels execute sequentially:
Kernel A → Kernel B → Kernel C → ...
Multiple streams = parallelism#
With multiple streams, kernels can run in parallel (as long as the GPU has enough free resources).
import torch

x1, y1 = torch.randn(2048, 2048, device="cuda"), torch.randn(2048, 2048, device="cuda")
x2, y2 = torch.randn(2048, 2048, device="cuda"), torch.randn(2048, 2048, device="cuda")

stream1 = torch.cuda.Stream()
stream2 = torch.cuda.Stream()

with torch.cuda.stream(stream1):
    a = torch.matmul(x1, y1)   # runs on stream1

with torch.cuda.stream(stream2):
    b = torch.matmul(x2, y2)   # runs on stream2, potentially in parallel

torch.cuda.synchronize()       # wait until both streams finish
Why does this improve performance?#
A GPU has many SMs (Streaming Multiprocessors). If a single kernel does not fill all SMs, another kernel can run alongside it.
Practical impact: roughly 5-30% throughput gain, depending on the workload.
Applications in LLMs#
- Forward pass in parallel with the previous step's gradient communication
- Different layers on different streams
- Data loading + compute overlap (CPU→GPU async copy)
FSDP (Module 17) uses this pattern heavily.
2. Default Stream vs Explicit Streams#
Things to watch out for in PyTorch:
Default stream (stream 0)#
- Synchronous by default
- Kernel B does not start before Kernel A finishes
- Simple in practice, but leaves performance on the table
Explicit streams#
torch.cuda.current_stream()    # the currently active stream
torch.cuda.set_stream(stream)  # change the current stream
Stream dependency#
Dependencies between streams are set up manually:
event = torch.cuda.Event()

with torch.cuda.stream(stream1):
    a = compute_something()
    event.record(stream1)      # mark this point in stream1

with torch.cuda.stream(stream2):
    event.wait(stream2)        # stream2 waits until the event is reached
    b = use_a_result(a)
Common pattern: pipelining#
# Layer-wise pipelining (pseudocode sketch)
for layer in model.layers:
    with torch.cuda.stream(compute_stream):
        output = layer(input)

    with torch.cuda.stream(comm_stream):
        # all-reduce the previously computed gradients asynchronously
        all_reduce(gradient)
Limitations#
- Memory is shared across streams → risk of race conditions (see the sketch below)
- Incorrect synchronization leads to data corruption
- Hard to debug — stream interleaving is nondeterministic
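To make the first risk concrete, here is a minimal sketch of the safe cross-stream pattern using Stream.wait_stream and Tensor.record_stream; the variable names are illustrative, not from the lesson:
import torch

side_stream = torch.cuda.Stream()
default_stream = torch.cuda.current_stream()

x = torch.randn(1024, 1024, device="cuda")   # produced on the default stream

# The side stream must wait for work already queued on the default stream
side_stream.wait_stream(default_stream)

with torch.cuda.stream(side_stream):
    y = x.mm(x)
    # Tell the caching allocator that x is still in use on side_stream,
    # so its memory is not recycled prematurely for default-stream allocations
    x.record_stream(side_stream)

# The default stream must wait for the side stream before consuming y
default_stream.wait_stream(side_stream)
z = y + 1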
3. CUDA Events — Fine-Grained Synchronization#
An event is a point on the GPU timeline: asynchronous signaling, independent of any particular kernel.
event_a = torch.cuda.Event()
event_b = torch.cuda.Event()

event_a.record()          # record the current point on the stream
a = compute_something()   # work keeps running on the stream
event_b.record()          # record the point where the work ends

# Wait
event_b.synchronize()     # the CPU blocks until this event completes
Timing#
Events give precise GPU timing:
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)

start.record()
# ... workload ...
end.record()

torch.cuda.synchronize()
ms = start.elapsed_time(end)
print(f"Took {ms:.2f} ms")
Unlike time.perf_counter(), CUDA events measure elapsed time on the GPU timeline rather than CPU wall-clock, so they are not skewed by asynchronous kernel launches.
Cross-stream synchronization#
With events, one stream can wait for another:
producer_stream = torch.cuda.Stream()
consumer_stream = torch.cuda.Stream()

with torch.cuda.stream(producer_stream):
    a = produce_data()
    event = torch.cuda.Event()
    event.record(producer_stream)

with torch.cuda.stream(consumer_stream):
    event.wait(consumer_stream)   # wait until the producer is done
    b = consume_data(a)
Use cases#
- Pipelining: dependencies between streams
- Timing: precise kernel duration
- Async copy + compute overlap
- Multi-GPU sync: cross-device events
4. Async Memcpy — Data Transfer Overlap#
CPU↔GPU data transfers can be made asynchronous and overlapped with compute.
Sync copy (slow)#
data_cpu = torch.randn(1000, 1000)
data_gpu = data_cpu.cuda()     # synchronous: the CPU waits for the transfer to finish
result = compute(data_gpu)
Async copy (fast)#
import torch

data_cpu = torch.randn(1000, 1000).pin_memory()        # pinned (page-locked) memory
data_gpu = torch.empty_like(data_cpu, device="cuda")

stream = torch.cuda.Stream()
with torch.cuda.stream(stream):
    data_gpu.copy_(data_cpu, non_blocking=True)        # async copy

# While the copy is in flight, the CPU can do other work
do_cpu_work()

# Wait for the copy to finish
stream.synchronize()
result = compute(data_gpu)
Why pinned memory?#
CPU memory is pageable by default. Async DMA transfers require pinned (page-locked) memory: call .pin_memory() on the tensor, or pass pin_memory=True to the DataLoader.
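A rough micro-benchmark sketch, reusing the CUDA-event timing from Section 3, to compare pageable vs pinned host-to-device copies; the tensor size is illustrative and exact numbers depend on the hardware:
import torch

def time_h2d_copy(host_tensor):
    # Time one host-to-device copy with CUDA events
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    dst = torch.empty(host_tensor.shape, device="cuda")
    torch.cuda.synchronize()
    start.record()
    dst.copy_(host_tensor, non_blocking=True)
    end.record()
    torch.cuda.synchronize()
    return start.elapsed_time(end)

pageable = torch.randn(64_000_000)                # ~256 MB of regular pageable memory
pinned = torch.randn(64_000_000).pin_memory()     # page-locked memory

print(f"pageable copy: {time_h2d_copy(pageable):.2f} ms")
print(f"pinned copy:   {time_h2d_copy(pinned):.2f} ms")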
In LLM training#
- Data loading: the next batch is loaded while the GPU is busy training
- PyTorch DataLoader with num_workers > 0 and pin_memory=True is the standard setup
- CPU offload: ZeRO Stage 3 moves optimizer states to the CPU with async transfers
Pattern#
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,      # parallel data loading
    pin_memory=True,    # pinned CPU memory
)

for batch in train_loader:
    batch = batch.cuda(non_blocking=True)   # async transfer
    output = model(batch)
    loss = criterion(output, target)
    loss.backward()
5. What Is NCCL and Why Do We Need It?#
NCCL = NVIDIA Collective Communications Library.
Problem#
In multi-GPU training, the GPUs have to share data: syncing gradients, broadcasting weights, and so on. The naive approach:
Every GPU sends to the CPU → the CPU sums → the CPU distributes back to the GPUs
Far too slow: limited CPU bandwidth, PCIe latency, no GPU-direct transfers.
The NCCL solution#
GPUs talk to each other directly:
- NVLink (within node): GPU-GPU direct, ~600 GB/s
- InfiniBand / RDMA (across nodes): GPU memory direct over network
- PCIe (fallback): GPU-CPU-GPU, slower
NCCL operations#
NCCL provides collective communication primitives:
- all_reduce: sum values across all GPUs and distribute the result to everyone
- broadcast: from one GPU to all the others
- all_gather: concatenate each GPU's shard and give the whole tensor to everyone
- reduce_scatter: sum, then split into shards and scatter them
- send/recv: point-to-point
NCCL vs alternatives#
- NCCL: NVIDIA GPUs, fastest
- MPI: CPU + GPU, more generic, somewhat slower
- Gloo: primarily CPU, multi-platform, slow
- OpenSHMEM: scientific computing legacy
PyTorch distributed defaults: NCCL (NVIDIA GPUs) or Gloo (CPU).
6. Collective Operations in Detail#
All-reduce#
The most frequent op in distributed training, used for gradient sync.
GPU 0: [1, 2, 3]
GPU 1: [4, 5, 6]
GPU 2: [7, 8, 9]
GPU 3: [10, 11, 12]

After all_reduce (sum):
GPU 0: [22, 26, 30]
GPU 1: [22, 26, 30]
GPU 2: [22, 26, 30]
GPU 3: [22, 26, 30]
Each GPU computes its local gradients, all_reduce produces the global average, and parameter updates stay in sync; a minimal sketch of this pattern follows.
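A minimal sketch of manual gradient averaging, essentially what DDP automates; it assumes the NCCL process group is already initialized:
import torch
import torch.distributed as dist

def average_gradients(model: torch.nn.Module) -> None:
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            # Sum gradients across all ranks, then divide to get the average
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size

# Usage inside the training loop:
#   loss.backward()             # each rank computes its local gradients
#   average_gradients(model)    # every rank now holds the global average
#   optimizer.step()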
Broadcast#
Copies data from one GPU to the others, e.g. distributing the initial weights from the master.
GPU 0 (master): [1, 2, 3]
GPU 1:          [0, 0, 0]
GPU 2:          [0, 0, 0]

After broadcast from GPU 0:
GPU 0: [1, 2, 3]
GPU 1: [1, 2, 3]
GPU 2: [1, 2, 3]
All-gather#
Concatenate each GPU's shard and give the full tensor to everyone.
GPU 0: [1]
GPU 1: [2]
GPU 2: [3]
GPU 3: [4]

After all_gather:
GPU 0: [1, 2, 3, 4]
GPU 1: [1, 2, 3, 4]
GPU 2: [1, 2, 3, 4]
GPU 3: [1, 2, 3, 4]
Used in FSDP for the forward pass.
Reduce-scatter#
Essentially the first half of an all-reduce: reduce (sum), then scatter the shards.
GPU 0: [1, 2, 3, 4]
GPU 1: [5, 6, 7, 8]
GPU 2: [9, 10, 11, 12]
GPU 3: [13, 14, 15, 16]

After reduce_scatter (sum):
GPU 0: [28]   (1+5+9+13)
GPU 1: [32]   (2+6+10+14)
GPU 2: [36]
GPU 3: [40]
Used in FSDP for gradients in the backward pass.
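A sketch of the same operation via torch.distributed, assuming a 4-GPU NCCL process group is already initialized; reduce_scatter_tensor requires a reasonably recent PyTorch (2.x):
import torch
import torch.distributed as dist

rank = dist.get_rank()

# Each rank holds a full-length vector (think: a full gradient in FSDP)
full = torch.arange(1, 5, dtype=torch.float32, device="cuda") + 4 * rank
# rank 0: [1, 2, 3, 4], rank 1: [5, 6, 7, 8], rank 2: [9, 10, 11, 12], ...

shard = torch.empty(1, device="cuda")   # output: this rank's reduced shard
dist.reduce_scatter_tensor(shard, full, op=dist.ReduceOp.SUM)
# rank 0 -> [28], rank 1 -> [32], rank 2 -> [36], rank 3 -> [40]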
Complexity#
| Operation | Bytes communicated | Time |
|---|---|---|
| All-reduce | 2(N-1)/N × M | O(M) |
| Broadcast | M | O(log N × M) |
| All-gather | (N-1) × M / N | O(M) |
| Reduce-scatter | (N-1) × M / N | O(M) |
M: message size, N: number of GPUs.
7. The Ring All-Reduce Algorithm#
The algorithm NCCL uses most often; bandwidth-optimal for multi-GPU gradient sync.
Naive (bad)#
Master-worker pattern:
All GPUs → GPU 0 (master) → all GPUs
The master becomes the bandwidth bottleneck. Worse: O(N) latency at the master.
Ring all-reduce#
The GPUs are organized in a ring topology:
GPU 0 ↔ GPU 1 ↔ GPU 2 ↔ GPU 3 ↔ GPU 0
Two phases:
Phase 1: scatter-reduce
Each GPU receives a 1/N chunk from its neighbor and adds it to its own copy of that chunk.
N-1 steps.
Phase 2: all-gather
Each GPU circulates its fully reduced chunk around the ring.
N-1 steps.
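A pure-Python simulation of the two phases (illustrative only, no GPUs involved) with 4 "GPUs", each holding a 4-element vector, one chunk per GPU:
N = 4
data = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]

# Phase 1: scatter-reduce. In step s, GPU g sends chunk (g - s) % N to its
# right neighbor, which adds it to its own copy of that chunk.
for s in range(N - 1):
    for g in range(N):
        c = (g - s) % N
        data[(g + 1) % N][c] += data[g][c]
# Now chunk c is fully reduced on GPU (c - 1) % N.

# Phase 2: all-gather. In step s, GPU g forwards the completed chunk
# (g + 1 - s) % N to its right neighbor, which simply overwrites its copy.
for s in range(N - 1):
    for g in range(N):
        c = (g + 1 - s) % N
        data[(g + 1) % N][c] = data[g][c]

print(data)   # every "GPU" now holds [28, 32, 36, 40]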
Total communication#
2 × (N-1)/N × M bytes per GPU
With N=8 GPUs and M=100MB of gradients: 2 × 7/8 × 100MB ≈ 175MB per GPU. At ~600 GB/s of NVLink bandwidth, that is roughly 0.3 ms.
Why is it optimal?#
- No single GPU is a bottleneck — everyone communicates in parallel
- Bandwidth optimal: 2(N-1)/N × M is a tight lower bound for all-reduce
- Scalable: the bytes sent per GPU stay nearly constant as N grows (the number of steps, 2(N-1), still grows with N)
Modern alternative: tree all-reduce#
On some topologies (especially InfiniBand) a tree is faster. NCCL picks automatically.
NVIDIA InfiniBand SHARP#
The InfiniBand switches themselves can perform the reduction (in-network reduction). The fastest option at large scale.
8. Communication Overlap with Computation#
The critical optimization in multi-GPU training.
Naive (slow)#
Step 1: forward pass           (1 sec)
Step 2: backward pass          (2 sec)
Step 3: all_reduce gradients   (0.5 sec)
Step 4: optimizer step         (0.1 sec)

Total: 3.6 sec per iteration
Overlap (fast)#
Forward overlaps with the previous step's all_reduce
Backward overlaps with the current all_reduce (launched as gradients become ready)

Total: ~3.1 sec per iteration (~15% faster)
Implementation#
PyTorch DDP and FSDP handle this automatically: as gradients become ready, allreduce is launched bucket by bucket.
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

model = DDP(model, device_ids=[local_rank])

# During backward, DDP all-reduces gradients in buckets;
# computation continues while communication happens in the background
Gradient bucketing#
Instead of putting all gradients into a single allreduce, they go out in chunks:
Bucket 1 (25MB): allreduce starts
Bucket 2 (25MB): backward continuing
Bucket 3 (25MB): backward continuing
...
As soon as a bucket is full, its allreduce starts while backward keeps running, maximizing overlap. The sketch below shows the idea with async work handles.
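A conceptual sketch of this idea using async work handles (not DDP's actual implementation); `buckets` stands for an illustrative list of flattened gradient tensors that become ready in order:
import torch
import torch.distributed as dist

def launch_bucket_allreduce(buckets):
    handles = []
    for bucket in buckets:
        # async_op=True returns a work handle immediately, so backward
        # computation for later buckets can overlap with this communication
        work = dist.all_reduce(bucket, op=dist.ReduceOp.SUM, async_op=True)
        handles.append((work, bucket))
    return handles

def wait_and_average(handles):
    world_size = dist.get_world_size()
    for work, bucket in handles:
        work.wait()            # block only when the result is actually needed
        bucket /= world_size   # turn the sum into an average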
Bucket size#
PyTorch DDP default: 25MB. If tuning is needed:
DDP(model, device_ids=[local_rank], bucket_cap_mb=50)
Larger buckets: fewer NCCL calls but less overlap. Smaller buckets: more overlap but more per-call overhead.
Profiling#
Nsight Systems shows NCCL ops and computation on a single timeline, making the overlap visible. The target pattern: solid GPU utilization with minimal gaps.
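As a lighter-weight alternative, torch.profiler can export a Chrome trace where NCCL kernels and compute kernels share a timeline; a rough sketch, assuming the model, criterion, optimizer, and train_loader from the earlier patterns exist:
import itertools
import torch
from torch.profiler import profile, ProfilerActivity

# Profile a handful of training steps
with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
    for batch, target in itertools.islice(train_loader, 10):
        batch = batch.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        loss = criterion(model(batch), target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

# Open the trace in chrome://tracing or Perfetto to inspect the overlap
prof.export_chrome_trace("trace.json")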
9. torch.distributed API#
torch.distributed is PyTorch's multi-GPU/multi-node API. On NVIDIA GPUs it uses the NCCL backend.
Initialization#
import os
import torch
import torch.distributed as dist

# Environment variables (set automatically by torchrun):
# MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE, LOCAL_RANK
dist.init_process_group(backend="nccl")

local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)

# Cleanup
dist.destroy_process_group()
Launch#
# Single node, 8 GPUs
torchrun --nproc_per_node=8 train.py

# Multi-node (e.g. 2 nodes x 8 GPUs)
torchrun --nnodes=2 --nproc_per_node=8 \
    --rdzv_id=foo --rdzv_backend=c10d \
    --rdzv_endpoint=master_addr:29500 \
    train.py
Basic ops#
# All-reduce
tensor = torch.tensor([1.0, 2.0]).cuda()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
# Now every rank holds the same sum

# Broadcast
if dist.get_rank() == 0:
    tensor = torch.tensor([3.0, 4.0]).cuda()
else:
    tensor = torch.empty(2).cuda()
dist.broadcast(tensor, src=0)

# Gather (all_gather): dtypes must match across ranks
tensor = torch.tensor([float(dist.get_rank())]).cuda()
gathered = [torch.empty(1).cuda() for _ in range(dist.get_world_size())]
dist.all_gather(gathered, tensor)
DistributedSampler#
Each GPU gets a different shard of the data:
from torch.utils.data.distributed import DistributedSampler

sampler = DistributedSampler(dataset, shuffle=True)
loader = DataLoader(dataset, batch_size=32, sampler=sampler)

for epoch in range(epochs):
    sampler.set_epoch(epoch)   # different shuffle every epoch
    for batch in loader:
        ...
Common patterns#
DDP, FSDP, ZeRO — all built on top of torch.distributed. Module 17 covers them in detail.
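A minimal end-to-end DDP sketch tying these pieces together; MyModel and my_dataset are placeholders, and the hyperparameters are illustrative:
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def main():
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    model = DDP(MyModel().cuda(), device_ids=[local_rank])   # MyModel: placeholder
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    sampler = DistributedSampler(my_dataset, shuffle=True)   # my_dataset: placeholder
    loader = DataLoader(my_dataset, batch_size=32, sampler=sampler,
                        num_workers=4, pin_memory=True)

    for epoch in range(3):
        sampler.set_epoch(epoch)
        for inputs, targets in loader:
            inputs = inputs.cuda(non_blocking=True)
            targets = targets.cuda(non_blocking=True)
            loss = torch.nn.functional.cross_entropy(model(inputs), targets)
            loss.backward()        # DDP overlaps gradient allreduce with backward here
            optimizer.step()
            optimizer.zero_grad()

    dist.destroy_process_group()

if __name__ == "__main__":
    main()   # launch with: torchrun --nproc_per_node=8 train.py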
10. Production Patterns: Gradient Bucketing, NVLink Optimization#
Pattern 1: Gradient bucketing#
The default PyTorch DDP bucketing is sufficient up to roughly Llama 3 scale. Custom tuning:
# Llama 3 8B on 8x A100
DDP(model, device_ids=[local_rank],
    bucket_cap_mb=50,
    gradient_as_bucket_view=True)
- gradient_as_bucket_view=True: memory saving + faster
- bucket_cap_mb=50: medium-to-large buckets (balanced overlap)
Pattern 2: NVLink topology awareness#
On an 8x A100/H100 node, check the NVLink topology:
nvidia-smi topo -m
Output:
        GPU0    GPU1    GPU2    ...
GPU0    X       NV12    NV12    ...
GPU1    NV12    X       NV12    ...
...
NV# means the two GPUs are connected through that many bonded NVLink links (NV12 = 12 links); PIX means the path traverses a single PCIe bridge, i.e. no NVLink.
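Peer-to-peer access can also be checked from Python; a small sketch complementing nvidia-smi topo -m:
import torch

n = torch.cuda.device_count()
for i in range(n):
    for j in range(n):
        if i != j:
            # True if GPU i can directly access GPU j's memory (NVLink or PCIe P2P)
            ok = torch.cuda.can_device_access_peer(i, j)
            print(f"GPU{i} -> GPU{j}: peer access {'yes' if ok else 'no'}")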
Pattern 3: Cross-node InfiniBand#
In multi-node training, check whether InfiniBand is actually being used:
ibstat        # InfiniBand status
ibping        # latency check
nccl-tests    # NCCL benchmarks
InfiniBand: 200-400 Gbps. Ethernet: typically 10-100 Gbps. Often an order-of-magnitude difference.
Pattern 4: NCCL env tuning#
export NCCL_DEBUG=INFO            # debug logs
export NCCL_IB_DISABLE=0          # enable InfiniBand
export NCCL_NET_GDR_LEVEL=PHB     # GPUDirect RDMA optimization
export NCCL_TREE_THRESHOLD=0      # always use the tree algorithm
export NCCL_BUFFSIZE=8388608      # 8MB buffer
Tuning is cluster-specific; NVIDIA's documentation covers the details.
Pattern 5: Topology-aware sharding#
In FSDP/ZeRO, intra-node sharding goes over NVLink (fast), while cross-node sharding goes over InfiniBand (slower). The smart strategy: shard large tensors within a node and keep cross-node communication to a minimum. Module 17 covers this in detail.
11. Mini Exercises#
1. Stream pattern: 2 GPUs, a 4-layer model. How would you pipeline it with streams?
2. All-reduce bandwidth: 8 GPUs, 1GB of gradients. How much communication with naive vs ring all-reduce?
3. Communication overlap: 8 GPUs, allreduce takes 0.5s, backward 1.5s. What is the total without overlap? What is the maximum saving with overlap?
4. NCCL debugging: NCCL throws errors in multi-node training. What are your investigation steps?
5. NVLink vs PCIe: NVLink 600 GB/s, PCIe 64 GB/s. What is the difference for a 1GB allreduce?
What Did We Learn in This Lesson?#
✓ CUDA streams — parallel kernel execution
✓ Default vs explicit streams — patterns
✓ CUDA events — fine-grained sync + precise timing
✓ Async memcpy — data transfer overlap (pinned memory + non_blocking)
✓ NCCL — NVIDIA collective communication library
✓ Collective ops: all_reduce, broadcast, all_gather, reduce_scatter
✓ Ring all-reduce — optimal multi-GPU gradient sync
✓ Communication overlap with computation — production pattern
✓ torch.distributed API — DDP, FSDP foundation
✓ NVLink, InfiniBand, GPU Direct — hardware layers
Next Lesson#
5.5 — Custom GPU Kernels with Triton: Softmax, Matmul, FlashAttention Mini
Writing GPU kernels in Python syntax instead of NVIDIA CUDA C++. Triton's secret: pedagogically clean, with performance close to hand-tuned CUDA. Includes a mini-implementation of FlashAttention.
Frequently Asked Questions#
Do I have to all_reduce gradients manually when using DDP?
**Automatic**. If you wrap with `DDP(model)`, gradient buckets are all_reduced in the background during backward. You just write a normal training loop; no manual intervention in 95% of cases. Edge cases: (1) custom autograd Functions with dist calls in backward can have issues; (2) multiple losses → multiple backward passes can be problematic. In gradient-accumulation scenarios this is usually handled with the `model.no_sync()` context manager.