一. tansfomer

1. 介绍

自注意力同时具有并行计算和最短的最大路径长度这两个优势。因此使用自注意力来设计深度架构是很有吸引力的。对比依赖循环神经网络实现输入表示的自注意力模型,transformer模型完全基于注意力机制,没有任何卷积层或循环神经网络层 。尽管transformer最初是应用于在文本数据上的序列到序列学习,但现在已经推广到各种现代的深度学习中,例如语言、视觉、语音和强化学习领域。

2. 模型

Transformer是由编码器和解码器组成的,其整体架构图如下图所示。与李沐动手学深度学习V2-基于注意力机制的seq2seq相比,transformer的编码器和解码器是基于自注意力的模块叠加而成的,源(输入)序列和目标(输出)序列的嵌入(embedding)表示再加上位置编码(positional encoding),再分别输入到编码器和解码器中。
transformer架构
上图概述了transformer的架构。从宏观角度来看,transformer的编码器是由多个相同的层叠加而成的,每个层都有两个子层(子层表示为 s u b l a y e r \mathrm{sublayer} sublayer)。第一个子层是多头自注意力(multi-head self-attention)汇聚;第二个子层是基于位置的前馈网络(positionwise feed-forward network)。具体来说在计算编码器的自注意力时,查询、键和值都来自前一个编码器层的输出。受 残差网络的启发,每个子层都采用了残差连接(residual connection)。在transformer中,对于序列中任何位置的任何输入 x ∈ R d \mathbf{x} \in \mathbb{R}^d xRd,都要求满足 s u b l a y e r ( x ) ∈ R d \mathrm{sublayer}(\mathbf{x}) \in \mathbb{R}^d sublayer(x)Rd,以便残差连接满足 x + s u b l a y e r ( x ) ∈ R d \mathbf{x} + \mathrm{sublayer}(\mathbf{x}) \in \mathbb{R}^d x+sublayer(x)Rd。在残差连接的加法计算之后,紧接着应用层规范化(layer normalization)。因此输入序列对应的每个位置,transformer编码器都将输出一个 d d d维表示向量

Transformer解码器也是由多个相同的层叠加而成的,并且层中使用了残差连接和层规范化。除了编码器中描述的两个子层之外,解码器还在这两个子层之间插入了第三个子层,称为编码器-解码器注意力(encoder-decoder attention)层。在编码器-解码器注意力中,查询来自前一个解码器层的输出,而键和值来自整个编码器的输出。在解码器自注意力中,查询、键和值都来自上一个解码器层的输出。但是解码器中的每个位置只能考虑该位置之前的所有位置。这种掩蔽(masked)注意力保留了自回归(auto-regressive)属性,确保预测仅依赖于已生成的输出词元。

二. 实现

1. 基于位置的前馈网络

基于位置的前馈网络对序列中的所有位置的表示进行变换时使用的是同一个多层感知机(MLP),这就是称前馈网络是基于位置的(positionwise)的原因。在下面的实现中,输入X的形状(批量大小,时间步数或序列长度,隐单元数或特征维度)将被一个两层的感知机转换成形状为(批量大小,时间步数,ffn_num_outputs)的输出张量。

