1 前期准备

1.1 引入库

import numpy as np
import pandas as pd
import torch.nn as nn #用于构建网络
import torch
import matplotlib.pyplot as plt
import torch.nn.functional as F
torch.set_default_tensor_type(torch.DoubleTensor)
#将tensor的默认类型设置为双精度浮点类型(torch.doubletensor),便于反向传播

1.2 读取数据

本文选取了每月太阳黑子数据集,该数据集描述了1749年1月至2023 年10月观测到的太阳黑子数量的月度计数。
数据网址如下(也可以关注并私信博主来获取数据集):
http://www.sidc.be/silso/datafiles

Data=pd.read_csv("E:\\代码学习\\CSDN博客\\CNN\\SN_m.csv")
start_time=pd.to_datetime("1749-01-01")
end_time=pd.to_datetime("2023-11-01")
time=pd.date_range(start=start_time,end=end_time,freq='M')#生成时间序列
Data['time']=time
#将时间设为索引
data=Data.set_index('time',drop=True, append=False, inplace=False, verify_integrity=False)
series=np.array(data['Sunspots'])

1.3 可视化

从图中可以看出该数据集有很强的的季节性。

plt.figure(figsize=(14,6))
plt.plot(data,label='Sunspots')
plt.grid()
plt.xlabel('Date')

2 数据预处理

2.1 划分训练集、测试集

自定义函数train_test_split用于划分训练集与测试集。其中series表示整体样本,split_prop表示训练集所占的比例。

def train_test_split(series,split_prop):
    #split_prop表示训练集所占的比例
    train =series[:int(split_prop*int(series.size))]
    test=series[int(split_prop*int(series.size)):]
    return train,test

将数据集的70%用作训练集,30%用作测试集

split_prop=0.7#设置划分比例
train,test=train_test_split(series,split_prop)#划分

2.2 划分特征与标签

自定义函数data_process用于滑动窗口采样,从而获得测试集训练集的特征和标签,其中windowsize表示滑窗大小,step表示滑动步长。固定滑动窗口采样示意如图所示:

import random
def data_process(train,test,window_size,step):
    #将数据转为tensor数据结构并进行划窗操作,得到短序列
    train_tensor=torch.from_numpy(train)#将训练集数据转为tensor数据结构
    train_window_split=train_tensor.unfold(0,window_size,step)#获得训练集“卷”后的数据集
    train_set=train_window_split.numpy()
    test_tensor=torch.from_numpy(test)#将测试集数据转为tensor数据结构
    test_window_split=test_tensor.unfold(0,window_size,step)#获得测试集“卷”后的数据集
    test_set=test_window_split.numpy()
    
    #将训练集中的各个短序列打乱
    train_temp1=train_set.tolist()#转换为列表格式
    #random.shuffle()将一个列表中的元素(短序列)打乱顺序,不生成新列表,只是将原列表次序打乱
    random.shuffle(train_temp1)#打乱训练集
    train_temp2=np.array(train_temp1)#建立数组,数组内容为打乱顺序的列表
    
    #将短序列划分为Feature和Label
    train_feature_array=train_temp2[:,:window_size-1]
    train_label_array=train_temp2[:,window_size-1:]#取各个短序列内最后一个值为该短序列的标签(即时序真实值)
    test_temp1=test_set.tolist()#转换为列表格式
    test_temp2=np.array(test_temp1)#建立数组
    test_feature_array=test_temp2[:,:window_size-1]
    test_label_array=test_temp2[:,window_size-1:]
    
    #将ndarray(N维数组类型的对象)转化为tensor
    train_feature_tensor=torch.from_numpy(train_feature_array)
    train_label=torch.from_numpy(train_label_array)
    test_feature_tensor=torch.from_numpy(test_feature_array)
    test_label=torch.from_numpy(test_label_array)
    
    #扩展数据维度,符合CNN输出
    train_feature=train_feature_tensor.reshape(train_feature_tensor.shape[0],1,train_feature_tensor.shape[1])
    test_feature=test_feature_tensor.reshape(test_feature_tensor.shape[0],1,test_feature_tensor.shape[1])
    return train_feature,train_label,test_feature,test_label
window_size=7#设置滑窗大小
step=1#设置滑动步长
train_feature,train_label,test_feature,test_label=data_process(train,test,window_size,step)

训练集的特征与标签值如下所示:

3 构建一维卷积神经网络

本文设计的一维卷积神经网络网络示意图如图所示:

构建一维卷积神经网络代码如下:

#定义一个类MyConv,继承于父类nn.Module(PyTorch 体系下所有神经网络模块的基类)
class MyConv(nn.Module):
    def __init__(self):#__init__是类的构造函数,self是类的实例
        super(MyConv,self).__init__()#super()用于调用父类中的构造函数
        #一层一维卷积
        #nn.Sequential相当于一个容器,按照顺序存储模块
        self.conv1=nn.Sequential(
            nn.Conv1d(in_channels=1,out_channels=32,kernel_size=3,stride=1,padding=1),
            nn.ReLU(inplace=True)
        )
        #Convid函数表示在由多个输入平面组成的输入信号上应用一维卷积
        #其中in_channels输入信号通道数,out_channels卷积产生的通道,kernel_size卷积核尺寸
        #stride卷积步长,padding输入的每一条边补充0的层数
        self.conv2=nn.Sequential(
            nn.Conv1d(in_channels=32,out_channels=64,kernel_size=2,stride=1,padding=1),
            nn.ReLU(inplace=True)
        )
        #激活函数ReLU
        #将输出通道变为单值
        self.fc1=nn.Linear(64,32)
        #nn.Linear定义一个神经网络的线性层
        #第一个参数表示输入神经元个数,第二个表示输出神经元个数
        self.fc2=nn.Linear(32,1)
    def forward(self,X):
        #在调用时可直接MyConv(data),而不用MyConv.forward(data)
        out=self.conv1(X)#一维卷积
        out=F.avg_pool1d(out,3)#平均池化
        out=self.conv2(out)#一维卷积
        out=F.avg_pool1d(out,3)#平均池化
        out=out.squeeze()
        #去掉值为1的维度,即shape=1的维度
        #[[[1,2,3],[3,4,5]]]  》 [[1,2,3],[3,4,5]]
        #原来张量的shape为(1,2,3),squeeze()处理后张量的shape为(2,3)
        out=self.fc1(out)
        out=self.fc2(out)
        return out
#构建网络
net=MyConv()

4 训练模型

4.1 划分batch

这里先引申出几个神经网络的经典名词:batch、epoch、iteration:

  • batch:在神经网络模型训练时,比如有1000个样本,把这些样本分为10批,就是10个batch。每个批(batch)的大小为100,就是batch size=100。
  • epoch:使用训练集中的全部样本训练的次数,通俗地讲几次epoch就是整个数据集被训练几次。
  • iteration:使用batchsize数量的样本训练的次数,比如训练集有1000个样本,batchsize = 100 ,那么训练完整个训练集:iteration=10,epoch=1。

自定义函数data_iter用于划分batch。其中batch_size表示一个batch的大小,features表示特征,labels表示标签

def data_iter(batch_size,features,labels):
    num_examples=len(features)
    indices=list(range(num_examples))
    for i in range(0,num_examples,batch_size):
        j=torch.LongTensor(indices[i: min(i+batch_size,num_examples)])
        yield features.index_select(0,j),labels.index_select(0,j)

4.2 设置损失函数

损失函数(loss function)就是用来度量模型的预测值f(x)与真实值Y的差异程度的运算函数。损失函数使用主要是在模型的训练阶段,每个批次的训练数据送入模型后,通过前向传播输出预测值,然后损失函数会计算出预测值和真实值之间的差异值,也就是损失值。得到损失值之后,模型通过反向传播去更新各个参数,来降低真实值与预测值之间的损失,使得模型生成的预测值往真实值方向靠拢,从而达到学习的目的。

#损失函数-均方误差
def square_loss(feature,label):
    return (net(feature)-label)**2/2

4.3 参数初始化

#参数初始化
for params in net.parameters():#网络的参数包含网络的连接权重W和偏置b
    torch.nn.init.normal_(params,mean=0,std=0.01)
    #一般给网络中连接权重W初始化,初始化参数值符合均值为0,标准差为0.01的正态分布

lr=0.001#学习率
num_epochs=100#训练轮数
batch_size=128#batch大小
loss=square_loss#损失函数
optimizer=torch.optim.Adam(net.parameters(),lr)
#设置优化器,传入网络模型的参数,并设置学习率

4.4 训练模型

#训练模型
train_loss=[]
test_loss=[]
#模型训练
for epoch in range(num_epochs):#外循环训练
    train_1,test_1=0.0,0.0#更新训练集和测试机的损失函数值
    for X,y in data_iter(batch_size,train_feature,train_label):#内循环一个batch
        l=loss(X,y).sum()
        #计算每一个batch中的损失函数,即输出值与真实值的差距
        if optimizer is not None:
            optimizer.zero_grad()
            #optimizer.zero_grad()是把梯度置零,即把loss关于权重W的导数变成0
        elif params is not None and params[0].grad is not None:
            for param in params:
                param.grad.data.zero_()
        #反向传播
        l.backward()#反向传播计算得到每个参数的梯度值(loss关于权重W的梯度)
        optimizer.step()#通过梯度下降执行一步参数(权重W)更新
    train_1=loss((train_feature),train_label)
    test_1=loss((test_feature),test_label)
    train_loss.append((train_1.mean().item()))
    test_loss.append(test_1.mean().item())
    print('epoch %d,train loss %f,test loss %f'%(epoch+1,train_1.mean().item(),test_1.mean().item()))

5 模型结果

5.1 损失函数曲线

#绘制损失函数loss曲线
x=np.arange(num_epochs)
plt.figure(figsize=(8,6))
plt.plot(x,train_loss,label='train_loss',linewidth=1.5)
plt.plot(x,test_loss,label='test_loss',linewidth=1.5)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend()
plt.grid()

5.2 测试集的真实值与预测值曲线

test_predict=[]
split_point=int(split_prop*int(series.size))
#split_prop表示训练集所占的比例
#split_point表示训练集和测试集的划分位置
test_time=time[split_point+window_size-1:]

#测试集真实序列
test_true=series[split_point+window_size-1:]
#测试集预测序列
test_predict=net(test_feature).squeeze().tolist()

#画图
#整体
plt.figure(figsize=(14,12))
plt.subplot2grid((2,1),(0,0))
plt.plot(test_time,test_true,label='true')
plt.plot(test_time,test_predict,label='predict')
plt.legend(fontsize=15)
plt.grid()
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