CS336-2025-lec6
2025年10月22日
15:05
Kernels, Triton
Two functions:
benchmark()   # How long does it take? Measures total execution time.
profile()     # Where is time being spent? Measures the time of each individual operation.
import time
from statistics import mean
from typing import Callable

import torch

def benchmark(description: str, run: Callable, num_warmups: int = 1, num_trials: int = 3):
    """Benchmark `run` by calling it `num_trials` times, and return the mean time in milliseconds."""
    # Warmup: first times might be slower due to compilation, things not cached.
    # Since we will run the kernel multiple times, the timing that matters is steady state.
    for _ in range(num_warmups):
        run()
    if torch.cuda.is_available():
        torch.cuda.synchronize()  # Wait for CUDA threads to finish (important!)

    # Time it for real now!
    times: list[float] = []  # @inspect times, @inspect description
    for trial in range(num_trials):  # Do it multiple times to capture variance
        start_time = time.time()
        run()  # Actually perform computation
        if torch.cuda.is_available():
            torch.cuda.synchronize()  # Wait for CUDA threads to finish (important!)
        end_time = time.time()
        times.append((end_time - start_time) * 1000)  # @inspect times

    mean_time = mean(times)  # @inspect mean_time
    return mean_time
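A quick usage sketch of benchmark (the matrix size here is illustrative, not from the lecture):
if torch.cuda.is_available():
    a = torch.randn(2048, 2048, device="cuda")
    b = torch.randn(2048, 2048, device="cuda")
    # The lambda closes over a and b, so `run` takes no arguments.
    matmul_ms = benchmark("matmul(dim=2048)", lambda: a @ b)
    print(f"matmul(dim=2048): {matmul_ms:.3f} ms")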
from torch.profiler import ProfilerActivity

def profile(description: str, run: Callable, num_warmups: int = 1, with_stack: bool = False):
    # Use PyTorch's built-in profiler (torch.profiler) to see how long each individual operation takes.
    # Warmup
    for _ in range(num_warmups):
        run()
    if torch.cuda.is_available():
        torch.cuda.synchronize()  # Wait for CUDA threads to finish (important!)

    # Run the code with the profiler
    with torch.profiler.profile(
            activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
            # Output stack trace for visualization
            with_stack=with_stack,
            # Needed to export stack trace for visualization
            experimental_config=torch._C._profiler._ExperimentalConfig(verbose=True)) as prof:
        run()
        if torch.cuda.is_available():
            torch.cuda.synchronize()  # Wait for CUDA threads to finish (important!)

    # Print out table
    table = prof.key_averages().table(sort_by="cuda_time_total",
                                      max_name_column_width=80,
                                      row_limit=10)
    #text(f"## {description}")
    #text(table, verbatim=True)

    # Write stack trace visualization
    if with_stack:
        text_path = f"var/stacks_{description}.txt"
        svg_path = f"var/stacks_{description}.svg"
        prof.export_stacks(text_path, "self_cuda_time_total")

    return table
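A minimal usage sketch showing how a table like the one below can be produced (the exact call in the lecture code may differ):
if torch.cuda.is_available():
    x = torch.randn(128, 128, device="cuda")
    y = torch.randn(128, 128, device="cuda")
    # sort_by="cuda_time_total" inside profile() puts the GPU kernel near the top of the table.
    print(profile("matmul_dim128", lambda: x @ y))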
Profile output for matmul(dim=128):
| Name | Self CPU % | Self CPU | CPU total % | CPU total | CPU time avg | Self CUDA | Self CUDA % | CUDA total | CUDA time avg | # of Calls |
| aten::matmul | 1.17% | 4.912us | 98.24% | 413.723us | 413.723us | 0.000us | 0.00% | 4.992us | 4.992us | 1 |
| aten::mm | 42.40% | 178.581us | 97.07% | 408.811us | 408.811us | 4.992us | 100.00% | 4.992us | 4.992us | 1 |
| sm80_xmma_gemm_f32f32_f32f32_f32_nn_n_tilesize32x32x8_stage3_warpsize1x2x1_ff... (the CUDA kernel name) | 0.00% | 0.000us | 0.00% | 0.000us | 0.000us | 4.992us | 100.00% | 4.992us | 4.992us | 1 |
| cudaFuncGetAttributes | 0.96% | 4.023us | 0.96% | 4.023us | 4.023us | 0.000us | 0.00% | 0.000us | 0.000us | 1 |
| cudaLaunchKernelExC | 53.71% | 226.207us | 53.71% | 226.207us | 226.207us | 0.000us | 0.00% | 0.000us | 0.000us | 1 |
| cudaDeviceSynchronize | 1.76% | 7.413us | 1.76% | 7.413us | 3.707us | 0.000us | 0.00% | 0.000us | 0.000us | 2 |
Self CPU time total: 421.136us
Self CUDA time total: 4.992us
Matrix multiplications with different dims invoke different CUDA kernels.
The CPU keeps submitting kernels to the GPU without waiting for the previous kernel to finish before sending the next one. If you add an operation like print(loss) (or torch.cuda.synchronize()), the CPU has to wait for the GPU to finish and return the result before it can continue (see the timing sketch below).
1. Default behavior when the CPU submits a kernel to the GPU: asynchronous execution.
In deep learning frameworks (e.g. PyTorch, TensorFlow), when the CPU calls a GPU operation (a "kernel" such as a matmul or convolution), the call is asynchronous by default.
2. Synchronization (e.g. print(loss) or torch.cuda.synchronize()) forces the CPU to wait for the GPU to finish.
Whenever the CPU needs to read back a GPU result, it must wait for the GPU to complete all submitted work before it can fetch the result and continue; this is what "synchronization" means.
From <https://www.doubao.com/chat/24863282932688386>
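A small sketch of this behavior (sizes are illustrative): the first timing returns almost immediately because the matmul has only been queued, while the second waits for the GPU to actually finish.
if torch.cuda.is_available():
    a = torch.randn(4096, 4096, device="cuda")
    b = torch.randn(4096, 4096, device="cuda")

    # Asynchronous: this mostly measures the time to enqueue the kernel.
    start = time.time()
    c = a @ b
    print(f"enqueue only: {(time.time() - start) * 1000:.3f} ms")

    # Synchronized: this measures the actual GPU execution time.
    start = time.time()
    c = a @ b
    torch.cuda.synchronize()  # block the CPU until the GPU finishes
    print(f"with synchronize: {(time.time() - start) * 1000:.3f} ms")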
| Concept | Analogy | Details |
| GPU | The whole factory | Owns all of the production resources. |
| SM | One assembly line | Each line has many workstations (CUDA Cores), a shared tool cabinet (Shared Memory), and a dispatcher (Warp Scheduler). |
| Block | One work order | E.g. a work order to "assemble 100 bicycles". The whole order is assigned to a single assembly line, and the workers within it can pass parts to each other (communicate through Shared Memory). |
| Warp | A squad of 32 workers | The key analogy: these 32 workers are locked together and must all perform exactly the same action at the same time. If one of them needs to do something different (e.g. attach a bell), the other 31 either wait for him or go through the motions with him, which hurts efficiency (this is branch divergence). |
| Thread | A single worker | Each worker assembles one complete bicycle (processes one data element). |
| Stream | A job queue | A single stream is one queue whose tasks must run in order. Multiple streams are independent queues that can be submitted to the factory at the same time, letting different assembly lines work on different jobs simultaneously and greatly increasing throughput. |
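As a sketch of the last row (multiple streams), this is roughly how independent work can be put on separate CUDA streams in PyTorch; the shapes and operations are illustrative:
if torch.cuda.is_available():
    s1 = torch.cuda.Stream()
    s2 = torch.cuda.Stream()
    a = torch.randn(4096, 4096, device="cuda")
    b = torch.randn(4096, 4096, device="cuda")

    # Two independent pieces of work, each enqueued on its own stream;
    # the GPU may overlap them if it has spare SMs.
    with torch.cuda.stream(s1):
        c1 = a @ b
    with torch.cuda.stream(s2):
        c2 = a + b

    torch.cuda.synchronize()  # wait for work on both streams to finish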
Writing a CUDA kernel for GeLU by hand
CUDA is an extension of C/C++ with APIs for managing GPUs.
Simplified picture: write f(i), CUDA kernel computes f(i) for all i.
Grid: collection of thread blocks: numBlocks = (2, 4), blockDim = (1, 8)
Thread block: collection of threads: blockIdx = (0, 1)
Thread: single unit of operation: threadIdx = (0, 3).
You write code that a single thread executes, using (blockIdx, blockDim, threadIdx) to determine what to do.
Set CUDA_LAUNCH_BLOCKING so that if there are errors, CUDA will tell you what went wrong.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
The load_inline function makes it convenient to write CUDA code and bind it to a Python module for immediate use.
# CUDA code: has the full logic
cuda_gelu_src = open("gelu.cu").read()
#include <math.h>
#include <torch/extension.h>
#include <c10/cuda/CUDAException.h>

__global__ void gelu_kernel(float* in, float* out, int num_elements) {
    // Get the index into the tensor
    int i = blockIdx.x * blockDim.x + threadIdx.x;  // i is the global thread (worker) index; each thread handles one element, so this kernel is elementwise
| Symbol | Analogy | Comment |
| blockIdx.x | The ID of the "assemble 100 bicycles" work order | ✅ Correct: this is the index of the Block within the Grid, i.e. which work order it is. |
| blockDim.x | The number of threads in each work order | ✅ Correct: in this code it is 1024, i.e. each Block has 1024 threads. |
| threadIdx.x | The thread id within the warp | ⚠️ Needs correction: this is the thread id within the Block, not within the warp. |
Global worker index = work-order ID × workers per order + worker ID within the order
For example: with blockDim.x = 1024, the thread with blockIdx.x = 2 and threadIdx.x = 3 has global index i = 2 * 1024 + 3 = 2051.
From <https://chat.deepseek.com/a/chat/s/94c85ecb-8e1d-4708-ba6b-0a00d1aa558d>
    if (i < num_elements) {  // To handle the case when num_elements < numBlocks * blockDim
        // Do the actual computation
        out[i] = 0.5 * in[i] * (1.0 + tanh(0.79788456 * (in[i] + 0.044715 * in[i] * in[i] * in[i])));
    }
}

inline unsigned int cdiv(unsigned int a, unsigned int b) {
    // Compute ceil(a / b)
    return (a + b - 1) / b;
}

torch::Tensor gelu(torch::Tensor x) {
    TORCH_CHECK(x.device().is_cuda());
    TORCH_CHECK(x.is_contiguous());

    // Allocate empty tensor
    torch::Tensor y = torch::empty_like(x);

    // Determine grid (elements divided into blocks)
    int num_elements = x.numel();
    int block_size = 1024;  // Number of threads per block
    int num_blocks = cdiv(num_elements, block_size);

    // Launch the kernel
    gelu_kernel<<<num_blocks, block_size>>>(x.data_ptr<float>(), y.data_ptr<float>(), num_elements);
    C10_CUDA_KERNEL_LAUNCH_CHECK();  // Catch errors immediately

    return y;
}
# C++ code: defines the gelu function
cpp_gelu_src = "torch::Tensor gelu(torch::Tensor x);"
Compile the CUDA code and bind it to a Python module.
from torch.utils.cpp_extension import load_inline

def create_cuda_gelu():
    ensure_directory_exists("var/cuda_gelu")
    if not torch.cuda.is_available():
        return None
    module = load_inline(
        cuda_sources=[cuda_gelu_src],
        cpp_sources=[cpp_gelu_src],
        functions=["gelu"],
        extra_cflags=["-O2"],
        verbose=True,
        name="inline_gelu",
        build_directory="var/cuda_gelu",
    )
    cuda_gelu = getattr(module, "gelu")
    return cuda_gelu
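A quick sanity-check and benchmarking sketch (comparing against PyTorch's tanh-approximate GeLU is my assumption about how you would validate it, not code from the lecture):
import torch.nn.functional as F

cuda_gelu = create_cuda_gelu()
if cuda_gelu is not None:
    x = torch.randn(1024, 1024, device="cuda")
    # The hand-written kernel should agree with PyTorch's tanh approximation up to float error.
    print(torch.allclose(cuda_gelu(x), F.gelu(x, approximate="tanh"), atol=1e-5))
    print(benchmark("cuda_gelu", lambda: cuda_gelu(x)), "ms")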
So for GeLU: if instead of calling the official torch.nn.functional.gelu(x, approximate="tanh") you write your own version,
return 0.5 * x * (1 + torch.tanh(0.79788456 * (x + 0.044715 * x * x * x))),
the execution time goes from about 1 ms to about 8 ms. The reason is that the official function is kernel-fused and launches a single kernel, while the hand-written version is not fused, so the GPU has to execute several kernels (one per elementwise op). By writing your own kernel you can fuse manually and get close to the official function's speed.
✅ torch.compile can perform such optimizations (e.g. kernel fusion) for you automatically.
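A minimal sketch of letting torch.compile fuse the manual version (variable names are mine):
def manual_gelu(x: torch.Tensor) -> torch.Tensor:
    # Unfused: each elementwise op below is a separate CUDA kernel launch.
    return 0.5 * x * (1 + torch.tanh(0.79788456 * (x + 0.044715 * x * x * x)))

# torch.compile traces the function and generates a single fused (Triton) kernel.
compiled_gelu = torch.compile(manual_gelu)

if torch.cuda.is_available():
    x = torch.randn(16384, 16384, device="cuda")
    print(benchmark("manual_gelu", lambda: manual_gelu(x)), "ms")
    print(benchmark("compiled_gelu", lambda: compiled_gelu(x)), "ms")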
Triton
Developed by OpenAI in 2021
https://openai.com/research/triton
Make GPU programming more accessible
import triton
import triton.language as tl

@triton.jit
def triton_gelu_kernel(x_ptr, y_ptr, num_elements, BLOCK_SIZE: tl.constexpr):
    # Input is at `x_ptr` and output is at `y_ptr`
    # | Block 0 | Block 1 | ... |
    #   BLOCK_SIZE              num_elements
    pid = tl.program_id(axis=0)
    block_start = pid * BLOCK_SIZE

    # Indices where this thread block should operate.
    # Note: Triton operates on a whole block at a time, not on a single thread: there is
    # no thread id here, and `offsets` is a vector.
    offsets = block_start + tl.arange(0, BLOCK_SIZE)
Key differences between Triton and CUDA:
| Feature | CUDA | Triton |
| Programming granularity | Thread-level | Block-level |
| Indexing | threadIdx.x (a scalar) | tl.arange(0, BLOCK_SIZE) (a vector) |
| Data loading | Each thread loads 1 element | The whole block loads a batch of elements at once |
| Unit of operation | Scalar operations | Vectorized operations |
    # Handle boundary
    mask = offsets < num_elements

    # Read
    x = tl.load(x_ptr + offsets, mask=mask)

    # Approx gelu is 0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x^3)))
    # Compute (tl.tanh doesn't exist, so use tanh(a) = (exp(2a) - 1) / (exp(2a) + 1))
    a = 0.79788456 * (x + 0.044715 * x * x * x)
    exp = tl.exp(2 * a)
    tanh = (exp - 1) / (exp + 1)
    y = 0.5 * x * (1 + tanh)

    # Store
    tl.store(y_ptr + offsets, y, mask=mask)
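The kernel above only says what one block does; to run it you still need a host-side wrapper that picks a grid. A minimal sketch (the wrapper name and BLOCK_SIZE choice are mine):
def triton_gelu(x: torch.Tensor) -> torch.Tensor:
    assert x.is_cuda and x.is_contiguous()
    y = torch.empty_like(x)
    num_elements = x.numel()
    BLOCK_SIZE = 1024
    # One program instance (block) per BLOCK_SIZE-sized chunk of elements.
    grid = (triton.cdiv(num_elements, BLOCK_SIZE),)
    triton_gelu_kernel[grid](x, y, num_elements, BLOCK_SIZE=BLOCK_SIZE)
    return y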