import math
import d2l.torch
import torch
from torch import nn
import pandas as pd
class PositionWiseFFN(nn.Module):
    """基于位置的前馈网络"""
    def __init__(self,ffn_num_inputs,ffn_num_hiddens,ffn_num_outputs):
        super(PositionWiseFFN,self).__init__()
        self.dense1 = nn.Linear(ffn_num_inputs,ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(ffn_num_hiddens,ffn_num_outputs)
    def forward(self,X):
        return self.dense2(self.relu(self.dense1(X)))

下面的例子显示改变张量的最里层维度的尺寸,会改变成基于位置的前馈网络的输出尺寸。因为用同一个多层感知机对所有位置上的输入进行变换,所以当所有这些位置的输入相同时,它们的输出也是相同的。

positionWiseFFN = PositionWiseFFN(4,4,8)
positionWiseFFN.eval()
positionWiseFFN(torch.ones(size=(2,3,4)))[0]
输出结果如下:
tensor([[-0.1271, -0.8308, -0.7345, -0.0625,  0.8272, -0.6249, -0.0928, -0.5698],
        [-0.1271, -0.8308, -0.7345, -0.0625,  0.8272, -0.6249, -0.0928, -0.5698],
        [-0.1271, -0.8308, -0.7345, -0.0625,  0.8272, -0.6249, -0.0928, -0.5698]],
       grad_fn=<SelectBackward0>)

2. 残差连接和层规范化

transformer中的“加法和规范化(add&norm)”组件,是由残差连接和紧随其后的层规范化组成的。两者都是构建有效的深度架构的关键。
batch normalization是在一个小批量的样本内基于批量规范化对数据进行重新中心化和重新缩放的调整。层规范化和批量规范化的目标相同,但层规范化是基于特征维度进行规范化。尽管批量规范化在计算机视觉中被广泛应用,但在自然语言处理任务中(输入通常是变长序列)批量规范化通常不如层规范化的效果好。
下面代码对比不同维度的层规范化和批量规范化的效果:

ln = nn.LayerNorm(3) #对每一行进行求均值为0,方差为1,每一行表示一个样本的所有特征
bn = nn.BatchNorm1d(3) #对每一列进行求均值为0,方差为1,每一列表示所有样本的一个特征
X = torch.tensor([[1,2,3],[8,9,10],[15,16,17]],dtype=torch.float32)
# 在训练模式下计算X的均值和方差
print('layer_norm :',ln(X),'\nbatch_norm : ',bn(X))
输出结果如下:
layer_norm : tensor([[-1.2247e+00,  0.0000e+00,  1.2247e+00],
        [-1.2247e+00,  3.5763e-07,  1.2247e+00],
        [-1.2247e+00,  0.0000e+00,  1.2247e+00]],
       grad_fn=<NativeLayerNormBackward0>) 
batch_norm :  tensor([[-1.2247e+00, -1.2247e+00, -1.2247e+00],
        [ 0.0000e+00,  4.4703e-08, -2.9802e-08],
        [ 1.2247e+00,  1.2247e+00,  1.2247e+00]],
       grad_fn=<NativeBatchNormBackward0>)

使用残差连接和层规范化来实现AddNorm类,暂退法dropout也被作为正则化方法使用:

class AddNorm(nn.Module):
    """残差连接后进行层规范化"""
    def __init__(self,normalized_shape,dropout):
        super(AddNorm,self).__init__()
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(normalized_shape=normalized_shape)
    def forward(self,X,Y):
        return self.layer_norm(self.dropout(Y)+X)

残差连接要求两个输入的形状相同,以便加法操作后输出张量的形状相同:

add_norm = AddNorm(normalized_shape=[3,4],dropout=0.5)
add_norm.eval()
add_norm(torch.ones(size=(2,3,4)),torch.ones(size=(2,3,4)))
输出结果如下:
tensor([[[0., 0., 0., 0.],
         [0., 0., 0., 0.],
         [0., 0., 0., 0.]],

        [[0., 0., 0., 0.],
         [0., 0., 0., 0.],
         [0., 0., 0., 0.]]], grad_fn=<NativeLayerNormBackward0>)

3. 多头注意力

查看前面博客李沐动手学深度学习V2-多头注意力机制和代码实现

4. 位置编码

查看前面博客李沐动手学深度学习V2-自注意力机制之位置编码

5. 编码器

有了组成transformer编码器的基础组件,现在可以先实现编码器中的一个层。下面的EncoderBlock类包含两个子层:多头自注意力和基于位置的前馈网络,这两个子层都使用了残差连接和紧随的层规范化。

class EncoderBlock(nn.Module):
    """transformer编码器块"""
    def __init__(self,query_size,key_size,value_size,num_hiddens,normalized_shape,ffn_num_inputs,ffn_num_hiddens,num_heads,dropout,use_bias=False):
        super(EncoderBlock,self).__init__()
        self.multihead_attention = d2l.torch.MultiHeadAttention(key_size,query_size,value_size,num_hiddens,num_heads,dropout,use_bias)
        self.addnorm1 = AddNorm(normalized_shape,dropout)
        self.ffn = PositionWiseFFN(ffn_num_inputs,ffn_num_hiddens,num_hiddens)
        self.addnorm2 = AddNorm(normalized_shape,dropout)
    def forward(self,X,valid_lens):
        Y = self.addnorm1(X,self.multihead_attention(X,X,X,valid_lens))
        return self.addnorm2(Y,self.ffn(Y))

transformer编码器中的任何层都不会改变其输入的形状,输入输出形状大小相同

X = torch.ones(size=(2,100,24))
#valid_lens中3表示第一个样本序列有100个,这100个样本中前3个是有效样本,后面97个样本是填充的无效样本;
#2表示第二个样本序列有100个,这100个样本中前2个是有效样本,后面98个样本是填充的无效样本
valid_lens = torch.tensor([3,2])
encoder_block = EncoderBlock(query_size=24,key_size=24,value_size=24,num_hiddens=24,normalized_shape=[100,24],ffn_num_inputs=24,ffn_num_hiddens=48,num_heads=8,dropout=0.5,use_bias=False)
encoder_block.eval()
encoder_block(X,valid_lens).shape
输出结果如下:
torch.Size([2, 100, 24])

实现下面transformer编码器的代码中,堆叠了num_layers个EncoderBlock类的实例。由于我们使用的是值范围在 −1 和 1 之间的固定位置编码,因此通过学习得到的输入的嵌入表示的值需要先乘以嵌入维度的平方根进行重新缩放,然后再与位置编码相加。

class TransformerEncoder(d2l.torch.Encoder):
    """transformer编码器"""
    def __init__(self,vocab_size,query_size,key_size,value_size,num_hiddens,normalized_shape,ffn_num_inputs,ffn_num_hiddens,num_heads,num_layers,dropout,use_bias=False):
        super(TransformerEncoder,self).__init__()
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size,num_hiddens)
        self.positionalEncoding = d2l.torch.PositionalEncoding(num_hiddens,dropout)
        self.encoder_blocks = nn.Sequential()
        for i in range(num_layers):
            self.encoder_blocks.add_module(f'encoder_block{i}',
            EncoderBlock(query_size,key_size,value_size,num_hiddens,normalized_shape,ffn_num_inputs,ffn_num_hiddens,num_heads,dropout,use_bias=use_bias))
    def forward(self, X,valid_lens, *args):
        # 因为位置编码值在-1和1之间,
        # 因此嵌入值乘以嵌入维度的平方根进行缩放,
        # 然后再与位置编码相加。
        X = self.positionalEncoding(self.embedding(X)*math.sqrt(self.num_hiddens))
        self.attention_weights = [None]*len(self.encoder_blocks)
        for i,encoder_block in enumerate(self.encoder_blocks):
            X = encoder_block(X,valid_lens)
            self.attention_weights[i] = encoder_block.multihead_attention.attention.attention_weights
        return X

下面指定超参数来创建一个两层的transformer编码器。 Transformer编码器输出的形状是(批量大小,时间步数目,num_hiddens)。

