八 模型推理细节探索

8.1 回顾下step的流程

在这里插入图片描述

    def step(self) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
        # 多GPU并行推理时走AsyncLLMEngine分支。如果进入当前LLMEngine,性能会下降,这里会抛出异常。
        if self.parallel_config.pipeline_parallel_size > 1:
            raise NotImplementedError(
                    "Pipeline parallelism is only supported through AsyncLLMEngine "
                    "as performance will be severely degraded otherwise.")

        # 上述if判断表明,只有一个GPU可用。因此self.scheduler也只有一个元素,是当前GPU的调度
        # 该函数调用改变调度的内部状态(self.running、self.swapped 和 self.waiting)
        seq_group_metadata_list, scheduler_outputs = self.scheduler[0].schedule()

        if not scheduler_outputs.is_empty():
            finished_requests_ids = self.scheduler[0].get_and_reset_finished_requests_ids()
            execute_model_req = ExecuteModelRequest(...)

            output = self.model_executor.execute_model(execute_model_req=execute_model_req)
        else:
            output = []

        request_outputs = self._process_model_outputs(
                output, scheduler_outputs.scheduled_seq_groups,
                scheduler_outputs.ignored_seq_groups, seq_group_metadata_list)
		...
        return request_outputs

self.model_executor.execute_model,调用与vllm耦合后的LLM模型进行推理。这是本篇要讲解内容,我们先来看下模型输入长什么样,

execute_model_req:
在这里插入图片描述
从调度系统中获得,可以用于做推理的seq_groups, 对seq_groups及可用到的各种属性做了封装,暂时不必管都是什么意思,用到时再现场分析。

8.2 如何使用具体模型

在这里插入图片描述

8.1中完成了资源调度工作,接下来该送入初始化好的模型进行推理了。不过vllm对具体模型的又做了多层封装:

8.1中模型调用指向gpu_executor:

  • vllm_module/executor/gpu_executor.py
    def execute_model(
            self, execute_model_req: ExecuteModelRequest
    ) -> Optional[List[Union[SamplerOutput, PoolerOutput]]]:
        output = self.driver_worker.execute_model(execute_model_req)
        return output

self.driver_worker.execute_model指向work_base实例的方法, 这个execute_model方法主要对输入数据进行预处理:

  • vllm/worker/worker_base.py class LocalOrDistributedWorkerBase(WorkerBase)
    def execute_model(
            self,
            execute_model_req: Optional[ExecuteModelRequest] = None
    ) -> Optional[List[SamplerOutput]]:
        """Executes at least one model step on the given sequences, unless no
        sequences are provided."""
        if self.is_driver_worker:
			...
            model_input: ModelRunnerInputBase = (
                self.model_runner.prepare_model_input(
                        execute_model_req.seq_group_metadata_list,
                        execute_model_req.virtual_engine,
                        execute_model_req.finished_requests_ids))
            num_steps = execute_model_req.num_steps
            ...
        else:
			...

        self.execute_worker(worker_input)
		...
        output = self.model_runner.execute_model(
                model_input, self.kv_cache[worker_input.virtual_engine]
                if self.kv_cache is not None else None, intermediate_tensors, num_steps)
		...
        # output is List[SamplerOutput]
        return output

8.2 input_ids预处理与block槽位填充

self.model_runner.prepare_model_input主要功能是合并input,本次共传入3条数据,但在输入模型前,vllm把它们的token全部合在一起了。它们的位置关系通过position区分,这部分代码比较简单,不再贴出了(代码多次跳转后,在vllm_module/worker/model_runner.py def build(…)中完成)

input_ids.shape=[num_tokens, ] 假如输入的3条prompt长度分别为48,44,43,那么num_tokens=135
但是transformers中的推理模式输入shape为[batch_size, num_tokens], vllm 为什么要这样处理呢?

