1. 引言

对于许多新接触深度学习爱好者来说,玩AutoEncoder总是很有趣的,因为它具有简单的处理逻辑、简易的网络架构,方便可视化潜在的特征空间。在本文中,我将从头开始介绍一个简单的AutoEncoder模型,以及一些可视化潜在特征空间的一些的方法,以便使本文变得生动有趣。

闲话少说,我们直接开始吧!

2. 数据集介绍

在本文中,我们使用FashionMNIST数据集来完成此任务。
在这里插入图片描述

以下是Kaggle上数据集的链接:戳我
该数据集已在torchvision库中集成;我们可以通过几行代码直接导入和处理该数据集。

为此,首先需要是编写一个collate_fn函数,将数据集从PIL图像转换为torch张量,并进行相应的pad操作:

# This function convert the PIL images to tensors then pad them
def collate_fn(batch):
    process = transforms.Compose([
                transforms.ToTensor(),
                transforms.Pad([2])]
                )
    # x - images; we process each image in the batch
    x = [process(data[0]) for data in batch]
    x = torch.concat(x).unsqueeze(1)
    # y - labels, note that we should convert the labels to LongTensor
    y = torch.LongTensor([data[1] for data in batch])
    return x, y

3. 实现DataLoader

接着,我们就可以使用以下代码来完成相应的DataLoader的实现:

labels = ["T-shirt/top", "Trouser", "Pullover", "Dress","Coat", 
          "Sandla", "Shirt", "Sneaker", "Bag", "Ankle boot"]

# download/load dataset
train_data = FashionMNIST("./MNIST_DATA", train=True, download=True)
valid_data = FashionMNIST("./MNIST_DATA", train=False, download=True)

# put datasets into dataloaders
train_loader = DataLoader(train_data, batch_size=config["batch_size"], 
                          shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_data, batch_size=config["batch_size"], 
                           shuffle=False, collate_fn=collate_fn)

接着我们可以使用以下代码来检验上述代码是否符合我们的预期,测试代码如下:

print("Inspecting train data: ")
for _, data in enumerate(train_loader):
    print("Batch shape: ", data[0].shape)
    fig, ax = plt.subplots(1, 4, figsize=(10, 4))
    for i in range(4):
        # Ture 3D tensor to 2D tensor due to image's single channel
        ax[i].imshow(data[0][i].squeeze(), cmap="gray")
        ax[i].axis("off")
        ax[i].set_title(labels[data[1][i]])
    plt.show()
    # And don't forget to break
    break

运行结果如下:
在这里插入图片描述
观察上图,图像和标签一一对应关系正常,接着我们就可以进入我们的网络设计部分。

4. 实现encoder

我们知道自编码器是由编码器encoder和解码器decoder实现的,其中编码器的作用为将输入的图像编码为特征空间的特征向量,解码器的作用相反,尽可能的将上述特征向量结果恢复为原图。基于此,我们首先来一步步实现编码器。首先,我们来定义模型的基本超参数如下:

# Model parameters:
LAYERS = 3
KERNELS = [3, 3, 3]
CHANNELS = [32, 64, 128]
STRIDES = [2, 2, 2]
LINEAR_DIM = 2048

同时相应的编码器的网络结构设计如下:

class Encoder(nn.Module):
    def __init__(self, output_dim=2, use_batchnorm=False, use_dropout=False):
        super(Encoder, self).__init__()
        # bottleneck dimentionality
        self.output_dim = output_dim
        # variables deciding if using dropout and batchnorm in model
        self.use_dropout = use_dropout
        self.use_batchnorm = use_batchnorm
        # convolutional layer hyper parameters
        self.layers = LAYERS
        self.kernels = KERNELS
        self.channels = CHANNELS
        self.strides = STRIDES
        self.conv = self.get_convs()
        # layers for latent space projection
        self.fc_dim = LINEAR_DIM
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(self.fc_dim, self.output_dim)
    def get_convs(self):
        """
        generating convolutional layers based on model's hyper parameters
        """
        conv_layers = nn.Sequential()
        for i in range(self.layers):
            # The input channel of the first layer is 1
            if i == 0: conv_layers.append(nn.Conv2d(1, 
                                              self.channels[i], 
                                              kernel_size=self.kernels[i],
                                              stride=self.strides[i],
                                              padding=1))
            
            else: conv_layers.append(nn.Conv2d(self.channels[i-1], 
                                         self.channels[i],
                                         kernel_size=self.kernels[i],
                                         stride=self.strides[i],
                                         padding=1))
            
            if self.use_batchnorm:
                conv_layers.append(nn.BatchNorm2d(self.channels[i]))
            
            # Here we use GELU as activation function
            conv_layers.append(nn.GELU()) 
            if self.use_dropout:
                conv_layers.append(nn.Dropout2d(0.15))
        return conv_layers
  
    def forward(self, x):
        x = self.conv(x)
        x = self.flatten(x)
        return self.linear(x)

在Pytorch中torchsummary是一个非常方便的工具,用于检查和调试模型的网络结构;我们可以检查层、每层中的张量形状以及模型的参数。代码如下:

from torchsummary import summary
# Get the summary of autoencoder architecture
encoder = Encoder(use_batchnorm=True, use_dropout=True).to(DEVICE)
summary(encoder, (1, 32, 32))
pass

得到输出如下:
在这里插入图片描述

5. 实现decoder

在我们的例子中,解码器层decoder是编码器的反向操作;确保每一层的输入和输出形状是很重要的。此外,我们应该调整转置卷积层中的paddingoutput_pading参数,以确保输出图像和输入图像的维度相同。代码实现如下:

class Decoder(nn.Module):
    def __init__(self, input_dim=2, use_batchnorm=False, use_dropout=False):
        super(Decoder, self).__init__()
        # variables deciding if using dropout and batchnorm in model
        self.use_dropout = use_dropout
        self.use_batchnorm = use_batchnorm
        self.fc_dim = LINEAR_DIM
        self.input_dim = input_dim
        # Conv layer hypyer parameters
        self.layers = LAYERS
        self.kernels = KERNELS
        self.channels = CHANNELS[::-1] # flip the channel dimensions
        self.strides = STRIDES
        
        # In decoder, we first do fc project, then conv layers
        self.linear = nn.Linear(self.input_dim, self.fc_dim)
        self.conv =  self.get_convs()
        self.output = nn.Conv2d(self.channels[-1], 1, kernel_size=1, stride=1)

    def get_convs(self):
        conv_layers = nn.Sequential()
        for i in range(self.layers):
            if i == 0: conv_layers.append(
                            nn.ConvTranspose2d(self.channels[i],
                                               self.channels[i],
                                               kernel_size=self.kernels[i],
                                               stride=self.strides[i],
                                               padding=1,
                                               output_padding=1)
                            )
            
            else: conv_layers.append(
                            nn.ConvTranspose2d(self.channels[i-1], 
                                               self.channels[i],
                                               kernel_size=self.kernels[i],
                                               stride=self.strides[i],
                                               padding=1,
                                               output_padding=1
                                              )
                            )
            if self.use_batchnorm and i != self.layers - 1:
                conv_layers.append(nn.BatchNorm2d(self.channels[i]))
            conv_layers.append(nn.GELU())
            if self.use_dropout:
                conv_layers.append(nn.Dropout2d(0.15))
        return conv_layers
   
    def forward(self, x):
        x = self.linear(x)
        # reshape 3D tensor to 4D tensor
        x = x.reshape(x.shape[0], 128, 4, 4)
        x = self.conv(x)
        return self.output(x)

相应的解码器实现如下:

decoder = Decoder(use_batchnorm=True, use_dropout=True).to(DEVICE)
summary(decoder, (1, 2))
pass

运行后,结果如下:
在这里插入图片描述

6. 实现自编码器

接着,我们将上述编码器和解码器串联起来,代码实现如下:

class AutoEncoder(nn.Module):
    
    def __init__(self):
        super(AutoEncoder, self).__init__()
        self.encoder = Encoder(output_dim=2, use_batchnorm=True, use_dropout=False)
        self.decoder = Decoder(input_dim=2, use_batchnorm=True, use_dropout=False)
        
    def forward(self, x):
        return self.decoder(self.encoder(x))

model = AutoEncoder().to(DEVICE)
summary(model, (1, 32, 32))
pass

得到结果如下:
在这里插入图片描述

7. 可视化函数

在进入训练部分之前,让我们花一些时间编写一个函数来可视化我们模型的潜在特征空间,即编码后二维特征向量的可视化表示。

def plotting(step:int=0, show=False):
    model.eval() # Switch the model to evaluation mode
    points = []
    label_idcs = []
    path = "./ScatterPlots"
    if not os.path.exists(path): os.mkdir(path)
    for i, data in enumerate(valid_loader):
        img, label = [d.to(DEVICE) for d in data]
        # We only need to encode the validation images
        proj = model.encoder(img)
        points.extend(proj.detach().cpu().numpy())
        label_idcs.extend(label.detach().cpu().numpy())
        del img, label
    
    points = np.array(points)
    # Creating a scatter plot
    fig, ax = plt.subplots(figsize=(10, 10) if not show else (8, 8))
    scatter = ax.scatter(x=points[:, 0], y=points[:, 1], s=2.0, 
                c=label_idcs, cmap='tab10', alpha=0.9, zorder=2)
    
    ax.spines["right"].set_visible(False)
    ax.spines["top"].set_visible(False)
    
    if show: 
        ax.grid(True, color="lightgray", alpha=1.0, zorder=0)
        plt.show()
    else: 
        # Do not show but only save the plot in training
        plt.savefig(f"{path}/Step_{step:03d}.png", bbox_inches="tight")
        plt.close() # don't forget to close the plot, or it is always in memory
        model.train()

以下是训练过程中生成的图;该过程显示了模型的潜在空间随时间的分布,可以看出尽管有个别离群点,整体不同类别的数据在特征空间呈现出聚类趋势:
在这里插入图片描述

8. 损失函数