transformer_encoder = TransformerEncoder(200,24,24,24,24,[100,24],24,48,8,2,0.5,use_bias=False)
transformer_encoder.eval()
transformer_encoder(torch.ones(size=(2,100),dtype=torch.long),valid_lens).shape
输出结果如下:
torch.Size([2, 100, 24])

6. 解码器

如上图所示,transformer解码器也是由多个相同的层组成。在DecoderBlock类中实现的每个层包含了三个子层:解码器自注意力、“编码器-解码器”注意力和基于位置的前馈网络。这些子层也都被残差连接和紧随的层规范化围绕
在掩蔽多头解码器自注意力层(第一个子层)中,查询、键和值都来自上一个解码器层的输出。关于序列到序列模型(sequence-to-sequence model),在训练阶段,其输出序列的所有位置(时间步)的词元都是已知的;然而在预测阶段,其输出序列的词元是逐个生成的。因此,在任何解码器时间步中,只有生成的词元才能用于解码器的自注意力计算中。为了在解码器中保留自回归的属性,其掩蔽自注意力设定了参数dec_valid_lens,以便任何查询都只会与解码器中所有已经生成词元的位置(即直到该查询位置为止,该查询位置以及它之前的位置)进行注意力计算

class DecoderBlock(nn.Module):
    """解码器中第i个块"""
    def __init__(self,query_size,key_size,value_size,num_hiddens,normalized_shape,ffn_num_inputs,ffn_num_hiddens,num_heads,dropout,i,use_bias=False):
        super(DecoderBlock,self).__init__()
        self.i = i #i表示这是第i个DecoderBlock块
        self.mask_multihead_attention1 = d2l.torch.MultiHeadAttention(key_size,query_size,value_size,num_hiddens,num_heads,dropout,bias=use_bias)
        self.addnorm1 = AddNorm(normalized_shape,dropout)
        self.mutilhead_attention2 = d2l.torch.MultiHeadAttention(key_size,query_size,value_size,num_hiddens,num_heads,dropout,bias=use_bias)
        self.addnorm2 = AddNorm(normalized_shape,dropout)
        self.ffn = PositionWiseFFN(ffn_num_inputs,ffn_num_hiddens,num_hiddens)
        self.addnorm3 = AddNorm(normalized_shape,dropout)
    def forward(self,X,state):
        enc_outputs,enc_valid_lens = state[0],state[1]
        # 训练阶段,输出序列的所有词元都在同一时间处理,
        # 因此state[2][self.i]初始化为None。
        # 预测阶段,输出序列是通过词元一个接着一个解码的,
        # 因此state[2][self.i]包含着直到当前时间步第i个块解码的输出表示

        # 训练时,由于每次都需要调用init_state函数,因此重新训练一个batch时,state[2]始终是一个None列表,当测试时,由于每次根据当前时间步的词元预测下一个词元时都不会重新调用init_state()函数,不会重新初始化state,因此state[2]里面保存的是之前时间步预测出来的词元信息(存的是decoder每层第一个掩码多头注意力state信息)
        if state[2][self.i] is None:
            keys_values = X
        else:
            keys_values = torch.cat([state[2][self.i],X],dim=1)
        state[2][self.i] = keys_values
        if self.training:
            batch_size,num_step,_ = X.shape
            #训练时执行当前时间步的query时只看它前面的keys,values,不看它后面的keys,values。因为预测时是从左往右预测的,右边还没有预测出来,因此右侧的keys是没有的,看不到右侧的keys;训练时预测当前时间步词元能看到后面的目标词元,因此需要dec_valid_lens
            # dec_valid_lens的开头:(batch_size,num_steps),
            # 其中每一行是[1,2,...,num_steps]
            dec_valid_lens = torch.arange(1,num_step+1,device=X.device).repeat(batch_size,1)
            
        else:
            #测试时预测当前时间步的词元只能看到之前预测出来的词元,后面还没预测的词元还看不到,因此dec_valid_lens可以不需要
            dec_valid_lens = None
        # 自注意力
        X2 = self.mask_multihead_attention1(X,keys_values,keys_values,dec_valid_lens)
        Y = self.addnorm1(X,X2)
        # 编码器-解码器注意力。
        # enc_outputs的开头:(batch_size,num_steps,num_hiddens)
        Y2 = self.mutilhead_attention2(Y,enc_outputs,enc_outputs,enc_valid_lens)
        Z = self.addnorm2(Y,Y2)
        return self.addnorm3(Z,self.ffn(Z)),state

为了便于在“编码器-解码器”注意力中进行缩放点积计算和残差连接中进行加法计算,编码器和解码器的特征维度都是num_hiddens。

decoder_block = DecoderBlock(24,24,24,24,[100,24],24,48,8,0.5,0,use_bias=False)
decoder_block.eval()
X = torch.ones(size=(2,100,24))
state = [encoder_block(X,valid_lens),valid_lens,[None]]
decoder_block(X,state)[0].shape
输出结果如下:
torch.Size([2, 100, 24])

构建由num_layers个DecoderBlock实例组成的完整的transformer解码器,最后通过一个全连接层预测输出词元值。解码器的自注意力权重和编码器解码器注意力权重都被存储下来,方便后面可视化的需要。