我认为目的是为了避免seq的pad步骤, 因为transformers的推理格式需要对seq做pad,处理为同一长度才能进行batch推理。vllm合并后相当于每个token就是一个batch,不需要再做pad和去pad操作(input_ids做embedding后才做推理,这时的shape为[num_tokens, embed_size],此时num_tokens成为形式上的batchsize)。

input_ids合并后计算结果与transformers是一样的,因为线性变换是逐元素进行的(只是shape有所不同)。

vllm与transformers对输入input的处理方式不同,对应的模型结构也要改变,在第四篇文章,在load_weight中有对hf模型是如何转换到vllm规格的有详细描述。

在这里插入图片描述

在进行推理前,我们还需要把准备prefill的prompt的每个token(就是上面的input_ids, 这时还没做embedding操作)映射到block中,如seq_id=0的prompt长度为48,由于block_size=16, 所以他刚好能填充3个block(编号为2759,2758,2757)。映射关系会写入到slot_mapping列表中,那么这个操作如何来做呢?
在这里插入图片描述

  • vllm/attention/backends/utils.py
    经过多次跳转后(头都绕晕了),槽位填充的核心代码如下,代码比较简单,就是给标记已使用的block对应的槽位,在全局blocks中的索引号。以索引号2759的block来说,它的第一个槽位号是275916=44144,对应着该prompt(或者说decode阶段的1个待输出的token) 的第一个token。,这么做的目的是为以后把计算好的kv值直接填入这些槽位,起到索引作用。
    在这里插入图片描述
    最后slot_mapping填充效果如下面有图所示,第一个prompt有48个token,刚好能填满3个block,那么decode阶段,该seq生成第一个token时就要申请一个新的block了。第三个prompt有43个token,填不满3个block,它的最后一个block使用了11个槽位(43-16
    2),即从44016到44026。
    在这里插入图片描述

8.3 模型推理

8.2节 中self.model_runner.execute_model指向如下代码,这段的代码的主要功能是分配推理模型model_executable,

  • vllm/worker/model_runner.py
    model_input:
    在这里插入图片描述
    def execute_model(
            self,
            model_input: ModelInputForGPUWithSamplingMetadata,
            kv_caches: List[torch.Tensor],
            intermediate_tensors: Optional[IntermediateTensors] = None,
            num_steps: int = 1,
    ) -> Optional[Union[List[SamplerOutput], IntermediateTensors]]:
		...
        # Currently cuda graph is only supported by the decode phase.
        assert model_input.attn_metadata is not None
        prefill_meta = model_input.attn_metadata.prefill_metadata
        decode_meta = model_input.attn_metadata.decode_metadata
        # TODO(andoorve): We can remove this once all
        # virtual engines share the same kv cache.
        virtual_engine = model_input.virtual_engine
        if prefill_meta is None and decode_meta.use_cuda_graph:
            assert model_input.input_tokens is not None
            graph_batch_size = model_input.input_tokens.shape[0]
            model_executable = self.graph_runners[virtual_engine][graph_batch_size]
        else:
            model_executable = self.model
		...
        hidden_or_intermediate_states = model_executable(
                input_ids=model_input.input_tokens,
                positions=model_input.input_positions,
                kv_caches=kv_caches,
                attn_metadata=model_input.attn_metadata,
                intermediate_tensors=intermediate_tensors,
                **MultiModalInputs.as_kwargs(multi_modal_kwargs, device=self.device),
                **seqlen_agnostic_kwargs)
		...

还记得第四篇文章的get_model的操作吗,也是在model_runner.py中完成的,所以这里的self.model之前初始化过的模型。
我们使用第四篇文章用过的llama3.1来剖析剩余代码,model_executable最终执行llama模型的forward代码。

  • vllm/model_executor/models/llama.py class LlamaForCausalLM
    在这里插入图片描述

llama结构类型的大模型的推理,可分为两个阶段:prompt和generate, 在使用kv-cache的情况下,二者的区别仅是输入数据维度的差异,即generate阶段seq序列长度始终为1, 不过在vllm中却有不一样的处理,prefill之后,会把模型构建为cuda计算图,这样计算会更加高效。

