vllm源码解析(五):LLM模型推理
self.model_executor.execute_model,调用与vllm耦合后的LLM模型进行推理。这是本篇要讲解内容,我们先来看下模型输入长什么样,execute_model_req:从调度系统中获得,可以用于做推理的seq_groups, 对seq_groups及可用到的各种属性做了封装,暂时不必管都是什么意思,用到时再现场分析。8.1中完成了资源调度工作,接下来该送入初始化好的模型
八 模型推理细节探索
8.1 回顾下step的流程
def step(self) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
# 多GPU并行推理时走AsyncLLMEngine分支。如果进入当前LLMEngine,性能会下降,这里会抛出异常。
if self.parallel_config.pipeline_parallel_size > 1:
raise NotImplementedError(
"Pipeline parallelism is only supported through AsyncLLMEngine "
"as performance will be severely degraded otherwise.")
# 上述if判断表明,只有一个GPU可用。因此self.scheduler也只有一个元素,是当前GPU的调度
# 该函数调用改变调度的内部状态(self.running、self.swapped 和 self.waiting)
seq_group_metadata_list, scheduler_outputs = self.scheduler[0].schedule()
if not scheduler_outputs.is_empty():
finished_requests_ids = self.scheduler[0].get_and_reset_finished_requests_ids()
execute_model_req = ExecuteModelRequest(...)
output = self.model_executor.execute_model(execute_model_req=execute_model_req)
else:
output = []
request_outputs = self._process_model_outputs(
output, scheduler_outputs.scheduled_seq_groups,
scheduler_outputs.ignored_seq_groups, seq_group_metadata_list)
...
return request_outputs
self.model_executor.execute_model,调用与vllm耦合后的LLM模型进行推理。这是本篇要讲解内容,我们先来看下模型输入长什么样,
execute_model_req:
从调度系统中获得,可以用于做推理的seq_groups, 对seq_groups及可用到的各种属性做了封装,暂时不必管都是什么意思,用到时再现场分析。
8.2 如何使用具体模型
8.1中完成了资源调度工作,接下来该送入初始化好的模型进行推理了。不过vllm对具体模型的又做了多层封装:
8.1中模型调用指向gpu_executor:
- vllm_module/executor/gpu_executor.py
def execute_model(
self, execute_model_req: ExecuteModelRequest
) -> Optional[List[Union[SamplerOutput, PoolerOutput]]]:
output = self.driver_worker.execute_model(execute_model_req)
return output
self.driver_worker.execute_model指向work_base实例的方法, 这个execute_model方法主要对输入数据进行预处理:
- vllm/worker/worker_base.py class LocalOrDistributedWorkerBase(WorkerBase)
def execute_model(
self,
execute_model_req: Optional[ExecuteModelRequest] = None
) -> Optional[List[SamplerOutput]]:
"""Executes at least one model step on the given sequences, unless no
sequences are provided."""
if self.is_driver_worker:
...
model_input: ModelRunnerInputBase = (
self.model_runner.prepare_model_input(
execute_model_req.seq_group_metadata_list,
execute_model_req.virtual_engine,
execute_model_req.finished_requests_ids))
num_steps = execute_model_req.num_steps
...
else:
...
self.execute_worker(worker_input)
...
output = self.model_runner.execute_model(
model_input, self.kv_cache[worker_input.virtual_engine]
if self.kv_cache is not None else None, intermediate_tensors, num_steps)
...
# output is List[SamplerOutput]
return output
8.2 input_ids预处理与block槽位填充
self.model_runner.prepare_model_input主要功能是合并input,本次共传入3条数据,但在输入模型前,vllm把它们的token全部合在一起了。它们的位置关系通过position区分,这部分代码比较简单,不再贴出了(代码多次跳转后,在vllm_module/worker/model_runner.py def build(…)中完成)
input_ids.shape=[num_tokens, ] 假如输入的3条prompt长度分别为48,44,43,那么num_tokens=135
但是transformers中的推理模式输入shape为[batch_size, num_tokens], vllm 为什么要这样处理呢?
我认为目的是为了避免seq的pad步骤, 因为transformers的推理格式需要对seq做pad,处理为同一长度才能进行batch推理。vllm合并后相当于每个token就是一个batch,不需要再做pad和去pad操作(input_ids做embedding后才做推理,这时的shape为[num_tokens, embed_size],此时num_tokens成为形式上的batchsize)。
input_ids合并后计算结果与transformers是一样的,因为线性变换是逐元素进行的(只是shape有所不同)。
vllm与transformers对输入input的处理方式不同,对应的模型结构也要改变,在第四篇文章,在load_weight中有对hf模型是如何转换到vllm规格的有详细描述。
在进行推理前,我们还需要把准备prefill的prompt的每个token(就是上面的input_ids, 这时还没做embedding操作)映射到block中,如seq_id=0的prompt长度为48,由于block_size=16, 所以他刚好能填充3个block(编号为2759,2758,2757)。映射关系会写入到slot_mapping列表中,那么这个操作如何来做呢?
- vllm/attention/backends/utils.py
经过多次跳转后(头都绕晕了),槽位填充的核心代码如下,代码比较简单,就是给标记已使用的block对应的槽位,在全局blocks中的索引号。以索引号2759的block来说,它的第一个槽位号是275916=44144,对应着该prompt(或者说decode阶段的1个待输出的token) 的第一个token。,这么做的目的是为以后把计算好的kv值直接填入这些槽位,起到索引作用。
最后slot_mapping填充效果如下面有图所示,第一个prompt有48个token,刚好能填满3个block,那么decode阶段,该seq生成第一个token时就要申请一个新的block了。第三个prompt有43个token,填不满3个block,它的最后一个block使用了11个槽位(43-162),即从44016到44026。
8.3 模型推理
8.2节 中self.model_runner.execute_model指向如下代码,这段的代码的主要功能是分配推理模型model_executable,
- vllm/worker/model_runner.py
model_input:
def execute_model(
self,
model_input: ModelInputForGPUWithSamplingMetadata,
kv_caches: List[torch.Tensor],
intermediate_tensors: Optional[IntermediateTensors] = None,
num_steps: int = 1,
) -> Optional[Union[List[SamplerOutput], IntermediateTensors]]:
...
# Currently cuda graph is only supported by the decode phase.
assert model_input.attn_metadata is not None
prefill_meta = model_input.attn_metadata.prefill_metadata
decode_meta = model_input.attn_metadata.decode_metadata
# TODO(andoorve): We can remove this once all
# virtual engines share the same kv cache.
virtual_engine = model_input.virtual_engine
if prefill_meta is None and decode_meta.use_cuda_graph:
assert model_input.input_tokens is not None
graph_batch_size = model_input.input_tokens.shape[0]
model_executable = self.graph_runners[virtual_engine][graph_batch_size]
else:
model_executable = self.model
...
hidden_or_intermediate_states = model_executable(
input_ids=model_input.input_tokens,
positions=model_input.input_positions,
kv_caches=kv_caches,
attn_metadata=model_input.attn_metadata,
intermediate_tensors=intermediate_tensors,
**MultiModalInputs.as_kwargs(multi_modal_kwargs, device=self.device),
**seqlen_agnostic_kwargs)
...
还记得第四篇文章的get_model的操作吗,也是在model_runner.py中完成的,所以这里的self.model之前初始化过的模型。
我们使用第四篇文章用过的llama3.1来剖析剩余代码,model_executable最终执行llama模型的forward代码。
- vllm/model_executor/models/llama.py class LlamaForCausalLM
llama结构类型的大模型的推理,可分为两个阶段:prompt和generate, 在使用kv-cache的情况下,二者的区别仅是输入数据维度的差异,即generate阶段seq序列长度始终为1, 不过在vllm中却有不一样的处理,prefill之后,会把模型构建为cuda计算图,这样计算会更加高效。
经过漫长的准备工作,终于可以开始具体的推理工作,为了这个时刻,整整铺垫了四篇文章!
vllm最终调用的模型推理代码:
- vllm/model_executor/models/llama.py class LlamaModel
8.31 第一次推理(prefill)
又称为预填充
输入参数:
def forward(
self,
input_ids: Optional[torch.Tensor],
positions: torch.Tensor,
kv_caches: List[torch.Tensor],
attn_metadata: AttentionMetadata,
intermediate_tensors: Optional[IntermediateTensors],
inputs_embeds: Optional[torch.Tensor] = None,
) -> Union[torch.Tensor, IntermediateTensors]:
if get_pp_group().is_first_rank:
if inputs_embeds is not None:
hidden_states = inputs_embeds
else:
# 输入的通常都是未embedding的token,在这里进行词嵌入
hidden_states = self.get_input_embeddings(input_ids)
residual = None
else:
assert intermediate_tensors is not None
hidden_states = intermediate_tensors["hidden_states"]
residual = intermediate_tensors["residual"]
for i in range(self.start_layer, self.end_layer):
layer = self.layers[i]
hidden_states, residual = layer(
positions, # shape=[num_tokens,]
hidden_states, # shape=[num_tokens,embed_size]
kv_caches[i - self.start_layer], # 当前layer对应的kv-cache
attn_metadata, # 保存着slot_mapping, 通过这个map向kv-cache中填值
residual,
)
...
return hidden_states
计算模块注意发生在layer层的attention部分:
- vllm_module/model_executor/models/llama.py class LlamaAttention
def forward(
self,
positions: torch.Tensor,
hidden_states: torch.Tensor,
kv_cache: torch.Tensor,
attn_metadata: AttentionMetadata,
) -> torch.Tensor:
qkv, _ = self.qkv_proj(hidden_states)
q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1)
q, k = self.rotary_emb(positions, q, k)
attn_output = self.attn(q, k, v, kv_cache, attn_metadata)
output, _ = self.o_proj(attn_output)
return output
计算过程中产生变量:
k,v的shape为[135,1024], q的shape为[135,4096], 说明使用了GQA技术,即4个q共享一个kv
接下来我们看下最重要的self.attn(…)的计算模块:
- vllm_module/attention/backends/flash_attn.py class FlashAttentionImpl(AttentionImpl)
def forward(
self,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
kv_cache: torch.Tensor,
attn_metadata: FlashAttentionMetadata,
k_scale: float = 1.0,
v_scale: float = 1.0,
attn_type: AttentionType = AttentionType.DECODER,
) -> torch.Tensor:
...
num_tokens, hidden_size = query.shape
# Reshape the query, key, and value tensors.
# query.shape=[135, 32, 128]
query = query.view(-1, self.num_heads, self.head_size)
# key.shape=[135, 8, 128]
key = key.view(-1, self.num_kv_heads, self.head_size)
# value.shape=[135, 8, 128]
value = value.view(-1, self.num_kv_heads, self.head_size)
if kv_cache is not None:
# 取出该层缓存key的block,key_cache.shape=[1756, 16, 8, 128]
# 关于这个shape的维度含义,再第四篇文章中已经讲过了
key_cache = kv_cache[0]
value_cache = kv_cache[1]
# 调用cuda核函数缓存kv值
ops.reshape_and_cache_flash(
key,
value,
key_cache,
value_cache,
attn_metadata.slot_mapping.flatten(),
self.kv_cache_dtype,
k_scale,
v_scale,
)
...
if prefill_meta := attn_metadata.prefill_metadata:
# Prompt run.
if (kv_cache is None or prefill_meta.block_tables is None
or prefill_meta.block_tables.numel() == 0):
# 计算attention值
out = flash_attn_varlen_func(
q=query,
k=key,
v=value,
cu_seqlens_q=prefill_meta.seq_start_loc,
cu_seqlens_k=prefill_meta.seq_start_loc,
max_seqlen_q=prefill_meta.max_prefill_seq_len,
max_seqlen_k=prefill_meta.max_prefill_seq_len,
softmax_scale=self.scale,
causal=True,
window_size=self.sliding_window,
alibi_slopes=self.alibi_slopes,
softcap=self.logits_soft_cap,
)
assert output[:num_prefill_tokens].shape == out.shape
output[:num_prefill_tokens] = out
else:
...
# Reshape the output tensor.
return output.view(num_tokens, hidden_size)
该模块主要完成两个功能:缓存kv值和计算attention。
保存kv-cache的操作发生在ops.reshape_and_cache_flash(…)中:
- vllm/_custom_ops.py
def reshape_and_cache_flash(
key: torch.Tensor,
value: torch.Tensor,
key_cache: torch.Tensor,
value_cache: torch.Tensor,
slot_mapping: torch.Tensor,
kv_cache_dtype: str,
k_scale: float,
v_scale: float,
) -> None:
torch.ops._C_cache_ops.reshape_and_cache_flash(key, value, key_cache,
value_cache, slot_mapping,
kv_cache_dtype, k_scale,
v_scale)
很可惜,torch.ops._C_cache_ops.reshape_and_cache_flash已经被打包到.so文件中,不能断点调试。这是用CUDA实现的核函数,我们可以找到编译前的源码:
- csrc/cache_kernels.cu
void reshape_and_cache_flash(
torch::Tensor& key, // [num_tokens, num_heads, head_size]
torch::Tensor& value, // [num_tokens, num_heads, head_size]
torch::Tensor& key_cache, // [num_blocks, block_size, num_heads, head_size]
torch::Tensor&
value_cache, // [num_blocks, block_size, num_heads, head_size]
torch::Tensor& slot_mapping, // [num_tokens]
const std::string& kv_cache_dtype, const double k_scale,
const double v_scale) {
...
TORCH_CHECK(key_cache.stride(0) == value_cache.stride(0));
dim3 grid(num_tokens);
dim3 block(std::min(num_heads * head_size, 512));
const at::cuda::OptionalCUDAGuard device_guard(device_of(key));
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
DISPATCH_BY_KV_CACHE_DTYPE(key.dtype(), kv_cache_dtype,
CALL_RESHAPE_AND_CACHE_FLASH);
}
这是个数据预处理函数,真正工作的是被CALL_RESHAPE_AND_CACHE_FLASH宏定义的函数:
#define CALL_RESHAPE_AND_CACHE_FLASH(KV_T, CACHE_T, KV_DTYPE) \
vllm::reshape_and_cache_flash_kernel<KV_T, CACHE_T, KV_DTYPE> \
<<<grid, block, 0, stream>>>( \
reinterpret_cast<KV_T*>(key.data_ptr()), \
reinterpret_cast<KV_T*>(value.data_ptr()), \
reinterpret_cast<CACHE_T*>(key_cache.data_ptr()), \
reinterpret_cast<CACHE_T*>(value_cache.data_ptr()), \
slot_mapping.data_ptr<int64_t>(), block_stride, key_stride, \
value_stride, num_heads, head_size, block_size, k_scale, v_scale);
__global__ void reshape_and_cache_flash_kernel(
const scalar_t* __restrict__ key, // [num_tokens, num_heads, head_size]
const scalar_t* __restrict__ value, // [num_tokens, num_heads, head_size]
cache_t* __restrict__ key_cache, // [num_blocks, block_size, num_heads,
// head_size]
cache_t* __restrict__ value_cache, // [num_blocks, block_size, num_heads,
// head_size]
const int64_t* __restrict__ slot_mapping, // [num_tokens]
const int block_stride, const int key_stride, const int value_stride,
const int num_heads, const int head_size, const int block_size,
const float k_scale, const float v_scale) {
// 每个cuda block处理一个token
const int64_t token_idx = blockIdx.x;
const int64_t slot_idx = slot_mapping[token_idx];
// 如果槽索引小于 0,表示 token 被填充(padding),则直接返回
if (slot_idx < 0) {
return;
}
// 计算 block 索引和 block 内的偏移量
const int64_t block_idx = slot_idx / block_size;
const int64_t block_offset = slot_idx % block_size;
// 计算每个注意力头和每个头的总数据量
const int n = num_heads * head_size;
// 每个线程处理数据中的一个元素
for (int i = threadIdx.x; i < n; i += blockDim.x) {
// 计算当前线程处理的 key 和 value 数据在输入数组中的索引
const int64_t src_key_idx = token_idx * key_stride + i;
const int64_t src_value_idx = token_idx * value_stride + i;
// 计算当前元素对应的注意力头索引和头内的偏移量
const int head_idx = i / head_size;
const int head_offset = i % head_size;
// 计算在缓存中目标位置的索引
const int64_t tgt_key_value_idx = block_idx * block_stride +
block_offset * num_heads * head_size +
head_idx * head_size + head_offset;
// 从输入数组中加载当前的 key 和 value 数据
scalar_t tgt_key = key[src_key_idx];
scalar_t tgt_value = value[src_value_idx];
// 缓存kv值
// 如果使用自动类型,不进行额外的缩放和转换,直接存储
if constexpr (kv_dt == Fp8KVCacheDataType::kAuto) {
key_cache[tgt_key_value_idx] = tgt_key;
value_cache[tgt_key_value_idx] = tgt_value;
} else { // 否则,使用指定的缩放因子对数据进行转换后存储
key_cache[tgt_key_value_idx] =
fp8::scaled_convert<cache_t, scalar_t, kv_dt>(tgt_key, k_scale);
value_cache[tgt_key_value_idx] =
fp8::scaled_convert<cache_t, scalar_t, kv_dt>(tgt_value, v_scale);
}
}
}
通过写的reshape_and_cache_flash_kernel的注释已经清楚看到pagedAttention缓存kv的真实过程。
关于attention的计算,经过多次跳转好,由如下代码实现:
- /usr/local/miniconda3/lib/python3.11/site-packages/vllm_flash_attn/flash_attn_interface.py
def _flash_attn_varlen_forward(q, k, v, cu_seqlens_q,cu_seqlens_k, max_seqlen_q, max_seqlen_k, dropout_p,
softmax_scale, causal, window_size, softcap, alibi_slopes, return_softmax, block_table,
*, out=None
):
q, k, v = [maybe_contiguous(x) for x in (q, k, v)]
out, q, k, v, out_padded, softmax_lse, S_dmask, rng_state = flash_attn_cuda.varlen_fwd( q, k, v, out,
cu_seqlens_q, cu_seqlens_k, None, block_table,alibi_slopes, max_seqlen_q, max_seqlen_k,
dropout_p, softmax_scale, False, causal, window_size[0], window_size[1], softcap,
return_softmax, None,
)
# if out.isnan().any() or softmax_lse.isnan().any():
# breakpoint()
return out, q, k, v, out_padded, softmax_lse, S_dmask, rng_state
flash_attn_cuda函数来自. so包, 没找到源码!
8.32 非第一次推理(decode阶段)
经过预填充阶段后,vllm会把模型本身及推理过程处理成cuda计算图,正式的解码阶段,会直接使用计算图获得推理结果。
对应8.3开始代码中的model_executable 选择分支:
在decode推理前,我们先来看下输入参数与prefill有什么不同:
在初始阶段我们设定每个seq生成4条output,关于拼接原理,在第一篇文章由详细讲过了。
从model_input数据结构看,此时的模型输入只有一个token(这是prefill后生成的第一个token)。
看上图中input_tokens,有没有发现什么奇怪的事?
我们输入的prompt数量为3,设定每个prompt生成4条output,为什么这里是16个token? 这是因为decode使用的是cuda计算图,图需要固定大小的张量,这部分细节不想在此深究了~,有兴趣的自行探索吧。
计算图执行的推理流程如下:
- vllm/worker/model_runner.py class CUDAGraphRunner
def forward(
self,
input_ids: torch.Tensor, # 输入的 token IDs 张量
positions: torch.Tensor, # 输入的位置信息张量
kv_caches: List[torch.Tensor], # KV cache 列表(这里被删除,不再使用)
attn_metadata: AttentionMetadata, # 注意力元数据,包含 slot_mapping 和其他解码元数据
intermediate_tensors: Optional[IntermediateTensors], # 中间张量,可能包含中间结果的数据
**kwargs, # 其他关键字参数,用于额外的自定义操作
) -> torch.Tensor:
# KV caches 是固定的张量,因此在后续操作中不需要复制它们
del kv_caches # 删除 kv_caches,因为它们不再需要
# 将输入张量复制到模型的输入缓冲区
self.input_buffers["input_ids"].copy_(input_ids, non_blocking=True) # 复制输入 token IDs
self.input_buffers["positions"].copy_(positions, non_blocking=True) # 复制位置信息
self.input_buffers["slot_mapping"].copy_(attn_metadata.slot_mapping, non_blocking=True) # 复制 slot_mapping
# 根据后端的不同,处理额外的输入数据
if self.backend_name != "flashinfer":
# 如果后端不是 "flashinfer",复制解码元数据中的序列长度和块表
self.input_buffers["seq_lens_tensor"].copy_(
attn_metadata.decode_metadata.seq_lens_tensor,
non_blocking=True)
self.input_buffers["block_tables"].copy_(attn_metadata.decode_metadata.block_tables, non_blocking=True)
# 如果 input_buffers 包含 "seqlen_agnostic_capture_inputs",在 CUDA 图之前复制输入
if "seqlen_agnostic_capture_inputs" in self.input_buffers:
self.model.copy_inputs_before_cuda_graphs(self.input_buffers, **kwargs)
# 如果提供了 intermediate_tensors,复制这些中间张量到输入缓冲区
if intermediate_tensors is not None:
for key in intermediate_tensors.tensors:
self.input_buffers[key].copy_(intermediate_tensors[key], non_blocking=True)
# 执行计算图,计算存储在self的各个属性中
# 这个计算图是核心代码,可惜这里看不到。
self.graph.replay()
# 如果 input_buffers 包含 "seqlen_agnostic_capture_inputs",在 CUDA 图之后复制输出
if "seqlen_agnostic_capture_inputs" in self.input_buffers:
self.model.copy_outputs_after_cuda_graphs(self.input_buffers, **kwargs)
# 返回输出张量
if get_pp_group().is_last_rank:
return self.output_buffers["hidden_states"] # 如果是最后一个进程,返回隐藏状态张量
return self.output_buffers # 否则返回输出缓冲区
后记
本篇文章仅梳理了vllm大致的模型推理流程,省去了很多代码细节;即使如此,这仍是一个极其复杂的耦合过程。在写作本篇文章时,官网又把vllm更新到了0.6.0,与0.5.4做了比较,又有很多改动。这个系列的文章还没写完,就要过时了???
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)