class TransformerDecoder(d2l.torch.Decoder):
    def __init__(self,vocab_size,query_size,key_size,value_size,num_hiddens,normalized_shape,ffn_num_inputs,ffn_num_hiddens,num_heads,num_layers,dropout,use_bias=False):
        super(TransformerDecoder,self).__init__()
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size,num_hiddens)
        self.positionalEncoding = d2l.torch.PositionalEncoding(num_hiddens,dropout)
        self.decoder_blocks = nn.Sequential()
        for i in range(num_layers):
            self.decoder_blocks.add_module(f'decoder_block{i}',
            DecoderBlock(query_size,key_size,value_size,num_hiddens,normalized_shape,ffn_num_inputs,ffn_num_hiddens,num_heads,dropout,i,use_bias=use_bias))
        self.dense = nn.Linear(num_hiddens,vocab_size)
    def init_state(self, enc_outputs, enc_valid_lens,*args):
        return [enc_outputs,enc_valid_lens,[None]*self.num_layers]
    def forward(self, X, state):
        X = self.positionalEncoding(self.embedding(X)*math.sqrt(self.num_hiddens))
        self._attention_weights = [[None]*len(self.decoder_blocks) for _ in range(2)]
        for i,decoder_block in enumerate(self.decoder_blocks):
            X,state = decoder_block(X,state)
            # 解码器自注意力权重
            self._attention_weights[0][i] = decoder_block.mask_multihead_attention1.attention.attention_weights
            # “编码器-解码器”自注意力权重
            self._attention_weights[1][i] = decoder_block.mutilhead_attention2.attention.attention_weights
        return self.dense(X),state
    @property
    def attention_weights(self):
        return self._attention_weights

7. 训练

依照transformer架构来实例化编码器-解码器模型,指定transformer的编码器和解码器都是2层,都使用4头注意力。为了进行序列到序列的学习,在“英语-法语”机器翻译数据集上训练transformer模型,训练结果如下图所示。

batch_size,num_steps = 64,10
query_size,key_size,value_size,num_hiddens = 32,32,32,32
normalized_shape = [32]
ffn_num_inputs,ffn_num_hiddens = 32,64
num_heads,num_layers,dropout = 4,2,0.1
use_bias = False
lr,num_epochs,device = 0.005,300,d2l.torch.try_gpu()
train_iter,src_vocab,tgt_vocab = d2l.torch.load_data_nmt(batch_size,num_steps)
transformer_encoder = TransformerEncoder(len(src_vocab),query_size,key_size,value_size,num_hiddens,normalized_shape,ffn_num_inputs,ffn_num_hiddens,num_heads,num_layers,dropout,use_bias=use_bias)
transformer_decoder = TransformerDecoder(len(tgt_vocab),query_size,key_size,value_size,num_hiddens,normalized_shape,ffn_num_inputs,ffn_num_hiddens,num_heads,num_layers,dropout,use_bias=use_bias)
net = d2l.torch.EncoderDecoder(transformer_encoder,transformer_decoder)
d2l.torch.train_seq2seq(net,train_iter,lr,num_epochs,tgt_vocab,device)

训练结果
训练结束后,使用transformer模型将一些英语句子翻译成法语,并且计算它们的BLEU分数。

engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .','hi']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .','salut !']
for eng,fra in zip(engs,fras):
    translation,dec_attention_weight_seq = d2l.torch.predict_seq2seq(net,eng,src_vocab,tgt_vocab,num_steps,device,save_attention_weights=True)
    print(f'eng:{eng}==>',f'translation:{translation},',f'BLEU score:{d2l.torch.bleu(translation,fra,k=2)}')
输出结果如下:
eng:go .==> translation:va !, BLEU score:1.0
eng:i lost .==> translation:j'ai perdu ., BLEU score:1.0
eng:he's calm .==> translation:il est calme ., BLEU score:1.0
eng:i'm home .==> translation:je suis chez moi ., BLEU score:1.0
eng:hi==> translation:salut !, BLEU score:1.0

8. 自注意力权重可视化

当进行最后一个英语到法语的句子翻译工作时,可视化transformer的注意力权重。编码器自注意力权重的形状为(编码器层数,注意力头数,num_steps或查询的数目,num_steps或“键-值”对的数目)。

enc_attention_weights = torch.cat(net.encoder.attention_weights,dim=0).reshape((num_layers,num_heads,-1,num_steps))
enc_attention_weights.shape
输出结果如下:
torch.Size([2, 4, 10, 10])

在编码器的自注意力中,查询和键都来自相同的输入序列。因为填充词元是不携带信息的,因此通过指定输入序列的有效长度可以避免查询与使用填充词元的位置计算注意力。接下来将逐行呈现两层多头注意力的权重。每个注意力头都根据查询、键和值的不同的表示子空间来表示不同的注意力。

d2l.torch.show_heatmaps(enc_attention_weights.cpu(),xlabel='Keys Positions',ylabel='Query Positions',titles=['Head %d'% i for i in range(1,5)],figsize=(7,3.5))

注意力权重可视化
为了可视化解码器的自注意力权重和“编码器-解码器”的注意力权重,需要完成更多的数据操作工作。例如我们用零填充被掩蔽住的注意力权重。值得注意的是解码器的自注意力权重和“编码器-解码器”的注意力权重都有相同的查询:即以序列开始词元(beginning-of-sequence,BOS)打头,再与后续输出的词元共同组成序列。