经过漫长的准备工作,终于可以开始具体的推理工作,为了这个时刻,整整铺垫了四篇文章!

vllm最终调用的模型推理代码:

  • vllm/model_executor/models/llama.py class LlamaModel

8.31 第一次推理(prefill)
又称为预填充

输入参数:
在这里插入图片描述

    def forward(
        self,
        input_ids: Optional[torch.Tensor],
        positions: torch.Tensor,
        kv_caches: List[torch.Tensor],
        attn_metadata: AttentionMetadata,
        intermediate_tensors: Optional[IntermediateTensors],
        inputs_embeds: Optional[torch.Tensor] = None,
    ) -> Union[torch.Tensor, IntermediateTensors]:
        if get_pp_group().is_first_rank:
            if inputs_embeds is not None:
                hidden_states = inputs_embeds
            else:
            	# 输入的通常都是未embedding的token,在这里进行词嵌入
                hidden_states = self.get_input_embeddings(input_ids)
            residual = None
        else:
            assert intermediate_tensors is not None
            hidden_states = intermediate_tensors["hidden_states"]
            residual = intermediate_tensors["residual"]

        for i in range(self.start_layer, self.end_layer):
            layer = self.layers[i]
            hidden_states, residual = layer(
                positions,	# shape=[num_tokens,]
                hidden_states,	# shape=[num_tokens,embed_size]
                kv_caches[i - self.start_layer],	# 当前layer对应的kv-cache
                attn_metadata,	# 保存着slot_mapping, 通过这个map向kv-cache中填值
                residual,
            )
		...
        return hidden_states

计算模块注意发生在layer层的attention部分:

  • vllm_module/model_executor/models/llama.py class LlamaAttention
    def forward(
        self,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
        kv_cache: torch.Tensor,
        attn_metadata: AttentionMetadata,
    ) -> torch.Tensor:
        qkv, _ = self.qkv_proj(hidden_states)
        q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1)
        q, k = self.rotary_emb(positions, q, k)
        attn_output = self.attn(q, k, v, kv_cache, attn_metadata)
        output, _ = self.o_proj(attn_output)
        return output

计算过程中产生变量:
在这里插入图片描述
k,v的shape为[135,1024], q的shape为[135,4096], 说明使用了GQA技术,即4个q共享一个kv

接下来我们看下最重要的self.attn(…)的计算模块:

  • vllm_module/attention/backends/flash_attn.py class FlashAttentionImpl(AttentionImpl)
    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        kv_cache: torch.Tensor,
        attn_metadata: FlashAttentionMetadata,
        k_scale: float = 1.0,
        v_scale: float = 1.0,
        attn_type: AttentionType = AttentionType.DECODER,
    ) -> torch.Tensor:
		...
        num_tokens, hidden_size = query.shape
        # Reshape the query, key, and value tensors.
        # query.shape=[135, 32, 128]
        query = query.view(-1, self.num_heads, self.head_size)
        # key.shape=[135, 8, 128]
        key = key.view(-1, self.num_kv_heads, self.head_size)
        # value.shape=[135, 8, 128]
        value = value.view(-1, self.num_kv_heads, self.head_size)

        if kv_cache is not None:
        	# 取出该层缓存key的block,key_cache.shape=[1756, 16, 8, 128]
        	# 关于这个shape的维度含义,再第四篇文章中已经讲过了
            key_cache = kv_cache[0]
            value_cache = kv_cache[1]
			# 调用cuda核函数缓存kv值
            ops.reshape_and_cache_flash(
                key,
                value,
                key_cache,
                value_cache,
                attn_metadata.slot_mapping.flatten(),
                self.kv_cache_dtype,
                k_scale,
                v_scale,
            )
		...
        if prefill_meta := attn_metadata.prefill_metadata:
            # Prompt run.
            if (kv_cache is None or prefill_meta.block_tables is None
                    or prefill_meta.block_tables.numel() == 0):
				# 计算attention值
                out = flash_attn_varlen_func(
                    q=query,
                    k=key,
                    v=value,
                    cu_seqlens_q=prefill_meta.seq_start_loc,
                    cu_seqlens_k=prefill_meta.seq_start_loc,
                    max_seqlen_q=prefill_meta.max_prefill_seq_len,
                    max_seqlen_k=prefill_meta.max_prefill_seq_len,
                    softmax_scale=self.scale,
                    causal=True,
                    window_size=self.sliding_window,
                    alibi_slopes=self.alibi_slopes,
                    softcap=self.logits_soft_cap,
                )
                assert output[:num_prefill_tokens].shape == out.shape
                output[:num_prefill_tokens] = out
            else:
				...

        # Reshape the output tensor.
        return output.view(num_tokens, hidden_size)

