Univariate Single-Step Time Series Forecasting with PyTorch + LSTM (Step-by-Step Tutorial)
A step-by-step tutorial on univariate single-step time series forecasting with PyTorch and an LSTM, covering how to build the model, how to load the data, and how to train it, with complete code for every part.
Model Construction
Model Basics
[Figure 1: RNN network structure]
import torch
from torch import nn

# Run on the GPU if one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.batch_size = batch_size
        self.num_directions = 1  # unidirectional LSTM
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        self.linear = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, inputseq):
        # Initial hidden and cell states; this fixed batch_size requires
        # drop_last=True in the DataLoader so every batch is full
        h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
        c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
        output, _ = self.lstm(inputseq, (h_0, c_0))
        # Predict from the hidden state of the last time step
        pred = self.linear(output[:, -1, :])
        return pred
Key points
1) On the nn.Module class: nn.Module is the base class for neural network models. Custom layers, blocks, and models must inherit from it and implement the __init__() constructor and the forward() method. Typically, the constructor only defines the layers, and forward() defines how they are connected. super().__init__() calls the parent class's initializer. As long as forward() is defined, backpropagation is handled automatically by autograd.
2) On the LSTM tensor shapes (with batch_first=True):
input: (batch_size, seq_len, input_size)
h_0 / c_0: (num_directions * num_layers, batch_size, hidden_size)
output: (batch_size, seq_len, hidden_size)
where batch_size is the number of samples fed in at once, seq_len is the length of the input time series, input_size is the number of input variables (1 for a univariate series), and hidden_size is the number of hidden units.
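To make these shape conventions concrete, here is a minimal shape check that calls nn.LSTM directly (the sizes are arbitrary illustrative values):

import torch
from torch import nn

batch_size, seq_len, input_size, hidden_size, num_layers = 5, 24, 1, 32, 2
lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

x = torch.randn(batch_size, seq_len, input_size)        # input
h0 = torch.zeros(num_layers, batch_size, hidden_size)   # h_0 (num_directions = 1)
c0 = torch.zeros(num_layers, batch_size, hidden_size)   # c_0
out, (hn, cn) = lstm(x, (h0, c0))
print(out.shape)  # torch.Size([5, 24, 32]) = (batch_size, seq_len, hidden_size)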
Unit test
# Quick sanity check: a batch of 2 sequences, each 3 steps long with 1 feature
model = LSTM(1, 32, 2, 1, 2).to(device)
test_input = torch.tensor([[70, 80, 90], [70, 80, 90]]).float().unsqueeze(2).to(device)  # (2, 3, 1)
predicted_output = model(test_input)
print(predicted_output)  # expected shape: (2, 1)
Data Preparation
Data source
Power system load data for a regional grid from Tianchi, sampled at 15-minute intervals from January 1, 2018 to August 31, 2021. Link: 电网_数据集-阿里云天池 (aliyun.com)
Reading the data
import pandas as pd

def read_data(filename):
    data = pd.read_csv(filename, skiprows=1)
    print(data.head(5))
    L = data.shape[0]
    print("data shape: {}".format(data.shape))
    # print("NaN count per row: {}".format(data.isnull().sum(axis=1)))
    return data, L
Loading the data
from torch.utils.data import Dataset, DataLoader

def process_data(data, N):
    # Sliding window: each sample pairs the previous N points with the next point
    XY = []
    for i in range(len(data) - N):
        X = []
        for j in range(N):
            X.append(data.iloc[i + j, 1])
        Y = [data.iloc[i + N, 1]]             # single-step target
        X = torch.FloatTensor(X).view(-1, 1)  # (N, 1) = (seq_len, input_size)
        Y = torch.FloatTensor(Y)              # (1,)
        XY.append((X, Y))
    return XY

class MyDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __getitem__(self, item):
        return self.data[item]

    def __len__(self):
        return len(self.data)

def data_loader(data, N, batch_size, shuffle):
    seq = process_data(data, N)
    seq_set = MyDataset(seq)
    # drop_last=True keeps every batch at exactly batch_size samples,
    # which the model's fixed-size h_0/c_0 require
    seq = DataLoader(dataset=seq_set, batch_size=batch_size, shuffle=shuffle, drop_last=True)
    return seq_set, seq
Key points:
1) Subclassing Dataset to load your own data: torch.utils.data.Dataset is an abstract class. To load a custom dataset, inherit from it and override the __len__() and __getitem__() methods.
2) DataLoader: wraps batching, preprocessing, and parallel loading. With a DataLoader you can easily create mini-batches, shuffle the data, and load it with multiple worker processes.
3) Summary: feeding data into PyTorch takes three steps, as the sketch below shows: build the samples, wrap them in a Dataset, and wrap the Dataset in a DataLoader.
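A minimal, self-contained sketch of the three-step pattern on a toy series (all names here are illustrative):

import torch
from torch.utils.data import Dataset, DataLoader

# Step 1: build (input, label) samples from a toy series
toy_series = [float(v) for v in range(10)]
samples = [(torch.FloatTensor(toy_series[i:i + 3]).view(-1, 1),
            torch.FloatTensor([toy_series[i + 3]]))
           for i in range(len(toy_series) - 3)]

# Step 2: wrap the samples in a Dataset
class ToyDataset(Dataset):
    def __init__(self, data):
        self.data = data
    def __getitem__(self, item):
        return self.data[item]
    def __len__(self):
        return len(self.data)

# Step 3: wrap the Dataset in a DataLoader and iterate in batches
loader = DataLoader(ToyDataset(samples), batch_size=2, shuffle=False, drop_last=True)
for x, y in loader:
    print(x.shape, y.shape)  # torch.Size([2, 3, 1]) torch.Size([2, 1])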
Unit test
filename = "complete_data.csv"
data, L = read_data(filename)
train_pro = 0.0005  # use a tiny slice of the data for a quick test
train = data.iloc[:int(len(data) * train_pro), :]
train_data_set, train_data = data_loader(train, 24, 5, False)
model = LSTM(1, 32, 2, 1, 5).to(device)
loss = nn.MSELoss()
for data in train_data:
    x, label = data
    x, label = x.to(device), label.to(device)
    print(x.size())      # (5, 24, 1)
    print(label.size())  # (5, 1)
    predicted_output = model(x)
    print(predicted_output)
    loss_output = loss(predicted_output, label)
    print(loss_output)
Model Training
Standardizing the model parameters
def get_parameters():
    para_dict = {
        "input_size": 1,
        "output_size": 1,
        "batch_size": 5,
        "hidden_size": 64,
        "num_layers": 3,
        "seq_len": 24,
        "modelpara_path": 'md.pth',
        "loss_function": 'mse',
        "optimizer": 'Adam',
        "lr": 0.0001,
        "epoch": 200,
    }
    return para_dict
Training procedure
import copy
from tqdm import tqdm
import matplotlib.pyplot as plt

def train_proc(para_dict, train_data, val_data):
    input_size = para_dict["input_size"]
    hidden_size = para_dict["hidden_size"]
    num_layers = para_dict["num_layers"]
    output_size = para_dict["output_size"]
    batch_size = para_dict["batch_size"]
    lr = para_dict["lr"]
    epoch = para_dict["epoch"]
    model = LSTM(input_size, hidden_size, num_layers, output_size, batch_size)
    model.to(device)
    # The optimizer holds the current state and updates the parameters
    if para_dict["optimizer"] == 'Adam':
        optimizer = torch.optim.Adam(model.parameters(), lr)
    if para_dict["loss_function"] == 'mse':
        loss_function = nn.MSELoss()
    best_model = None
    min_val_loss = float('inf')
    train_loss = []
    val_loss = []
    for i in tqdm(range(epoch)):
        train_loss_tmp = 0
        val_loss_tmp = 0
        # Training
        model.train()
        for curdata in train_data:
            seq, label = curdata
            seq = seq.to(device)
            label = label.to(device)
            # Compute the network output
            y_pred = model(seq)
            # Compute the loss
            loss = loss_function(y_pred, label)
            train_loss_tmp += loss.item()
            # Backpropagate and update the parameters
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        # Validation
        model.eval()
        for (seq, label) in val_data:
            seq = seq.to(device)
            label = label.to(device)
            with torch.no_grad():
                y_pred = model(seq)
                loss = loss_function(y_pred, label)
                val_loss_tmp += loss.item()
        # Keep the model with the lowest validation loss
        if val_loss_tmp < min_val_loss:
            min_val_loss = val_loss_tmp
            best_model = copy.deepcopy(model)
        # Record the losses
        train_loss_tmp /= len(train_data)
        val_loss_tmp /= len(val_data)
        train_loss.append(train_loss_tmp)
        val_loss.append(val_loss_tmp)
        print("epoch={:03d}: train_loss = {:05f}, val_loss = {:05f}".format(i, train_loss_tmp, val_loss_tmp))
    # Plot the loss curves
    plt.figure()
    plt.plot(range(epoch), train_loss, 'r')
    plt.plot(range(epoch), val_loss, 'b')
    plt.legend(['train', 'val'])
    plt.show()
    # Save the best model
    state = {'models': best_model.state_dict()}
    torch.save(state, para_dict["modelpara_path"])
Model Testing
import numpy as np

def test_proc(para_dict, test_data, min_val, max_val):
    input_size = para_dict["input_size"]
    hidden_size = para_dict["hidden_size"]
    num_layers = para_dict["num_layers"]
    output_size = para_dict["output_size"]
    batch_size = para_dict["batch_size"]
    path = para_dict["modelpara_path"]
    model = LSTM(input_size, hidden_size, num_layers, output_size, batch_size)
    model.to(device)
    print("loading models ......")
    model.load_state_dict(torch.load(path, map_location=device)['models'])
    model.eval()
    pred = []
    labels = []
    for curdata in test_data:
        seq, label = curdata
        seq = seq.to(device)
        label = label.to(device)
        with torch.no_grad():
            y_pred = model(seq)
        # Undo the min-max normalization to recover the original scale
        for j in range(len(y_pred)):
            y = y_pred[j].item() * (max_val - min_val) + min_val
            lb = label[j].item() * (max_val - min_val) + min_val
            pred.append(y)
            labels.append(lb)
    errs = np.array(pred) - np.array(labels)
    mape = abs(errs / np.array(labels)).sum() / len(errs)
    print('Test result: MAPE={:.3f}%'.format(mape * 100))
    # Plot predictions against ground truth
    plt.figure()
    plt.plot(pred, 'r')
    plt.plot(labels, 'b')
    plt.legend(['pred', 'labels'])
    plt.show()
    # Plot the prediction errors
    plt.figure()
    plt.plot(errs)
    plt.show()
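For reference, the metric printed above is the mean absolute percentage error, computed exactly as in the code, with $y_i$ the de-normalized true values and $\hat{y}_i$ the predictions:

$$\mathrm{MAPE} = \frac{100\%}{n}\sum_{i=1}^{n}\left|\frac{\hat{y}_i - y_i}{y_i}\right|$$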
Main Program
if __name__ == '__main__':
    filename = "complete_data.csv"
    data, L = read_data(filename)
    # Min-max normalization
    min_val = min(data.iloc[:, 1])
    max_val = max(data.iloc[:, 1])
    data.iloc[:, 1] = (data.iloc[:, 1] - min_val) / (max_val - min_val)
    # Split: 90% train, 5% validation, 5% test
    train_pro = 0.9
    val_pro = 0.95
    test_pro = 1
    train = data.iloc[:int(len(data) * train_pro), :]
    val = data.iloc[int(len(data) * train_pro):int(len(data) * val_pro), :]
    test = data.iloc[int(len(data) * val_pro):int(len(data) * test_pro), :]
    print("training set shape: {}".format(train.shape))
    print("validation set shape: {}".format(val.shape))
    print("test set shape: {}".format(test.shape))
    para_dict = get_parameters()
    batch_size = para_dict["batch_size"]
    N = para_dict["seq_len"]
    train_data_set, train_data = data_loader(train, N, batch_size, True)
    print('Training data loaded!')
    val_data_set, val_data = data_loader(val, N, batch_size, True)
    print('Validation data loaded!')
    test_data_set, test_data = data_loader(test, N, batch_size, False)
    print('Test data loaded!')
    print("Starting training")
    train_proc(para_dict, train_data, val_data)
    print("Starting testing")
    test_proc(para_dict, test_data, min_val, max_val)
    print('Done!')
Training Results
MAPE = 1% on the test set.
[Figures: training and validation loss curves; prediction errors]
Tips and Tricks
1) Use tqdm to display training progress: from tqdm import tqdm
2) Use the dedicated modes for training and validation: model.train() / model.eval()
3) The order of a training update: compute the network output -> compute the loss -> zero the gradients -> backpropagate the loss -> update the parameters
4) Save the best model weights: torch.save(model.state_dict(), path); see the sketch after this list
5) Training tips:
(1) Diagnose problems from the trends of train_loss and val_loss:
if train_loss falls slowly, increase the learning rate and consider normalizing the data;
if train_loss falls while val_loss rises, the model is overfitting; reduce the network size;
(2) Set epoch to 50-200 to see the loss trends more clearly
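A minimal save/load round trip for tip 4, assuming the LSTM class and device defined earlier and a trained model in scope (the file name is illustrative):

# Save only the weights (the state_dict), not the whole model object
torch.save(model.state_dict(), 'best_weights.pth')  # illustrative path

# To restore, rebuild the model with the same hyperparameters, then load the weights
restored = LSTM(1, 64, 3, 1, 5).to(device)
restored.load_state_dict(torch.load('best_weights.pth', map_location=device))
restored.eval()  # switch to evaluation mode before inference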
Other Notes
1) Using to(device): moves a tensor or a model to the specified device, as the sketch below shows
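A minimal sketch of the usual device pattern, assuming the LSTM class defined earlier:

import torch

# Pick the GPU if one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = LSTM(1, 64, 3, 1, 5).to(device)  # moves all parameters in place
x = torch.randn(5, 24, 1).to(device)     # tensors must be moved explicitly
y = model(x)  # model and input now live on the same device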