''' 

#解码器注意力权重可视化代码的注释:
#dec_attention_weights_2d里面的元素为:每一步每个层每一个多头注意力每个头对相应key-value的注意力权重[torch.tensor([1])(预测时只有1个key-value),torch.tensor([10])(10个key-value),torch.tensor([3])(3个key-value)]
dec_attention_weights_2d = [head[0].tolist()
                            for step in dec_attention_weight_seq
                            for attn in step for blk in attn for head in blk]
#dec_attention_weights_filled:由于dec_attention_weights_2d中的元素(key-value的注意力权重)tensor形状不同,有torch.tensor([1]),torch.tensor([10]),torch.tensor([3])等,因此需要将这些tensor弄成形状大小相同的tensor,填充的部分值为0
dec_attention_weights_filled = torch.tensor(
    pd.DataFrame(dec_attention_weights_2d).fillna(0.0).values)
print(dec_attention_weights_filled.shape)


#dec_attention_weight_seq:表示预测所有步(第一步,第二步,第三步等)所得到的权重组合在一起(为一个list)
#step:表示预测第一步的词元所得到的注意力权重
#attn:表示模型所有层(2层)中第一个多头注意力的权重(也即是每层第一个多头注意力权重组合在一起)
#blk:表示模型第一个多头注意力在第一个层的多头注意力权重(也即是第一层第一个多头注意力权重),形状大小为:torch.Size([4, 1, 1])
#head:表示模型在第一层第一个多头注意力中第一个头的注意力权重,形状大小为:torch.Size([1, 1])
#head[0]:表示模型在第一层第一个多头注意力中第一个头的注意力权重中第一个query对于key_value的权重,形状大小为:torch.Size([1])
for step in dec_attention_weight_seq:
    for attn in step:
        print(len(attn))
        for blk in attn:
            print(blk.shape)
            for head in blk:
                print(head.shape)


#torch.Size([3, 2, 2, 4, 10])表示:预测的每一个词元(有3个预测词元),2表示每一层有两个多头attention,num_layers=2表示有两层decoder-block块,num_heads=4表示每个多头注意力有4个头,num_steps=10表示有10个key-value,对于10个key-value计算得到的10个注意力权重
dec_attention_weights = dec_attention_weights_filled.reshape((-1, 2, num_layers, num_heads, num_steps))
print(dec_attention_weights.shape)

'''
dec_attention_weights_2d = []
for step in dec_attention_weight_seq:
    for attn in step:
        for blk in attn:
            for head in blk:
                dec_attention_weights_2d.append(head[0].tolist())#将query对应的key-value的权重拿出来(head[0]表示将第一个query的所有权重全部拿出来,因为预测是一个进行一个预测,因此query是只有一个的)
dec_attention_weights_filled = torch.tensor(pd.DataFrame(dec_attention_weights_2d).fillna(0.0).values)
dec_attention_weights = dec_attention_weights_filled.reshape((-1,2,num_layers,num_heads,num_steps))
dec_self_attention_weights,dec_inter_attention_weights = dec_attention_weights.permute(1,2,3,0,4)#将预测的词元步数调整到第三维,将每个层中多头注意力个数调整到第一维
dec_self_attention_weights.shape,dec_inter_attention_weights.shape #dec_self_attention_weights.shape=torch.Size([2, 4, 3, 10]表示:第一个多头attention中每个层每个头每个预测的词元步数中每个key-values的注意力权重,
#dec_inter_attention_weights.shape=torch.Size([2, 4, 3, 10]表示:第二个多头attention中每个层每个头每个预测的词元步数中每个key-values的注意力权重,
输出结果如下:
(torch.Size([2, 4, 3, 10]), torch.Size([2, 4, 3, 10]))

由于解码器自注意力的自回归属性,查询不会对当前位置之后的“键-值”对进行注意力计算。

d2l.torch.show_heatmaps(dec_self_attention_weights[:,:,:,:len(translation.split())+1],xlabel='Keys Position',ylabel='Queries Position',titles=['Head %d' % i for i in range(1,5)],figsize=(7,3.5))

解码器注意力权重可视化结果
与编码器的自注意力的情况类似,通过指定输入序列的有效长度,查询不会与输入序列中填充位置的词元进行注意力计算。

d2l.torch.show_heatmaps(dec_inter_attention_weights,xlabel='Keys Position',ylabel='Queries Position',titles=['Head %d' % i for i in range(1,5)],figsize=(7,3.5))

解码器注意力权重可视化结果

9. transformer模型全部代码:

import math
import d2l.torch
import torch
from torch import nn
import pandas as pd


class PositionWiseFFN(nn.Module):
    """基于位置的前馈网络"""

    def __init__(self, ffn_num_inputs, ffn_num_hiddens, ffn_num_outputs):
        super(PositionWiseFFN, self).__init__()
        self.dense1 = nn.Linear(ffn_num_inputs, ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))


positionWiseFFN = PositionWiseFFN(4, 4, 8)
positionWiseFFN.eval()
positionWiseFFN(torch.ones(size=(2, 3, 4)))[0]
ln = nn.LayerNorm(3)  #对每一行进行求均值为0,方差为1,每一行表示一个样本的所有特征
bn = nn.BatchNorm1d(3)  #对每一列进行求均值为0,方差为1,每一列表示所有样本的一个特征
X = torch.tensor([[1, 2, 3], [8, 9, 10], [15, 16, 17]], dtype=torch.float32)
# 在训练模式下计算X的均值和方差
print('layer_norm :', ln(X), '\nbatch_norm : ', bn(X))


class AddNorm(nn.Module):
    """残差连接后进行层规范化"""

    def __init__(self, normalized_shape, dropout):
        super(AddNorm, self).__init__()
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(normalized_shape=normalized_shape)

    def forward(self, X, Y):
        return self.layer_norm(self.dropout(Y) + X)


add_norm = AddNorm(normalized_shape=[3, 4], dropout=0.5)
add_norm.eval()
add_norm(torch.ones(size=(2, 3, 4)), torch.ones(size=(2, 3, 4)))


