直接训练SNN:从LIF模型到MNIST分类的完整实战【含源码】
我们计划使用原生Python代码直接训练SNN,并在相同的精度、超参数和网络结构下与SpikingJelly进行精度对比。以下是基准方法和相关教程的链接:>[时间驱动:使用单层全连接SNN识别MNIST — SpikingJelly alpha 文档](https://spikingjelly.readthedocs.io/zh-cn/0.0.0.0.8/clock_driven/3_fc_mni
我们计划使用原生Python代码直接训练SNN,并在相同的精度、超参数和网络结构下与SpikingJelly进行精度对比。以下是基准方法和相关教程的链接:
在直接训练SNN时,我们需要实现以下三个方面:
- LIF神经元:实现充电、发射脉冲、重置等操作。
- 编码方式:将连续值转换为适合SNN输入的形式。
- BPTT(反向传播算法):实现SNN的反向传播算法以更新参数。
第一步、构建神经元
Leaky Integrate-and-Fire (LIF) 神经元模型是SNN中最常用的神经元模型之一。它模拟了生物神经元的发放脉冲(spiking)的过程。该模型可以通过以下数学方程描述:
1. 电流输入和膜电位的更新
LIF神经元的膜电位 V ( t ) V(t) V(t) 随时间 t t t 更新,受输入电流 I ( t ) I(t) I(t) 的影响。LIF神经元的核心动态方程为:
τ m d V ( t ) d t = − V ( t ) + R m I ( t ) \tau_m \frac{dV(t)}{dt} = -V(t) + R_m I(t) τmdtdV(t)=−V(t)+RmI(t)
- V ( t ) V(t) V(t) 是时间 t t t 时刻的膜电位。
- τ m \tau_m τm 是膜时间常数,决定了膜电位的衰减速度。
- R m R_m Rm 是电阻, R m I ( t ) R_m I(t) RmI(t) 表示输入电流对膜电位的贡献。
- I ( t ) I(t) I(t) 是时间 t t t 时刻的输入电流。
2. 膜电位的更新离散化
在计算机中,我们通常将上述微分方程离散化,用离散的时间步 Δ t \Delta t Δt 进行计算。离散化后的膜电位更新公式为:
V ( t + Δ t ) = V ( t ) + Δ t τ m ( − V ( t ) + R m I ( t ) ) V(t + \Delta t) = V(t) + \frac{\Delta t}{\tau_m} \left( -V(t) + R_m I(t) \right) V(t+Δt)=V(t)+τmΔt(−V(t)+RmI(t))
3. 发射脉冲机制
当膜电位 V ( t ) V(t) V(t) 达到或超过某个阈值 V t h V_{th} Vth 时,LIF神经元会发射一个脉冲(spike),并且膜电位立即重置为 V r e s e t V_{reset} Vreset。这一过程可以表示为:
if V ( t ) ≥ V t h , then V ( t ) ← V r e s e t , emit spike \text{if } V(t) \geq V_{th}, \text{ then } V(t) \leftarrow V_{reset}, \text{ emit spike} if V(t)≥Vth, then V(t)←Vreset, emit spike
在计算机代码中实现的原理
在代码中,我们需要实现上述LIF神经元的动态过程。具体来说:
- 初始化神经元参数:包括膜时间常数 τ m \tau_m τm、电阻 R m R_m Rm 、阈值 V t h V_{th} Vth、重置电位 V r e s e t V_{reset} Vreset 等。
- 输入电流的处理:输入电流可以是一个随时间变化的函数或一个常值。
- 膜电位的更新:每个时间步都要更新膜电位,并检查是否超过阈值。
- 脉冲发射和重置:如果膜电位超过阈值,发射脉冲并重置膜电位。
LIF神经元的实现代码
import numpy as np
import matplotlib.pyplot as plt
from spikingjelly.clock_driven import base
class BaseNode(base.MemoryModule):
def __init__(self, v_threshold: float = 1., v_reset: float = 0., detach_reset: bool = False):
super().__init__()
if v_reset is None:
self.register_memory('v', 0.)
else:
self.register_memory('v', v_reset)
self.register_memory('v_threshold', v_threshold)
self.register_memory('v_reset', v_reset)
self.detach_reset = detach_reset
def neuronal_charge(self, x: torch.Tensor):
if self.decay_input:
if self.v_reset is None or self.v_reset == 0.:
self.v = self.v + (x - self.v) / self.tau
def neuronal_fire(self):
"""
根据当前神经元的电压、阈值,计算输出脉冲。
"""
# return self.surrogate_function(self.v - self.v_threshold)
# return sigmoid(self.v - self.v_threshold)
return sigmoid(self.v - self.v_threshold)
def neuronal_reset(self, spike):
"""
根据当前神经元释放的脉冲,对膜电位进行重置。
"""
spike_d = spike
self.v = (1. - spike_d) * self.v + spike_d * self.v_reset
def forward(self, x: torch.Tensor):
"""
:param x: 输入到神经元的电压增量
:type x: torch.Tensor
:return: 神经元的输出脉冲
:rtype: torch.Tensor
按照充电、放电、重置的顺序进行前向传播。
"""
self.neuronal_charge(x)
spike = self.neuronal_fire()
self.neuronal_reset(spike)
return spike
class LIFNode(BaseNode):
def __init__(self, tau: float = 2., decay_input: bool = True, v_threshold: float = 1.,
v_reset: float = 0.,
detach_reset: bool = False):
"""
:param tau: 膜电位时间常数
:param decay_input: 输入是否会衰减
:param v_threshold: 神经元的阈值电压
:param v_reset: 神经元的重置电压。如果不为 ``None``,当神经元释放脉冲后,电压会被重置为 ``v_reset``;
如果设置为 ``None``,则电压会被减去 ``v_threshold``
:param surrogate_function: 反向传播时用来计算脉冲函数梯度的替代函
Leaky Integrate-and-Fire 神经元模型,可以看作是带漏电的积分器。其阈下神经动力学方程为:
若 ``decay_input == True``:
.. math::
V[t] = V[t-1] + \\frac{1}{\\tau}(X[t] - (V[t-1] - V_{reset}))
若 ``decay_input == False``:
.. math::
V[t] = V[t-1] - \\frac{1}{\\tau}(V[t-1] - V_{reset}) + X[t]
"""
assert isinstance(tau, float) and tau > 1.
super().__init__(v_threshold, v_reset, detach_reset)
self.tau = tau
self.decay_input = decay_input
def forward(self, x: torch.Tensor):
return super().forward(x)
代码说明
这段代码实现了 Leaky Integrate-and-Fire (LIF) 神经元模型,并且基于 SpikingJelly 库中的 BaseNode
类,进一步扩展了 LIFNode
。下面将结合前面描述的原理,对代码进行详细说明。
1. BaseNode 类
BaseNode
类是一个基本的神经元类,定义了神经元的核心操作,包括膜电位的充电(neuronal_charge
)、脉冲的发放(neuronal_fire
)、以及膜电位的重置(neuronal_reset
)。这些操作正对应了 LIF 神经元的三大过程:
膜电位的充电(neuronal_charge
)
def neuronal_charge(self, x: torch.Tensor):
if self.decay_input:
if self.v_reset is None or self.v_reset == 0.:
self.v = self.v + (x - self.v) / self.tau
这个函数对应了膜电位随输入电流变化的更新过程。这里用的是离散化后的膜电位更新公式:
V ( t + Δ t ) = V ( t ) + Δ t τ m ( − V ( t ) + R m I ( t ) ) V(t + \Delta t) = V(t) + \frac{\Delta t}{\tau_m} \left( -V(t) + R_m I(t) \right) V(t+Δt)=V(t)+τmΔt(−V(t)+RmI(t))
在实现中,self.v
表示当前神经元的膜电位,x
代表输入电流。参数 tau
表示膜时间常数,用来决定膜电位的衰减速度。decay_input
为 True
时,电位会根据输入电流和时间常数进行衰减。对于 v_reset
为 0 的情况,膜电位直接进行更新。
脉冲的发放(neuronal_fire
)
def neuronal_fire(self):
return sigmoid(self.v - self.v_threshold)
这部分计算是否要发射脉冲。当膜电位超过阈值 v_threshold
时,神经元发射脉冲。在这里,使用了 sigmoid
函数模拟发射脉冲的过程,尽管生物神经元发射脉冲是一个离散事件(发射或不发射),但在反向传播中使用光滑的函数(如 sigmoid
)可以方便计算梯度。
膜电位的重置(neuronal_reset
)
def neuronal_reset(self, spike):
spike_d = spike
self.v = (1. - spike_d) * self.v + spike_d * self.v_reset
当发射脉冲后,膜电位需要重置。重置的电位由 v_reset
控制。当 spike
(代表发射脉冲)为 1 时,膜电位被重置为 v_reset
;否则,膜电位保持原值。该过程对应了公式中:
if V ( t ) ≥ V t h , then V ( t ) ← V r e s e t \text{if } V(t) \geq V_{th}, \text{ then } V(t) \leftarrow V_{reset} if V(t)≥Vth, then V(t)←Vreset
2. LIFNode 类
LIFNode
类继承了 BaseNode
,并且实现了 LIF 神经元特有的动力学。主要的新增属性包括:
tau
: 膜时间常数。decay_input
: 控制输入电流是否衰减。
forward 函数
def forward(self, x: torch.Tensor):
return super().forward(x)
forward
函数负责执行神经元的前向传播过程。这里通过 super()
调用了父类的 forward()
函数,按照以下顺序进行神经元的状态更新:
- 调用
neuronal_charge(x)
更新膜电位。 - 调用
neuronal_fire()
计算脉冲发放。 - 调用
neuronal_reset()
对膜电位进行重置。
3. 代码执行流程
每当有新的输入 x
(表示输入电流)进入 LIF 神经元时,以下步骤依次执行:
- 充电:首先,神经元的膜电位根据输入电流进行更新,并考虑膜时间常数
tau
和输入电流是否衰减。 - 发射脉冲:当膜电位超过阈值
v_threshold
时,神经元发射脉冲。这里通过sigmoid
函数模拟了这个过程。 - 重置膜电位:如果发射了脉冲,膜电位被重置为
v_reset
。
4. 代码的扩展
在实际使用中,LIFNode
的实例会嵌入到更大的网络结构中。通过这个神经元模型,整个网络可以利用 LIF 的脉冲机制来处理时间序列数据。
第二步、编码方式-Poisson编码
1. Poisson编码简介
Poisson编码是一种常用于SNN的数据编码方式,其基本思想是将输入数据(例如图像的像素值)转化为脉冲序列(spike train)。具体来说,输入的连续值被转换为脉冲发射的概率,发射脉冲的时间点服从泊松分布。
2. Poisson编码的步骤
- 输入数据归一化:通常将图像的像素值归一化到 [ 0 , 1 ] [0, 1] [0,1] 区间。
- 脉冲发射概率:对于每个像素值 x x x,我们将其视为在给定时间步内发射脉冲的概率 P P P。
- 脉冲生成:在每个时间步,通过比较随机生成的数与发射概率 P P P,决定是否发射脉冲。
3. Poisson编码的代码实现
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import torch
import torch.nn as nn
class PoissonEncoder(nn.Module):
def __init__(self):
"""
Poisson Encoder 将输入 `x` 转换为脉冲信号,脉冲的发放概率与 `x` 相同。
`x` 的取值范围必须在 `[0, 1]` 之间。
"""
super(PoissonEncoder, self).__init__()
def forward(self, x: torch.Tensor):
# 使用与输入张量相同形状的随机张量,并比较是否小于输入值
out_spike = torch.rand_like(x).le(x).float()
return out_spike
# 加载lena图像
image = Image.open('lena512.bmp').convert('L') # 转换为灰度图像
image_array = np.array(image)
# 将numpy数组转换为torch张量
image_tensor = torch.from_numpy(image_array / 255.).float()
encoder = PoissonEncoder()
# 对图像进行Poisson编码
spike_train = encoder(image_tensor)
# 展示原图像
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.imshow(image_array, cmap='gray')
plt.title("Original Image")
plt.axis('off')
# 展示编码后的一部分数据(可以展示某个时间步的脉冲序列)
plt.subplot(1, 2, 2)
# 将torch张量转换回numpy数组以便显示
encoded_image = spike_train.numpy() * 255 # 展示第1个时间步的脉冲情况
plt.imshow(encoded_image, cmap='gray')
plt.title("Poisson Encoded Image (Time step 1)")
plt.axis('off')
plt.show()
4. 代码说明
Poisson编码是一种将输入数据转化为脉冲序列(spike train)的方式,用于Spiking Neural Networks (SNN) 中。以下是对代码实现的逐步解析:
1. Poisson编码器的定义
class PoissonEncoder(nn.Module):
def __init__(self):
super(PoissonEncoder, self).__init__()
- 这里定义了一个Poisson编码器类,继承自
torch.nn.Module
,使其可以与PyTorch框架的其他组件协同工作。 - Poisson编码的作用是将输入的连续值(如图像像素)转换为脉冲信号,脉冲的发放概率与输入值成正比。
2. 脉冲的生成逻辑
def forward(self, x: torch.Tensor):
out_spike = torch.rand_like(x).le(x).float()
return out_spike
- 这一部分实现了编码的核心逻辑:
torch.rand_like(x)
:生成一个与输入张量x
形状相同的随机张量,每个元素都在 [ 0 , 1 ] [0, 1] [0,1] 范围内。.le(x)
:比较随机数与输入值x
,当随机数小于或等于x
时返回True(表示发射脉冲)。.float()
:将布尔值转换为浮点数形式,True转换为1.0(表示发射脉冲),False转换为0.0(没有脉冲)。
通过这种方式,输入的每个像素值被转换为发射脉冲的概率,进而生成一个脉冲序列。
3. 加载并归一化图像
image = Image.open('lena512.bmp').convert('L')
image_array = np.array(image)
image_tensor = torch.from_numpy(image_array / 255.).float()
Image.open()
:加载图像,并使用.convert('L')
将其转换为灰度图。np.array(image)
:将图像转换为NumPy数组。torch.from_numpy(image_array / 255.)
:将图像数组的像素值归一化到 [ 0 , 1 ] [0, 1] [0,1] 范围,并转换为PyTorch的张量格式。
4. 生成脉冲序列
spike_train = encoder(image_tensor)
- 使用定义的
PoissonEncoder
类将归一化后的图像数据转换为脉冲序列。编码后的数据是一个与原图像形状相同的二值张量,其中每个值代表是否在该时间步发射了脉冲。
5. 结果展示
plt.imshow(encoded_image, cmap='gray')
- 在结果展示部分,
encoded_image
表示Poisson编码后的图像,其中白色像素表示发射了脉冲,黑色像素表示没有发射脉冲。你可以通过不同的时间步来可视化不同时间步内的脉冲分布。
Poisson编码在SNN中的作用
- 多样性脉冲序列:通过这种编码方式,原始的像素值被转换为一系列随机脉冲序列,这些脉冲序列可以输入到SNN中。
- 神经元响应性:脉冲的发放频率与像素值正相关,输入值越大,发射脉冲的概率越大,使得神经元能够基于输入特征更有效地学习和响应。
5. 测试结果
运行上述代码后,你将会看到两幅图像:
- 原始图像:展示了
lena512.bmp
的灰度图像。 - 编码后图像:展示了Poisson编码后的图像,其中白色像素表示在该时间步内发射脉冲,黑色像素表示没有发射脉冲。
通过这种方式,原始的图像数据被编码为SNN可以处理的脉冲序列。这个脉冲序列可以直接作为SNN的输入,在接下来的步骤中,我们可以利用这些编码后的数据进行训练和测试。
第三步、构建一个用于MNIST分类的Spiking Neural Network (SNN)
1. 构建SNN网络结构
在这一步,我们将使用刚才实现的LIF神经元构建一个简单的Spiking Neural Network (SNN) 来对MNIST数据集进行分类。由于MNIST数据集的每个输入图像是28x28像素,我们可以将其展平成一个784维的输入向量。然后,我们将通过一个全连接层,输出一个10维的向量,对应10个类别。
2. 网络的结构设计
这个网络由以下部分组成:
- Flatten层:将28x28的输入图像展平成784维的向量。
- 线性层(Linear Layer):将784维的输入映射到10维的输出空间。
- LIF神经元层:每个输出节点使用LIF神经元进行发射脉冲的处理。
为了能够在后续使用Backpropagation Through Time (BPTT)算法,我们需要在代码中引入时间步的处理,每个输入图像将以多个时间步进行处理,这意味着在每个时间步内LIF神经元的状态将被更新。
3. 代码实现
# 定义Flatten层
def flatten_layer(x, start_dim=1, end_dim=-1):
return torch.flatten(x, start_dim=start_dim, end_dim=end_dim)
# return x.reshape(x.shape[0], -1)
# 定义Linear层
def linear_layer(x, w):
return torch.matmul(x, w.T)
def lif_layer(tau = 2.0):
return LIFNode(tau=tau)
# 初始化参数
input_size = 28 * 28
output_size = 10
device = 'cuda:0'
dataset_dir = './'
batch_size = 64
lr = 1e-3
T = 100
tau = 2.0
train_epoch = 100
# 线性层权重
W_linear = torch.randn(output_size, input_size).to(device)
LIF_layer = lif_layer(tau=tau).to(device)
def net(x):
# 前向传播
x_flat = flatten_layer(x)
x_linear = linear_layer(x_flat, W_linear)
output_spikes = LIF_layer.forward(x_linear)
return output_spikes
4. 代码说明
我们对实现的代码进行详细说明,以帮助理解每一部分的功能和原理。
1. Flatten层
def flatten_layer(x, start_dim=1, end_dim=-1):
return torch.flatten(x, start_dim=start_dim, end_dim=end_dim)
- 该函数的目的是将输入张量从二维(28x28的图像)展平为一维(784维的向量),为后续的全连接层处理提供方便。
torch.flatten(x, start_dim=start_dim, end_dim=end_dim)
:这一步将输入x
从start_dim
维度到end_dim
维度进行展平。对于MNIST数据,通常将图像展平成784个元素的向量。
2. Linear层
def linear_layer(x, w):
return torch.matmul(x, w.T)
- 该函数定义了全连接层的操作,实质是矩阵乘法。
torch.matmul(x, w.T)
:输入x
与权重矩阵w
的转置进行矩阵乘法,输出是一个大小为[batch_size, output_size]
的张量,其中output_size
为10,表示10个分类(对应MNIST的10个数字类别)。
3. LIF层
def lif_layer(tau=2.0):
return LIFNode(tau=tau)
- 该函数返回一个基于
LIFNode
的脉冲神经元层(Leaky Integrate-and-Fire模型)。 tau
是LIF神经元的时间常数,它控制了神经元的电荷衰减速度,LIFNode
是自定义或引入的LIF神经元模型。
4. 初始化网络参数
input_size = 28 * 28
output_size = 10
device = 'cuda:0'
dataset_dir = './'
batch_size = 64
lr = 1e-3
T = 100
tau = 2.0
train_epoch = 100
- 这里定义了网络的基本参数:
input_size
: MNIST图像的输入大小,28x28像素,展平成784个输入神经元。output_size
: 输出大小为10,表示10个数字类别。device
: 指定在GPU上运行(若无GPU则可改为cpu
)。batch_size
: 批量大小。lr
: 学习率。T
: 表示每个输入将在T
个时间步上进行处理,通常用于时间步长的模拟。tau
: LIF神经元的时间常数。train_epoch
: 训练的总轮次。
5. 定义网络结构
W_linear = torch.randn(output_size, input_size).to(device)
LIF_layer = lif_layer(tau=tau).to(device)
W_linear
: 初始化全连接层的权重矩阵,大小为[output_size, input_size]
,即10 x 784
,并将其转移到指定设备上。LIF_layer
: 初始化LIF神经元层,并将其移动到指定设备上。
6. 网络前向传播函数
def net(x):
# 前向传播
x_flat = flatten_layer(x)
x_linear = linear_layer(x_flat, W_linear)
output_spikes = LIF_layer.forward(x_linear)
return output_spikes
net
函数实现了网络的前向传播(forward pass):flatten_layer(x)
: 将输入的二维图像展平为一维向量。linear_layer(x_flat, W_linear)
: 将展平后的向量通过全连接层进行线性变换,得到每个类别的输出。LIF_layer.forward(x_linear)
: 线性层输出经过LIF神经元进行非线性处理,并在多个时间步上产生脉冲(spike)。return output_spikes
: 返回LIF神经元的脉冲输出,它将作为最终的输出用于分类。
5. 网络结构的可视化
该网络结构主要包含输入层、线性层(全连接层)和LIF神经元层。
-
Flatten 层:
- 输入层(28x28)图像通过展平成为一个784维的向量,这一层将输入数据的维度进行了转换,但并未涉及神经元的真正处理。
-
Linear 层:
- 这一层包含了784个输入神经元和10个输出神经元。这一层是全连接层,因此每个输入神经元都与输出神经元相连,输出的是一个经过加权求和后的向量(不涉及脉冲)。
-
LIF 层:
- LIF层模拟了生物神经元的动态行为。输出神经元通过LIF模型对输入电流进行整合(integrate),并基于累积电压超过阈值时发放脉冲(spike),这一层的输出是时间步长上的脉冲序列。
6. 完整流程总结
- 输入:28x28像素的MNIST图像。
- Flatten 层:图像展平为784维向量。
- Linear 层:784维输入向量通过线性变换映射到10维的输出向量。
- LIF 层:将线性层的输出通过LIF神经元处理,输出脉冲序列。
- 输出:10维向量的脉冲频率,表示每个类别的输出概率。
第四步、加载数据集并展示部分图像
我们将首先加载MNIST数据集,并展示部分图像,以确认数据的正确性。
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
# 设置数据集路径和批量大小
dataset_dir = './data'
batch_size = 64
# 定义图像变换
transform = transforms.Compose([transforms.ToTensor()])
# 加载训练集和测试集
train_dataset = torchvision.datasets.MNIST(
root=dataset_dir,
train=True,
transform=transform,
download=True
)
test_dataset = torchvision.datasets.MNIST(
root=dataset_dir,
train=False,
transform=transform,
download=True
)
# 定义数据加载器
train_data_loader = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True
)
test_data_loader = torch.utils.data.DataLoader(
dataset=test_dataset,
batch_size=batch_size,
shuffle=False,
drop_last=False
)
# 展示部分图像
def show_images(data_loader):
images, labels = next(iter(data_loader))
images = images.numpy()
fig, axes = plt.subplots(1, 6, figsize=(12, 6))
for i in range(6):
ax = axes[i]
ax.imshow(images[i].reshape(28, 28), cmap='gray')
ax.set_title(f'Label: {labels[i].item()}')
ax.axis('off')
plt.show()
show_images(train_data_loader)
展示效果:
数据加载器规模
-
训练数据加载器:
- 总共有 60,000 张训练图像。
batch_size = 64
,因此每批数据包含 64 张图像。- 因为
drop_last=True
,如果最后一个批次不足以构成 64 张图像,则会被丢弃。 - 总共会有 ⌊ 60000 64 ⌋ = 937 \left\lfloor \frac{60000}{64} \right\rfloor = 937 ⌊6460000⌋=937 个完整的批次。
-
测试数据加载器:
- 总共有 10,000 张测试图像。
batch_size = 64
,因此每批数据包含 64 张图像。- 因为
drop_last=False
,即使最后一个批次不足 64 张图像也会被返回。 - 总共会有 ⌈ 10000 64 ⌉ = 157 \left\lceil \frac{10000}{64} \right\rceil = 157 ⌈6410000⌉=157 个批次,其中最后一个批次可能包含少于 64 张图像。
训练参数
- 输入大小 (
input_size
): 28 * 28 = 784,这代表每个输入图像会被展平成一个 784 维的向量。 - 输出大小 (
output_size
): 10,表示有 10 个分类(即 0 到 9 的数字)。 - 设备 (
device
): 使用的是 GPU,具体为'cuda:0'
。 - 学习率 (
lr
): 设置为 0.001(1e-3)。 - tau: 同样,这个参数可能是某种衰减或平滑因子,但需要更多上下文来明确。
- 训练轮数 (
train_epoch
): 训练总共会进行 100 轮。
第五步、反向传播通过时间(BPTT)算法及其实现
BPTT简介
反向传播通过时间(Backpropagation Through Time, BPTT)是传统反向传播算法在处理时间序列数据时的扩展。在SNN中,由于神经元状态随时间步变化,我们需要考虑每个时间步上的误差并进行累积。
梯度更新公式
假设损失函数为 L L L ,权重矩阵为 W W W,输入为 x x x,膜电位为 V V V,发射的脉冲为 S S S:
- 膜电位的更新: V ( t + 1 ) = V ( t ) + I ( t ) − S ( t ) ⋅ V r e s e t V(t+1) = V(t) + I(t) - S(t) \cdot V_{reset} V(t+1)=V(t)+I(t)−S(t)⋅Vreset
- 脉冲发射判断: S ( t ) = H ( V ( t ) − V t h ) S(t) = H(V(t) - V_{th}) S(t)=H(V(t)−Vth)
- 损失函数的梯度:通过BPTT,我们会累积每个时间步上的误差:
∂ L ∂ W = ∑ t = 1 T ∂ L ( t ) ∂ S ( t ) ⋅ ∂ S ( t ) ∂ V ( t ) ⋅ ∂ V ( t ) ∂ W \frac{\partial L}{\partial W} = \sum_{t=1}^{T} \frac{\partial L(t)}{\partial S(t)} \cdot \frac{\partial S(t)}{\partial V(t)} \cdot \frac{\partial V(t)}{\partial W} ∂W∂L=t=1∑T∂S(t)∂L(t)⋅∂V(t)∂S(t)⋅∂W∂V(t)
步骤 1:计算梯度
在我们实现的LIF神经元模型中,权重 W linear W_{\text{linear}} Wlinear 是唯一需要更新的参数。为了计算梯度,我们需要使用链式法则。
-
损失函数的梯度 ∂ Loss ∂ y ^ \frac{\partial \text{Loss}}{\partial \hat{y}} ∂y^∂Loss :
由于我们使用的是均方误差(MSE)损失函数,损失函数对输出的梯度可以表示为:∂ Loss ∂ y ^ = y ^ − y true \frac{\partial \text{Loss}}{\partial \hat{y}} = \hat{y} - y_{\text{true}} ∂y^∂Loss=y^−ytrue
-
神经元的输出对线性层输入的梯度 ∂ y ^ ∂ z \frac{\partial \hat{y}}{\partial z} ∂z∂y^ :
我们使用了 Sigmoid 作为脉冲发放函数的替代函数,因此该部分的梯度为:∂ y ^ ∂ z = σ ( z ) ⋅ ( 1 − σ ( z ) ) \frac{\partial \hat{y}}{\partial z} = \sigma(z) \cdot (1 - \sigma(z)) ∂z∂y^=σ(z)⋅(1−σ(z))
-
线性层输入对权重的梯度 ∂ z ∂ W linear \frac{\partial z}{\partial W_{\text{linear}}} ∂Wlinear∂z :
线性层输入为 z = W linear ⋅ x z = W_{\text{linear}} \cdot x z=Wlinear⋅x,因此:∂ z ∂ W linear = x \frac{\partial z}{\partial W_{\text{linear}}} = x ∂Wlinear∂z=x
步骤 2:更新权重
将所有部分的梯度结合起来,对 W linear W_{\text{linear}} Wlinear 进行更新:
W new = W linear − η ⋅ ( ∂ Loss ∂ y ^ ⋅ ∂ y ^ ∂ z ⋅ ∂ z ∂ W linear ) W_{\text{new}} = W_{\text{linear}} - \eta \cdot \left(\frac{\partial \text{Loss}}{\partial \hat{y}} \cdot \frac{\partial \hat{y}}{\partial z} \cdot \frac{\partial z}{\partial W_{\text{linear}}}\right) Wnew=Wlinear−η⋅(∂y^∂Loss⋅∂z∂y^⋅∂Wlinear∂z)
代码实现
def compute_gradient():
# 计算梯度
grad_loss = out_spikes_counter_frequency - label_one_hot # dLoss/dy
grad_spike = sigmoid_derivative(LIF_layer.v) # dy/dz
return grad_loss , grad_spike
# 初始化梯度列表
grad_history = []
def update_weight_sgd(grad_loss, grad_spike):
global W_linear
# 向量化处理整个批量数据,计算梯度
gradients = (grad_loss * grad_spike).t() @ img.view(batch_size, -1)
# 更新权重
W_linear -= lr * gradients
# 保存本次迭代的梯度
grad_history.append(gradients.norm().item())
# 打印梯度信息
# print("Current gradients:\n", gradients)
第六步,训练网络
第六步,开始训练SNN网络,首先指定好训练参数如学习率等以及若干其他配置
优化器使用Adam,以及使用泊松编码器,在每次输入图片时进行脉冲编码;
训练代码的编写需要遵循以下三个要点:
- 脉冲神经元的输出是二值的,而直接将单次运行的结果用于分类极易受到干扰。因此一般认为脉冲网络的输出是输出层一段时间内的发放频率(或称发放率),发放率的高低表示该类别的响应大小。因此网络需要运行一段时间,即使用
T
个时刻后的平均发放率作为分类依据。 - 我们希望的理想结果是除了正确的神经元以最高频率发放,其他神经元保持静默。常常采用交叉熵损失或者MSE损失,这里我们使用实际效果更好的MSE损失。
- 每次网络仿真结束后,需要重置网络状态
for epoch in range(train_epoch):
print("Epoch {}:".format(epoch))
print("Training...")
train_correct_sum = 0
train_sum = 0
# net.train()
for img, label in tqdm(train_data_loader):
img = img.to(device)
label = label.to(device)
label_one_hot = one_hot(label, 10)
# 运行T个时长,out_spikes_counter是shape=[batch_size, 10]的tensor
# 记录整个仿真时长内,输出层的10个神经元的脉冲发放次数
for t in range(T):
if t == 0:
out_spikes_counter = net(encoder(img).float())
else:
out_spikes_counter += net(encoder(img).float())
# out_spikes_counter / T 得到输出层10个神经元在仿真时长内的脉冲发放频率
out_spikes_counter_frequency = out_spikes_counter / T
# 损失函数为输出层神经元的脉冲发放频率,与真实类别的MSE
# 这样的损失函数会使,当类别i输入时,输出层中第i个神经元的脉冲发放频率趋近1,而其他神经元的脉冲发放频率趋近0
loss = mse_loss(out_spikes_counter_frequency, label_one_hot)
# print(loss)
# 记录损失值
losses.append(loss.item())
# 更新 W_linear——计算LOSS
grad_loss,grad_spike = compute_gradient()
update_weight_sgd(grad_loss, grad_spike)
# 优化一次参数后,需要重置网络的状态,因为SNN的神经元是有“记忆”的
reset_net(LIF_layer)
# 正确率的计算方法如下。认为输出层中脉冲发放频率最大的神经元的下标i是分类结果
train_correct_sum += (out_spikes_counter_frequency.max(1)[1] == label.to(device)).float().sum().item()
train_sum += label.numel()
train_batch_accuracy = (out_spikes_counter_frequency.max(1)[1] == label.to(device)).float().mean().item()
train_accs.append(train_batch_accuracy)
train_times += 1
train_accuracy = train_correct_sum / train_sum
print("Testing...")
# net.eval()
with torch.no_grad():
# 每遍历一次全部数据集,就在测试集上测试一次
test_correct_sum = 0
test_sum = 0
for img, label in tqdm(test_data_loader):
img = img.to(device)
for t in range(T):
if t == 0:
out_spikes_counter = net(encoder(img).float())
else:
out_spikes_counter += net(encoder(img).float())
test_correct_sum += (out_spikes_counter.max(1)[1] == label.to(device)).float().sum().item()
test_sum += label.numel()
# reset_net(net)
# 在优化前重置网络状态
reset_net(LIF_layer)
test_accuracy = test_correct_sum / test_sum
test_accs.append(test_accuracy)
max_test_accuracy = max(max_test_accuracy, test_accuracy)
print("Epoch {}: train_acc = {}, test_acc={}, max_test_acc={}, train_times={}".format(epoch, train_accuracy,
test_accuracy,
max_test_accuracy,
train_times))
print()
loss下降曲线
最终梯度
部分运行过程截图
附录:完整jupyter notebook代码
#%%
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torchvision
import numpy as np
from spikingjelly.clock_driven import neuron, surrogate, base
from tqdm import tqdm
import matplotlib.pyplot as plt
class PoissonEncoder(nn.Module):
def __init__(self):
"""
Poisson Encoder 将输入 `x` 转换为脉冲信号,脉冲的发放概率与 `x` 相同。
`x` 的取值范围必须在 `[0, 1]` 之间。
"""
super().__init__()
pass
def forward(self, x: torch.Tensor):
# 使用与输入张量相同形状的随机张量,并比较是否小于输入值
out_spike = torch.rand_like(x).le(x).float()
return out_spike
def reset_net(net):
"""
重置网络中所有 LIFNode 神经元的状态
"""
for module in net.modules():
if isinstance(module, LIFNode):
# 使用新的批量大小初始化self.v
module.v = module.v_reset
def sigmoid(x):
"""
计算 Sigmoid 函数的值。
参数:
x -- 输入数据,可以是单个数字或者 PyTorch 张量
返回:
s -- Sigmoid 函数的结果
"""
# 确保 x 是 PyTorch 张量
if not isinstance(x, torch.Tensor):
x = torch.tensor(x, dtype=torch.float32, device=device)
s = 1 / (1 + torch.exp(-x))
return s
def sigmoid_derivative(x):
"""
计算 Sigmoid 函数的导数。
参数:
x -- 输入数据,可以是单个数字或者 PyTorch 张量
返回:
ds -- Sigmoid 函数导数的结果
"""
# 先计算 Sigmoid 函数的值
sig = sigmoid(x)
# 然后根据 Sigmoid 导数的公式计算导数
ds = sig * (1 - sig)
return ds
def mse_loss(predictions: torch.Tensor, targets: torch.Tensor):
"""
计算 MSE 损失。
参数:
predictions -- 预测值,torch.Tensor
targets -- 目标值,torch.Tensor
返回:
loss -- 计算得到的 MSE 损失,torch.Tensor
"""
# 计算差值的平方
squared_diff = 1 / 2.0 * (predictions - targets) ** 2
# 计算均值
loss = torch.mean(squared_diff)
return loss
#%%
class BaseNode(base.MemoryModule):
def __init__(self, v_threshold: float = 1., v_reset: float = 0., detach_reset: bool = False):
super().__init__()
if v_reset is None:
self.register_memory('v', 0.)
else:
self.register_memory('v', v_reset)
self.register_memory('v_threshold', v_threshold)
self.register_memory('v_reset', v_reset)
self.detach_reset = detach_reset
def neuronal_charge(self, x: torch.Tensor):
if self.decay_input:
if self.v_reset is None or self.v_reset == 0.:
self.v = self.v + (x - self.v) / self.tau
def neuronal_fire(self):
"""
根据当前神经元的电压、阈值,计算输出脉冲。
"""
# return self.surrogate_function(self.v - self.v_threshold)
# return sigmoid(self.v - self.v_threshold)
return sigmoid(self.v - self.v_threshold)
def neuronal_reset(self, spike):
"""
根据当前神经元释放的脉冲,对膜电位进行重置。
"""
spike_d = spike
self.v = (1. - spike_d) * self.v + spike_d * self.v_reset
def forward(self, x: torch.Tensor):
"""
:param x: 输入到神经元的电压增量
:type x: torch.Tensor
:return: 神经元的输出脉冲
:rtype: torch.Tensor
按照充电、放电、重置的顺序进行前向传播。
"""
self.neuronal_charge(x)
spike = self.neuronal_fire()
self.neuronal_reset(spike)
return spike
class LIFNode(BaseNode):
def __init__(self, tau: float = 2., decay_input: bool = True, v_threshold: float = 1.,
v_reset: float = 0.,
detach_reset: bool = False):
"""
:param tau: 膜电位时间常数
:param decay_input: 输入是否会衰减
:param v_threshold: 神经元的阈值电压
:param v_reset: 神经元的重置电压。如果不为 ``None``,当神经元释放脉冲后,电压会被重置为 ``v_reset``;
如果设置为 ``None``,则电压会被减去 ``v_threshold``
:param surrogate_function: 反向传播时用来计算脉冲函数梯度的替代函
Leaky Integrate-and-Fire 神经元模型,可以看作是带漏电的积分器。其阈下神经动力学方程为:
若 ``decay_input == True``:
.. math::
V[t] = V[t-1] + \\frac{1}{\\tau}(X[t] - (V[t-1] - V_{reset}))
若 ``decay_input == False``:
.. math::
V[t] = V[t-1] - \\frac{1}{\\tau}(V[t-1] - V_{reset}) + X[t]
"""
assert isinstance(tau, float) and tau > 1.
super().__init__(v_threshold, v_reset, detach_reset)
self.tau = tau
self.decay_input = decay_input
def forward(self, x: torch.Tensor):
return super().forward(x)
# 定义Flatten层
def flatten_layer(x, start_dim=1, end_dim=-1):
return torch.flatten(x, start_dim=start_dim, end_dim=end_dim)
# return x.reshape(x.shape[0], -1)
# 定义Linear层
def linear_layer(x, w):
return torch.matmul(x, w.T)
def lif_layer(tau = 2.0):
return LIFNode(tau=tau)
#%%
# 初始化参数
input_size = 28 * 28
output_size = 10
device = 'cuda:0'
dataset_dir = './'
batch_size = 64
lr = 1e-3
T = 100
tau = 2.0
train_epoch = 100
# 线性层权重
W_linear = torch.randn(output_size, input_size).to(device)
LIF_layer = lif_layer(tau=tau).to(device)
#%%
def net(x):
# 前向传播
x_flat = flatten_layer(x)
x_linear = linear_layer(x_flat, W_linear)
output_spikes = LIF_layer.forward(x_linear)
return output_spikes
#%%
# 初始化数据加载器
train_dataset = torchvision.datasets.MNIST(
root=dataset_dir,
train=True,
transform=torchvision.transforms.ToTensor(),
download=True
)
test_dataset = torchvision.datasets.MNIST(
root=dataset_dir,
train=False,
transform=torchvision.transforms.ToTensor(),
download=True
)
train_data_loader = data.DataLoader(
dataset=train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True
)
test_data_loader = data.DataLoader(
dataset=test_dataset,
batch_size=batch_size,
shuffle=False,
drop_last=False
)
#%%
encoder = PoissonEncoder()
def one_hot(labels, num_classes=10):
"""
将标签转换为 one-hot 编码形式。
参数:
labels -- 整数列表或整数数组,表示每个样本的类别标签
num_classes -- 类别的总数,默认为 10
返回:
one_hot_labels -- one-hot 编码后的数组
"""
# # 创建一个形状为 (len(labels), num_classes) 的全零数组
# one_hot_labels = np.zeros((len(labels), num_classes))
#
# # 将 labels 中每个元素对应的位置设置为 1
# one_hot_labels[np.arange(len(labels)), labels] = 1
# 创建一个形状为 (len(labels), num_classes) 的全零张量
one_hot_labels = torch.zeros((len(labels), num_classes), dtype=torch.float32, device=device)
# 将 labels 中每个元素对应的位置设置为 1
one_hot_labels.scatter_(1, labels.unsqueeze(1), 1)
return one_hot_labels
def compute_gradient():
# 计算梯度
grad_loss = out_spikes_counter_frequency - label_one_hot # dLoss/dy
grad_spike = sigmoid_derivative(LIF_layer.v) # dy/dz
return grad_loss , grad_spike
# 初始化梯度列表
grad_history = []
def update_weight_sgd(grad_loss, grad_spike):
global W_linear
# 向量化处理整个批量数据,计算梯度
gradients = (grad_loss * grad_spike).t() @ img.view(batch_size, -1)
# 更新权重
W_linear -= lr * gradients
# 保存本次迭代的梯度
grad_history.append(gradients.norm().item())
# 打印梯度信息
# print("Current gradients:\n", gradients)
#%%
train_times = 0
max_test_accuracy = 0
test_accs = []
train_accs = []
# 初始化损失列表
losses = []
#%%
for epoch in range(train_epoch):
print("Epoch {}:".format(epoch))
print("Training...")
train_correct_sum = 0
train_sum = 0
# net.train()
for img, label in tqdm(train_data_loader):
img = img.to(device)
label = label.to(device)
label_one_hot = one_hot(label, 10)
# 运行T个时长,out_spikes_counter是shape=[batch_size, 10]的tensor
# 记录整个仿真时长内,输出层的10个神经元的脉冲发放次数
for t in range(T):
if t == 0:
out_spikes_counter = net(encoder(img).float())
else:
out_spikes_counter += net(encoder(img).float())
# out_spikes_counter / T 得到输出层10个神经元在仿真时长内的脉冲发放频率
out_spikes_counter_frequency = out_spikes_counter / T
# 损失函数为输出层神经元的脉冲发放频率,与真实类别的MSE
# 这样的损失函数会使,当类别i输入时,输出层中第i个神经元的脉冲发放频率趋近1,而其他神经元的脉冲发放频率趋近0
loss = mse_loss(out_spikes_counter_frequency, label_one_hot)
# print(loss)
# 记录损失值
losses.append(loss.item())
# 更新 W_linear——计算LOSS
grad_loss,grad_spike = compute_gradient()
update_weight_sgd(grad_loss, grad_spike)
# 优化一次参数后,需要重置网络的状态,因为SNN的神经元是有“记忆”的
reset_net(LIF_layer)
# 正确率的计算方法如下。认为输出层中脉冲发放频率最大的神经元的下标i是分类结果
train_correct_sum += (out_spikes_counter_frequency.max(1)[1] == label.to(device)).float().sum().item()
train_sum += label.numel()
train_batch_accuracy = (out_spikes_counter_frequency.max(1)[1] == label.to(device)).float().mean().item()
train_accs.append(train_batch_accuracy)
train_times += 1
train_accuracy = train_correct_sum / train_sum
print("Testing...")
# net.eval()
with torch.no_grad():
# 每遍历一次全部数据集,就在测试集上测试一次
test_correct_sum = 0
test_sum = 0
for img, label in tqdm(test_data_loader):
img = img.to(device)
for t in range(T):
if t == 0:
out_spikes_counter = net(encoder(img).float())
else:
out_spikes_counter += net(encoder(img).float())
test_correct_sum += (out_spikes_counter.max(1)[1] == label.to(device)).float().sum().item()
test_sum += label.numel()
# reset_net(net)
# 在优化前重置网络状态
reset_net(LIF_layer)
test_accuracy = test_correct_sum / test_sum
test_accs.append(test_accuracy)
max_test_accuracy = max(max_test_accuracy, test_accuracy)
print("Epoch {}: train_acc = {}, test_acc={}, max_test_acc={}, train_times={}".format(epoch, train_accuracy,
test_accuracy,
max_test_accuracy,
train_times))
print()
#%%
import matplotlib.pyplot as plt
# 绘制损失曲线
plt.figure(figsize=(10, 6))
plt.plot(losses, label='Training Loss')
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('Manual - Training Loss over Iterations')
plt.legend()
plt.grid(True)
plt.show()
# 训练完成后绘制梯度变化图
plt.plot(grad_history)
plt.xlabel('Iteration')
plt.ylabel('Gradient Norm')
plt.title('Gradient Norm Over Training Iterations')
# 使用对数刻度显示
plt.yscale('log')
plt.show()
#%%
# 假设 grad_history 是一个包含梯度张量的列表
# 使用列表切片获取最后10个元素
last_10_gradients = grad_history[-10:]
# 遍历并打印这些梯度
for i, gradient in enumerate(last_10_gradients):
print(gradient)
#%%
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)