该模块主要完成两个功能:缓存kv值和计算attention。

保存kv-cache的操作发生在ops.reshape_and_cache_flash(…)中:

  • vllm/_custom_ops.py
def reshape_and_cache_flash(
    key: torch.Tensor,
    value: torch.Tensor,
    key_cache: torch.Tensor,
    value_cache: torch.Tensor,
    slot_mapping: torch.Tensor,
    kv_cache_dtype: str,
    k_scale: float,
    v_scale: float,
) -> None:
    torch.ops._C_cache_ops.reshape_and_cache_flash(key, value, key_cache,
                                                   value_cache, slot_mapping,
                                                   kv_cache_dtype, k_scale,
                                                   v_scale)

很可惜,torch.ops._C_cache_ops.reshape_and_cache_flash已经被打包到.so文件中,不能断点调试。这是用CUDA实现的核函数,我们可以找到编译前的源码:

  • csrc/cache_kernels.cu
void reshape_and_cache_flash(
    torch::Tensor& key,        // [num_tokens, num_heads, head_size]
    torch::Tensor& value,      // [num_tokens, num_heads, head_size]
    torch::Tensor& key_cache,  // [num_blocks, block_size, num_heads, head_size]
    torch::Tensor&
        value_cache,  // [num_blocks, block_size, num_heads, head_size]
    torch::Tensor& slot_mapping,  // [num_tokens]
    const std::string& kv_cache_dtype, const double k_scale,
    const double v_scale) {
	...
  TORCH_CHECK(key_cache.stride(0) == value_cache.stride(0));

  dim3 grid(num_tokens);
  dim3 block(std::min(num_heads * head_size, 512));
  const at::cuda::OptionalCUDAGuard device_guard(device_of(key));
  const cudaStream_t stream = at::cuda::getCurrentCUDAStream();

  DISPATCH_BY_KV_CACHE_DTYPE(key.dtype(), kv_cache_dtype,
                             CALL_RESHAPE_AND_CACHE_FLASH);
}

这是个数据预处理函数,真正工作的是被CALL_RESHAPE_AND_CACHE_FLASH宏定义的函数

#define CALL_RESHAPE_AND_CACHE_FLASH(KV_T, CACHE_T, KV_DTYPE)         \
  vllm::reshape_and_cache_flash_kernel<KV_T, CACHE_T, KV_DTYPE>       \
      <<<grid, block, 0, stream>>>(                                   \
          reinterpret_cast<KV_T*>(key.data_ptr()),                    \
          reinterpret_cast<KV_T*>(value.data_ptr()),                  \
          reinterpret_cast<CACHE_T*>(key_cache.data_ptr()),           \
          reinterpret_cast<CACHE_T*>(value_cache.data_ptr()),         \
          slot_mapping.data_ptr<int64_t>(), block_stride, key_stride, \
          value_stride, num_heads, head_size, block_size, k_scale, v_scale);