在编写训练和验证函数之前,还有一个步骤是定义目标函数和优化方法。由于自动编码器是一个自监督模型,输入也是网络输出重建图像逼近的对象,因此我们可以使用MSE(均方误差)损失来评估输入和重建图像之间的逐像素损失。当然有很多优化器可供选择,这里我选择的是AdamW,因为我在过去几个月里经常使用它。

criterion = nn.MSELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"], weight_decay=1e-5)

# For mixed precision training
scaler = torch.cuda.amp.GradScaler()
steps = 0 # tracking the training steps

9. 训练函数

接着我们来定义训练一个epoch的函数,代码实现如下:

def train(model, dataloader, criterion, optimizer, save_distrib=False):
    # steps is used to track training progress, purely for latent space plots
    global steps 
    model.train()
    train_loss = 0.0
    
    # Process tqdm bar, helpful for monitoring training process
    batch_bar = tqdm(total=len(dataloader), dynamic_ncols=True, 
                     leave=False, position=0, desc="Train")
    for i, batch in enumerate(dataloader):
        optimizer.zero_grad()
        x = batch[0].to(DEVICE)
        
        # Here we implement the mixed precision training
        with torch.cuda.amp.autocast():
            y_recons = model(x)
            loss = criterion(y_recons, x)
        
        train_loss += loss.item()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        
        batch_bar.set_postfix(
            loss=f"{train_loss/(i+1):.4f}",
            lr = f"{optimizer.param_groups[0]['lr']:.4f}"
        )
        batch_bar.update()        

        # Saving latent space plots
        if steps % 10 == 0 and save_distrib and steps <= 400: plotting(steps)
        steps += 1        
        
        # remove unnecessary cache in CUDA memory
        torch.cuda.empty_cache()
        del x, y_recons
    
    batch_bar.close()
    train_loss /= len(dataloader)

    return train_loss

10 验证函数

相应的验证函数的实现稍微简单一点,代码如下:

def validate(model, dataloader, criterion):
    model.eval() # Don't forget to turn the model to eval mode
    valid_loss = 0.0
    # Progress tqdm bar
    batch_bar = tqdm(total=len(dataloader), dynamic_ncols=True,
                     leave=False, position=0, desc="Validation")
    
    for i, batch in enumerate(dataloader):
        x = batch[0].to(DEVICE)
        with torch.no_grad(): # we don't need gradients in validation
            y_recons = model(x)
            loss = criterion(y_recons, x)
        valid_loss += loss.item()
        batch_bar.set_postfix(
            loss=f"{valid_loss/(i+1):.4f}",
            lr = f"{optimizer.param_groups[0]['lr']:.4f}"
        )
        batch_bar.update()
        torch.cuda.empty_cache()
        del x, y_recons
    
    batch_bar.close()
    valid_loss /= len(dataloader)
    return valid_loss

11 训练过程

接着,我们将上述代码串起来,来实现我们模型的训练,由于FashionMNIST是一个很小的数据集,我们实际上不需要大量训练;初始训练和验证损失非常低,并且在三个epoch之后没有太大的改进空间。

for i in range(config["epochs"]):

    curr_lr = float(optimizer.param_groups[0]["lr"])
    train_loss = train(model, train_loader, criterion, 
                       optimizer, save_distrib=True)
    valid_loss = validate(model, valid_loader, criterion)

    print(f"Epoch {i+1}/{config['epochs']}\nTrain loss: {train_loss:.4f}\t Validation loss: {valid_loss:.4f}\tlr: {curr_lr:.4f}")

输出如下:
在这里插入图片描述

12 结果可视化

我们现在可以再次绘制和检查收敛后的特征空间,可视化输出如下:
在这里插入图片描述
观察上图可知,相应的聚类后的效果比训练过程中的要好,但有些个别类混合在同一集群中。这个问题可以通过增加编码器输出的特征向量的维度或使用其他损失函数函数来解决。

13 预测效果可视化

为了验证我们的解码器确实学到了东西,我们可以在随机绘制一些离散点来观察解码器重建图像的效果,代码如下:

# randomly sample x and y values
xs = [random.uniform(-6.0, 8.0) for i in range(8)]
ys = [random.uniform(-7.5, 10.0) for i in range(8)]

points = list(zip(xs, ys))
coords = torch.tensor(points).unsqueeze(1).to(DEVICE)
nrows, ncols = 2, 4
fig, axes = plt.subplots(nrows, ncols, figsize=(10, 5))
model.eval()
with torch.no_grad():
    generates = [model.decoder(coord) for coord in coords]
# plot points
idx = 0
for row in range(0, nrows):
    for col in range(0, ncols):
        ax = axes[row, col]
        im = generates[idx].squeeze().detach().cpu()
        ax.imshow(im, cmap="gray")
        ax.axis("off")
        coord = coords[idx].detach().cpu().numpy()[0]
        ax.set_title(f"({coord[0]:.3f}, {coord[1]:.3f})")
        idx += 1

plt.show()

代码输出如下:
在这里插入图片描述

14. 总结

本文重点介绍了如何利用Pytorch来实现自编码器,从数据集,到搭建网络结构,以及特征可视化和网络预测输出几个方面,分别进行了详细的阐述,并给出了相应的代码示例。

您学废了吗?

完整代码链接:戳我

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