class EncoderBlock(nn.Module):
    """transformer编码器块"""

    def __init__(self, query_size, key_size, value_size, num_hiddens, normalized_shape, ffn_num_inputs, ffn_num_hiddens,
                 num_heads, dropout, use_bias=False):
        super(EncoderBlock, self).__init__()
        self.multihead_attention = d2l.torch.MultiHeadAttention(key_size, query_size, value_size, num_hiddens,
                                                                num_heads, dropout, use_bias)
        self.addnorm1 = AddNorm(normalized_shape, dropout)
        self.ffn = PositionWiseFFN(ffn_num_inputs, ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(normalized_shape, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.multihead_attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))


X = torch.ones(size=(2, 100, 24))
#valid_lens中3表示第一个样本序列有100个,这100个样本中前3个是有效样本,后面97个样本是填充的无效样本;
#2表示第二个样本序列有100个,这100个样本中前2个是有效样本,后面98个样本是填充的无效样本
valid_lens = torch.tensor([3, 2])
encoder_block = EncoderBlock(query_size=24, key_size=24, value_size=24, num_hiddens=24, normalized_shape=[100, 24],
                             ffn_num_inputs=24, ffn_num_hiddens=48, num_heads=8, dropout=0.5, use_bias=False)
encoder_block.eval()
encoder_block(X, valid_lens).shape


class TransformerEncoder(d2l.torch.Encoder):
    """transformer编码器"""

    def __init__(self, vocab_size, query_size, key_size, value_size, num_hiddens, normalized_shape, ffn_num_inputs,
                 ffn_num_hiddens, num_heads, num_layers, dropout, use_bias=False):
        super(TransformerEncoder, self).__init__()
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.positionalEncoding = d2l.torch.PositionalEncoding(num_hiddens, dropout)
        self.encoder_blocks = nn.Sequential()
        for i in range(num_layers):
            self.encoder_blocks.add_module(f'encoder_block{i}',
                                           EncoderBlock(query_size, key_size, value_size, num_hiddens, normalized_shape,
                                                        ffn_num_inputs, ffn_num_hiddens, num_heads, dropout,
                                                        use_bias=use_bias))

    def forward(self, X, valid_lens, *args):
        # 因为位置编码值在-1和1之间,
        # 因此嵌入值乘以嵌入维度的平方根进行缩放,
        # 然后再与位置编码相加。
        X = self.positionalEncoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self.attention_weights = [None] * len(self.encoder_blocks)
        for i, encoder_block in enumerate(self.encoder_blocks):
            X = encoder_block(X, valid_lens)
            self.attention_weights[i] = encoder_block.multihead_attention.attention.attention_weights
        return X


transformer_encoder = TransformerEncoder(200, 24, 24, 24, 24, [100, 24], 24, 48, 8, 2, 0.5, use_bias=False)
transformer_encoder.eval()
transformer_encoder(torch.ones(size=(2, 100), dtype=torch.long), valid_lens).shape


class DecoderBlock(nn.Module):
    """解码器中第i个块"""

    def __init__(self, query_size, key_size, value_size, num_hiddens, normalized_shape, ffn_num_inputs, ffn_num_hiddens,
                 num_heads, dropout, i, use_bias=False):
        super(DecoderBlock, self).__init__()
        self.i = i  #i表示这是第i个DecoderBlock块
        self.mask_multihead_attention1 = d2l.torch.MultiHeadAttention(key_size, query_size, value_size, num_hiddens,
                                                                      num_heads, dropout, bias=use_bias)
        self.addnorm1 = AddNorm(normalized_shape, dropout)
        self.mutilhead_attention2 = d2l.torch.MultiHeadAttention(key_size, query_size, value_size, num_hiddens,
                                                                 num_heads, dropout, bias=use_bias)
        self.addnorm2 = AddNorm(normalized_shape, dropout)
        self.ffn = PositionWiseFFN(ffn_num_inputs, ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(normalized_shape, dropout)

    def forward(self, X, state):
        enc_outputs, enc_valid_lens = state[0], state[1]
        # 训练阶段,输出序列的所有词元都在同一时间处理,
        # 因此state[2][self.i]初始化为None。
        # 预测阶段,输出序列是通过词元一个接着一个解码的,
        # 因此state[2][self.i]包含着直到当前时间步第i个块解码的输出表示

        # 训练时,由于每次都需要调用init_state函数,因此重新训练一个batch时,state[2]始终是一个None列表,当测试时,由于每次根据当前时间步的词元预测下一个词元时都不会重新调用init_state()函数,不会重新初始化state,因此state[2]里面保存的是之前时间步预测出来的词元信息(存的是decoder每层第一个掩码多头注意力state信息)
        if state[2][self.i] is None:
            keys_values = X
        else:
            keys_values = torch.cat([state[2][self.i], X], dim=1)
        state[2][self.i] = keys_values
        if self.training:
            batch_size, num_step, _ = X.shape
            #训练时执行当前时间步的query时只看它前面的keys,values,不看它后面的keys,values。因为预测时是从左往右预测的,右边还没有预测出来,因此右侧的keys是没有的,看不到右侧的keys;训练时预测当前时间步词元能看到后面的目标词元,因此需要dec_valid_lens
            # dec_valid_lens的开头:(batch_size,num_steps),
            # 其中每一行是[1,2,...,num_steps]
            dec_valid_lens = torch.arange(1, num_step + 1, device=X.device).repeat(batch_size, 1)

        else:
            #测试时预测当前时间步的词元只能看到之前预测出来的词元,后面还没预测的词元还看不到,因此dec_valid_lens可以不需要
            dec_valid_lens = None
        # 自注意力
        X2 = self.mask_multihead_attention1(X, keys_values, keys_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)
        # 编码器-解码器注意力。
        # enc_outputs的开头:(batch_size,num_steps,num_hiddens)
        Y2 = self.mutilhead_attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)
        return self.addnorm3(Z, self.ffn(Z)), state