__global__ void reshape_and_cache_flash_kernel(
    const scalar_t* __restrict__ key,    // [num_tokens, num_heads, head_size]
    const scalar_t* __restrict__ value,  // [num_tokens, num_heads, head_size]
    cache_t* __restrict__ key_cache,     // [num_blocks, block_size, num_heads,
                                         // head_size]
    cache_t* __restrict__ value_cache,   // [num_blocks, block_size, num_heads,
                                         // head_size]
    const int64_t* __restrict__ slot_mapping,  // [num_tokens]
    const int block_stride, const int key_stride, const int value_stride,
    const int num_heads, const int head_size, const int block_size,
    const float k_scale, const float v_scale) {
  // 每个cuda block处理一个token
  const int64_t token_idx = blockIdx.x;
  const int64_t slot_idx = slot_mapping[token_idx];
  
  // 如果槽索引小于 0,表示 token 被填充(padding),则直接返回
  if (slot_idx < 0) {
    return;
  }
   // 计算 block 索引和 block 内的偏移量
  const int64_t block_idx = slot_idx / block_size;
  const int64_t block_offset = slot_idx % block_size;
  
  // 计算每个注意力头和每个头的总数据量
  const int n = num_heads * head_size;
  
  // 每个线程处理数据中的一个元素
  for (int i = threadIdx.x; i < n; i += blockDim.x) {
    // 计算当前线程处理的 key 和 value 数据在输入数组中的索引
    const int64_t src_key_idx = token_idx * key_stride + i;
    const int64_t src_value_idx = token_idx * value_stride + i;
    // 计算当前元素对应的注意力头索引和头内的偏移量
    const int head_idx = i / head_size;
    const int head_offset = i % head_size;
    // 计算在缓存中目标位置的索引
    const int64_t tgt_key_value_idx = block_idx * block_stride +
                                      block_offset * num_heads * head_size +
                                      head_idx * head_size + head_offset;
    
    // 从输入数组中加载当前的 key 和 value 数据
    scalar_t tgt_key = key[src_key_idx];
    scalar_t tgt_value = value[src_value_idx];
    
    // 缓存kv值
    // 如果使用自动类型,不进行额外的缩放和转换,直接存储
    if constexpr (kv_dt == Fp8KVCacheDataType::kAuto) {
      key_cache[tgt_key_value_idx] = tgt_key;
      value_cache[tgt_key_value_idx] = tgt_value;
    } else {	// 否则,使用指定的缩放因子对数据进行转换后存储
      key_cache[tgt_key_value_idx] =
          fp8::scaled_convert<cache_t, scalar_t, kv_dt>(tgt_key, k_scale);
      value_cache[tgt_key_value_idx] =
          fp8::scaled_convert<cache_t, scalar_t, kv_dt>(tgt_value, v_scale);
    }
  }
}

通过写的reshape_and_cache_flash_kernel的注释已经清楚看到pagedAttention缓存kv的真实过程。

关于attention的计算,经过多次跳转好,由如下代码实现:

  • /usr/local/miniconda3/lib/python3.11/site-packages/vllm_flash_attn/flash_attn_interface.py
def _flash_attn_varlen_forward(q,  k, v, cu_seqlens_q,cu_seqlens_k,  max_seqlen_q,  max_seqlen_k,   dropout_p,
    softmax_scale,  causal,  window_size,  softcap,   alibi_slopes,   return_softmax, block_table,
    *, out=None
):
    q, k, v = [maybe_contiguous(x) for x in (q, k, v)]
    out, q, k, v, out_padded, softmax_lse, S_dmask, rng_state = flash_attn_cuda.varlen_fwd( q, k,  v,  out, 
                 cu_seqlens_q, cu_seqlens_k, None,    block_table,alibi_slopes, max_seqlen_q, max_seqlen_k,
                 dropout_p,   softmax_scale,   False,   causal,    window_size[0],    window_size[1],   softcap,
                 return_softmax,     None,
    )
    # if out.isnan().any() or softmax_lse.isnan().any():
    #     breakpoint()
    return out, q, k, v, out_padded, softmax_lse, S_dmask, rng_state

flash_attn_cuda函数来自. so包, 没找到源码!

8.32 非第一次推理(decode阶段)

经过预填充阶段后,vllm会把模型本身及推理过程处理成cuda计算图,正式的解码阶段,会直接使用计算图获得推理结果。

对应8.3开始代码中的model_executable 选择分支:
在这里插入图片描述

在decode推理前,我们先来看下输入参数与prefill有什么不同:
在初始阶段我们设定每个seq生成4条output,关于拼接原理,在第一篇文章由详细讲过了。
从model_input数据结构看,此时的模型输入只有一个token(这是prefill后生成的第一个token)。

在这里插入图片描述
看上图中input_tokens,有没有发现什么奇怪的事?

我们输入的prompt数量为3,设定每个prompt生成4条output,为什么这里是16个token? 这是因为decode使用的是cuda计算图,图需要固定大小的张量,这部分细节不想在此深究了~,有兴趣的自行探索吧。

计算图执行的推理流程如下:

  • vllm/worker/model_runner.py class CUDAGraphRunner
def forward(
        self,
        input_ids: torch.Tensor,                       # 输入的 token IDs 张量
        positions: torch.Tensor,                       # 输入的位置信息张量
        kv_caches: List[torch.Tensor],                 # KV cache 列表(这里被删除,不再使用)
        attn_metadata: AttentionMetadata,              # 注意力元数据,包含 slot_mapping 和其他解码元数据
        intermediate_tensors: Optional[IntermediateTensors],  # 中间张量,可能包含中间结果的数据
        **kwargs,                                      # 其他关键字参数,用于额外的自定义操作
) -> torch.Tensor:
    # KV caches 是固定的张量,因此在后续操作中不需要复制它们
    del kv_caches  # 删除 kv_caches,因为它们不再需要

    # 将输入张量复制到模型的输入缓冲区
    self.input_buffers["input_ids"].copy_(input_ids, non_blocking=True)  # 复制输入 token IDs
    self.input_buffers["positions"].copy_(positions, non_blocking=True)  # 复制位置信息
    self.input_buffers["slot_mapping"].copy_(attn_metadata.slot_mapping, non_blocking=True)  # 复制 slot_mapping
    
    # 根据后端的不同,处理额外的输入数据
    if self.backend_name != "flashinfer":
        # 如果后端不是 "flashinfer",复制解码元数据中的序列长度和块表
        self.input_buffers["seq_lens_tensor"].copy_(
                attn_metadata.decode_metadata.seq_lens_tensor,
                non_blocking=True)
        self.input_buffers["block_tables"].copy_(attn_metadata.decode_metadata.block_tables, non_blocking=True)
    
    # 如果 input_buffers 包含 "seqlen_agnostic_capture_inputs",在 CUDA 图之前复制输入
    if "seqlen_agnostic_capture_inputs" in self.input_buffers:
        self.model.copy_inputs_before_cuda_graphs(self.input_buffers, **kwargs)

    # 如果提供了 intermediate_tensors,复制这些中间张量到输入缓冲区
    if intermediate_tensors is not None:
        for key in intermediate_tensors.tensors:
            self.input_buffers[key].copy_(intermediate_tensors[key], non_blocking=True)
    
    # 执行计算图,计算存储在self的各个属性中
    # 这个计算图是核心代码,可惜这里看不到。
    self.graph.replay()
    
    # 如果 input_buffers 包含 "seqlen_agnostic_capture_inputs",在 CUDA 图之后复制输出
    if "seqlen_agnostic_capture_inputs" in self.input_buffers:
        self.model.copy_outputs_after_cuda_graphs(self.input_buffers, **kwargs)
    
    # 返回输出张量
    if get_pp_group().is_last_rank:
        return self.output_buffers["hidden_states"]  # 如果是最后一个进程,返回隐藏状态张量

    return self.output_buffers  # 否则返回输出缓冲区

后记

本篇文章仅梳理了vllm大致的模型推理流程,省去了很多代码细节;即使如此,这仍是一个极其复杂的耦合过程。在写作本篇文章时,官网又把vllm更新到了0.6.0,与0.5.4做了比较,又有很多改动。这个系列的文章还没写完,就要过时了???

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