decoder_block = DecoderBlock(24, 24, 24, 24, [100, 24], 24, 48, 8, 0.5, 0, use_bias=False)
decoder_block.eval()
X = torch.ones(size=(2, 100, 24))
state = [encoder_block(X, valid_lens), valid_lens, [None]]
decoder_block(X, state)[0].shape


class TransformerDecoder(d2l.torch.Decoder):
    def __init__(self, vocab_size, query_size, key_size, value_size, num_hiddens, normalized_shape, ffn_num_inputs,
                 ffn_num_hiddens, num_heads, num_layers, dropout, use_bias=False):
        super(TransformerDecoder, self).__init__()
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.positionalEncoding = d2l.torch.PositionalEncoding(num_hiddens, dropout)
        self.decoder_blocks = nn.Sequential()
        for i in range(num_layers):
            self.decoder_blocks.add_module(f'decoder_block{i}',
                                           DecoderBlock(query_size, key_size, value_size, num_hiddens, normalized_shape,
                                                        ffn_num_inputs, ffn_num_hiddens, num_heads, dropout, i,
                                                        use_bias=use_bias))
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens, *args):
        return [enc_outputs, enc_valid_lens, [None] * self.num_layers]

    def forward(self, X, state):
        X = self.positionalEncoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self._attention_weights = [[None] * len(self.decoder_blocks) for _ in range(2)]
        for i, decoder_block in enumerate(self.decoder_blocks):
            X, state = decoder_block(X, state)
            # 解码器自注意力权重
            self._attention_weights[0][i] = decoder_block.mask_multihead_attention1.attention.attention_weights
            # “编码器-解码器”自注意力权重
            self._attention_weights[1][i] = decoder_block.mutilhead_attention2.attention.attention_weights
        return self.dense(X), state

    @property
    def attention_weights(self):
        return self._attention_weights


import os
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
    """Train a model for sequence to sequence.

    Defined in :numref:`sec_seq2seq_decoder`"""

    def xavier_init_weights(m):
        if type(m) == nn.Linear:
            nn.init.xavier_uniform_(m.weight)
        if type(m) == nn.GRU:
            for param in m._flat_weights_names:
                if "weight" in param:
                    nn.init.xavier_uniform_(m._parameters[param])

    net.apply(xavier_init_weights)
    #net.to(device)
    #net = net.to(device[0])
    print(device)
    gpu0 = torch.device(device[0])
    net = nn.DataParallel(module=net, device_ids=device)
    net = net.to(gpu0)

    #net = nn.parallel.DistributedDataParallel(module=net,device_ids=device,broadcast_buffers=False)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    loss = d2l.torch.MaskedSoftmaxCELoss()
    net.train()
    animator = d2l.torch.Animator(xlabel='epoch', ylabel='loss',
                                  xlim=[10, num_epochs])
    for epoch in range(num_epochs):
        timer = d2l.torch.Timer()
        metric = d2l.torch.Accumulator(2)  # Sum of training loss, no. of tokens
        for batch in data_iter:
            optimizer.zero_grad()
            X, X_valid_len, Y, Y_valid_len = [x.to(gpu0) for x in batch]
            bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
                               device=gpu0).reshape(-1, 1)
            #print(X.device,Y.device,X_valid_len.device,Y_valid_len.device,bos.device)
            dec_input = d2l.torch.concat([bos, Y[:, :-1]], 1)  # Teacher forcing
            dec_input = dec_input.to(gpu0)
            #print(dec_input.device)
            #net.cuda()
            Y_hat, _ = net(X, dec_input, X_valid_len)
            l = loss(Y_hat, Y, Y_valid_len)
            l.sum().backward()  # Make the loss scalar for `backward`
            d2l.torch.grad_clipping(net, 1)
            num_tokens = Y_valid_len.sum()
            optimizer.step()
            with torch.no_grad():
                metric.add(l.sum(), num_tokens)
        if (epoch + 1) % 10 == 0:
            animator.add(epoch + 1, (metric[0] / metric[1],))
    print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
          f'tokens/sec on {str(device)}')


batch_size, num_steps = 64, 10
query_size, key_size, value_size, num_hiddens = 32, 32, 32, 32
normalized_shape = [32]
ffn_num_inputs, ffn_num_hiddens = 32, 64
num_heads, num_layers, dropout = 4, 2, 0.1
use_bias = False
lr, num_epochs, device = 0.005, 300, d2l.torch.try_gpu()
train_iter, src_vocab, tgt_vocab = d2l.torch.load_data_nmt(batch_size, num_steps)
transformer_encoder = TransformerEncoder(len(src_vocab), query_size, key_size, value_size, num_hiddens,
                                         normalized_shape, ffn_num_inputs, ffn_num_hiddens, num_heads, num_layers,
                                         dropout, use_bias=use_bias)
transformer_decoder = TransformerDecoder(len(tgt_vocab), query_size, key_size, value_size, num_hiddens,
                                         normalized_shape, ffn_num_inputs, ffn_num_hiddens, num_heads, num_layers,
                                         dropout, use_bias=use_bias)
net = d2l.torch.EncoderDecoder(transformer_encoder, transformer_decoder)
d2l.torch.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
#train_seq2seq(net,train_iter,lr,num_epochs,tgt_vocab,device)
# engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
# fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .', 'hi']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .', 'salut !']
for eng, fra in zip(engs, fras):
    translation, dec_attention_weight_seq = d2l.torch.predict_seq2seq(net, eng, src_vocab, tgt_vocab, num_steps, device,
                                                                      save_attention_weights=True)
    print(f'eng:{eng}==>', f'translation:{translation},', f'BLEU score:{d2l.torch.bleu(translation, fra, k=2)}')
enc_attention_weights = torch.cat(net.encoder.attention_weights, dim=0).reshape((num_layers, num_heads, -1, num_steps))
enc_attention_weights.shape
d2l.torch.show_heatmaps(enc_attention_weights.cpu(), xlabel='Keys Positions', ylabel='Query Positions',
                        titles=['Head %d' % i for i in range(1, 5)], figsize=(7, 3.5))

'''
#dec_attention_weights_2d里面的元素为:每一步每个层每一个多头注意力每个头对相应key-value的注意力权重[torch.tensor([1])(预测时只有1个key-value),torch.tensor([10])(10个key-value),torch.tensor([3])(3个key-value)]
dec_attention_weights_2d = [head[0].tolist()
                            for step in dec_attention_weight_seq
                            for attn in step for blk in attn for head in blk]
#dec_attention_weights_filled:由于dec_attention_weights_2d中的元素(key-value的注意力权重)tensor形状不同,有torch.tensor([1]),torch.tensor([10]),torch.tensor([3])等,因此需要将这些tensor弄成形状大小相同的tensor,填充的部分值为0
dec_attention_weights_filled = torch.tensor(
    pd.DataFrame(dec_attention_weights_2d).fillna(0.0).values)
print(dec_attention_weights_filled.shape)
#dec_attention_weight_seq:表示预测所有步(第一步,第二步,第三步等)所得到的权重组合在一起(为一个list)
#step:表示预测第一步的词元所得到的注意力权重
#attn:表示模型所有层(2层)中第一个多头注意力的权重(也即是每层第一个多头注意力权重组合在一起)
#blk:表示模型第一个多头注意力在第一个层的多头注意力权重(也即是第一层第一个多头注意力权重),形状大小为:torch.Size([4, 1, 1])
#head:表示模型在第一层第一个多头注意力中第一个头的注意力权重,形状大小为:torch.Size([1, 1])
#head[0]:表示模型在第一层第一个多头注意力中第一个头的注意力权重中第一个query对于key_value的权重,形状大小为:torch.Size([1])
for step in dec_attention_weight_seq:
    for attn in step:
        print(len(attn))
        for blk in attn:
            print(blk.shape)
            for head in blk:
                print(head.shape)
#torch.Size([3, 2, 2, 4, 10])表示:预测的每一个词元(有3个预测词元),2表示每一层有两个多头attention,num_layers=2表示有两层decoder-block块,num_heads=4表示每个多头注意力有4个头,num_steps=10表示有10个key-value,对于10个key-value计算得到的10个注意力权重
dec_attention_weights = dec_attention_weights_filled.reshape((-1, 2, num_layers, num_heads, num_steps))
print(dec_attention_weights.shape)
'''
dec_attention_weights_2d = []
for step in dec_attention_weight_seq:
    for attn in step:
        for blk in attn:
            for head in blk:
                dec_attention_weights_2d.append(head[
                                                    0].tolist())  #将query对应的key-value的权重拿出来(head[0]表示将第一个query的所有权重全部拿出来,因为预测是一个进行一个预测,因此query是只有一个的)
dec_attention_weights_filled = torch.tensor(pd.DataFrame(dec_attention_weights_2d).fillna(0.0).values)
dec_attention_weights = dec_attention_weights_filled.reshape((-1, 2, num_layers, num_heads, num_steps))
dec_self_attention_weights, dec_inter_attention_weights = dec_attention_weights.permute(1, 2, 3, 0,
                                                                                        4)  #将预测的词元步数调整到第三维,将每个层中多头注意力个数调整到第一维
dec_self_attention_weights.shape, dec_inter_attention_weights.shape  #dec_self_attention_weights.shape=torch.Size([2, 4, 3, 10]表示:第一个多头attention中每个层每个头每个预测的词元步数中每个key-values的注意力权重,
#dec_inter_attention_weights.shape=torch.Size([2, 4, 3, 10]表示:第二个多头attention中每个层每个头每个预测的词元步数中每个key-values的注意力权重,
d2l.torch.show_heatmaps(dec_self_attention_weights[:, :, :, :len(translation.split()) + 1], xlabel='Keys Position',
                        ylabel='Queries Position', titles=['Head %d' % i for i in range(1, 5)], figsize=(7, 3.5))
d2l.torch.show_heatmaps(dec_inter_attention_weights, xlabel='Keys Position', ylabel='Queries Position',
                        titles=['Head %d' % i for i in range(1, 5)], figsize=(7, 3.5))

三. 小结

  • transformer是编码器-解码器架构的一个实践,尽管在实际情况中编码器或解码器可以单独使用。
  • 在transformer中,多头自注意力用于表示输入序列和输出序列,不过解码器必须通过掩蔽机制来保留自回归属性。
  • transformer中的残差连接和层规范化是训练非常深度模型的重要工具。
  • transformer模型中基于位置的前馈网络使用同一个多层感知机,作用是对所有序列位置的表示进行转换。

四. 相关链接

注意力机制第一篇:李沐动手学深度学习V2-注意力机制
注意力机制第二篇:李沐动手学深度学习V2-注意力评分函数
注意力机制第三篇:李沐动手学深度学习V2-基于注意力机制的seq2seq
注意力机制第四篇:李沐动手学深度学习V2-自注意力机制之位置编码
注意力机制第五篇:李沐动手学深度学习V2-自注意力机制
注意力机制第六篇:李沐动手学深度学习V2-多头注意力机制和代码实现
注意力机制第七篇:李沐动手学深度学习V2-transformer和代码实现

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