torch.nn.LSTM()

1. 输入的参数列表说明:

input_size:输入数据的特征维度,(单变量=1,embedding=【词向量的表示维度】)
hidden_size:LSTM隐层的维度
num_layers:循环神经网络的层数 1或者2
batch_first:通常默认为False,输入的数据shape=(time_steps,batch_size,embedding)
batch_first=True,则输入的数据shape=(batch_size,time_steps,embedding)

2.forward(参数说明)

  1. input:
    batch_first=True,则输入的数据shape=(batch_size,time_steps,embedding)
    batch_first=False,则输入的数据shape=(time_steps,batch_size,embedding)
  2. h_0
  3. c_0

3.输出

  1. output:
    batch_first=True,则输入的数据shape=(batch_size,time_steps,hidden)
    batch_first=False,则输入的数据shape=(time_steps,batch_size,hidden)

4. 代码说明:

class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.num_directions = 1 # 单向LSTM
        self.batch_size = batch_size
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        self.linear = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input_seq):
        
        h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
        c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
       
        # input_seq(batch_size, time_steps,input_size)
        output, _ = self.lstm(input_seq, (h_0, c_0))
        # output(batch_size, time_steps, num_directions * hidden_size)
            
        pred = self.linear(output)  
        # pred(batch_size, time_steps, output_size)

        pred = pred[:, -1, :]  
        #输出作为预测,只取pred中的time_steps列的最后一个值,所以pred.shape=(batch_size,1)
        
        return pred

说明:
input_size:每个时间步骤的输入特征数。
hidden_size:每个LSTM层中的隐藏单元数。
num_layers:堆叠的LSTM层数。
output_size:输出的大小(例如,预测值的数量)。

5. Pytorch框架中LSTM算法实现单变量单步预测实战:

问题描述:利用XXX,对XXX进行预测,其中使用前12天(time_stpes=window_size = 12)的数据预测未来1天的数据

5.1数据说明:

在csv文档中,第一轮为DATE,第二列为我们要预测的数据S4248SM144NCEN

DATE,S4248SM144NCEN
1992-01-01,3459
1992-02-01,3458
1992-03-01,4002
1992-04-01,4564
1992-05-01,4221
1992-06-01,4529
1992-07-01,4466
1992-08-01,4137
1992-09-01,4126
1992-10-01,4259
1992-11-01,4240
1992-12-01,4936
1993-01-01,3031
1993-02-01,3261
1993-03-01,4160
1993-04-01,4377
1993-05-01,4307
1993-06-01,4696

5.2 数据预处理

因为pytorch框架中LSTM模型的输入和输出有一个的shape要求,因此必须对csv数据进行预处理。

# 导入酒精销售数据
df = pd.read_csv('data\Alcohol_Sales.csv',index_col=0,parse_dates=True)
len(df)
df.head()  # 观察数据集,这是一个单变量时间序列

y = df['S4248SM144NCEN'].values.astype(float)
# print(len(y))  #325条数据

test_size = 12
# 划分训练和测试集,最后12个值作为测试集
train_set = y[:-test_size]  #323条数据
test_set = y[-test_size:] #12条数据
# print(train_set.shape) #(313,) 一位数组

# 归一化至[-1,1]区间,为了获得更好的训练效果
scaler = MinMaxScaler(feature_range=(-1, 1))
#scaler.fit_transform输入必须是二维的,但是train_set却是一个一维,所有实验reshape(-1,1)
train_norm = scaler.fit_transform(train_set.reshape(-1, 1)) #np.reshape(-1, 1)=1,行未知
# print(train_norm.shape) #(313, 1) 这里将一维数据转化为二维

# 转换成 tensor
train_norm = torch.FloatTensor(train_norm).view(-1) 
print(train_norm.shape) #torch.Size([313])

# 定义时间窗口,注意和前面的test size不是一个概念
window_size = 12
# 这个函数的目的是为了从原时间序列中抽取出训练样本,也就是用第一个值到第十二个值作为X输入,预测第十三个值作为y输出,这是一个用于训练的数据点,时间窗口向后滑动以此类推
def input_data(seq,ws):  
    out = []
    L = len(seq)
    for i in range(L-ws):
        window = seq[i:i+ws]
        label = seq[i+ws:i+ws+1]
        out.append((window,label))  #将x和y以tensor格式放入到out列表当中,    
    return out
train_data = input_data(train_norm,window_size)
len(train_data)  # 等于325(原始数据集长度)-12(测试集长度)-12(时间窗口)

5.3 LSTM模型训练

class LSTMnetwork(nn.Module):
    def __init__(self,input_size=1,hidden_size=100,output_size=1):
        super().__init__()
        self.hidden_size = hidden_size
        # 定义LSTM层
        self.lstm = nn.LSTM(input_size,hidden_size)
        # 定义全连接层
        self.linear = nn.Linear(hidden_size,output_size)
        # 初始化h0,c0
        self.hidden = (torch.zeros(1,1,self.hidden_size),
                       torch.zeros(1,1,self.hidden_size))

    def forward(self,seq):
        # 前向传播的过程是输入->LSTM层->全连接层->输出

        # https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html?highlight=lstm#torch.nn.LSTM
        # 在观察查看LSTM输入的维度,LSTM的第一个输入input_size维度是(L, N, H_in), L是序列长度,N是batch size,H_in是输入尺寸,也就是变量个数
        # LSTM的第二个输入是一个元组,包含了h0,c0两个元素,这两个元素的维度都是(D∗num_layers,N,H_out),D=1表示单向网络,num_layers表示多少个LSTM层叠加,N是batch size,H_out表示隐层神经元个数
        
        '''
        pytorch中LSTM输入为[time_step,batch,feature],这里窗口time_step=12,feature=1[1维数据],batch我们这里设置为1
        所以使用seq.view(len(seq),1,-1)将tensor[12]数据转化为tensor[12,1,1]
        '''
        lstm_out, self.hidden = self.lstm(seq.view(len(seq),1,-1), self.hidden)        
        # print(lstm_out) #torch.Size([12, 1, 100]) [time_step,batch,hidden]        
        # print(lstm_out.view(len(seq),-1))  #[12,100]
        pred = self.linear(lstm_out.view(len(seq),-1)) 
        
        # print(pred) #torch.Size([12, 1])
        return pred[-1]  # 输出只用取最后一个值

5.4 数据训练

torch.manual_seed(101)
model = LSTMnetwork()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 100
start_time = time.time()
for epoch in range(epochs):
    for seq, y_train in train_data:      
        # 每次更新参数前都梯度归零和初始化
        optimizer.zero_grad()
        model.hidden = (torch.zeros(1,1,model.hidden_size),
                        torch.zeros(1,1,model.hidden_size))
        y_pred = model(seq) 
        loss = criterion(y_pred, y_train)
        loss.backward()
        optimizer.step()

    print(f'Epoch: {epoch+1:2} Loss: {loss.item():10.8f}')
print(f'\nDuration: {time.time() - start_time:.0f} seconds')

5.4 数据测试

future = 12

# 选取序列最后12个值开始预测
preds = train_norm[-window_size:].tolist()

# 设置成eval模式
model.eval()
# 循环的每一步表示向时间序列向后滑动一格
for i in range(future):
    
    seq = torch.FloatTensor(preds[-window_size:]) #第下一次循环的时候,seq总是能取到后12个数据,因此及时后面用pred.append()也还是每次用到最新的预测数据完成下一次的预测。
    
    with torch.no_grad():
        model.hidden = (torch.zeros(1,1,model.hidden_size),
                        torch.zeros(1,1,model.hidden_size))
        """
        item理解:取出张量具体位置的元素元素值,并且返回的是该位置元素值的高精度值,保持原元素类型不变;必须指定位置
            即:原张量元素为整形,则返回整形,原张量元素为浮点型则返回浮点型,etc.
        """
        # print(model(seq),model(seq).item())  #tensor([0.1027]), tensor([0.1026])
        preds.append(model(seq).item())  #每循环一次,这里会将新的预测值添加到pred中,

# 逆归一化还原真实值
true_predictions = scaler.inverse_transform(np.array(preds[window_size:]).reshape(-1, 1))

# 对比真实值和预测值
plt.figure(figsize=(12,4))
plt.grid(True)
plt.plot(df['S4248SM144NCEN'])
x = np.arange('2018-02-01', '2019-02-01', dtype='datetime64[M]').astype('datetime64[D]')

plt.plot(x,true_predictions)
plt.show()

# 放大看
fig = plt.figure(figsize=(12,4))
plt.grid(True)
fig.autofmt_xdate()

plt.plot(df['S4248SM144NCEN']['2017-01-01':])
plt.plot(x,true_predictions)
plt.show()

5.2 数据预测

# 重新开始训练
epochs = 100
# 切回到训练模式
model.train()
y_norm = scaler.fit_transform(y.reshape(-1, 1))
y_norm = torch.FloatTensor(y_norm).view(-1)
all_data = input_data(y_norm,window_size)


start_time = time.time()

for epoch in range(epochs):

    for seq, y_train in all_data:  

        optimizer.zero_grad()
        model.hidden = (torch.zeros(1,1,model.hidden_size),
                        torch.zeros(1,1,model.hidden_size))

        y_pred = model(seq)

        loss = criterion(y_pred, y_train)
        loss.backward()
        optimizer.step()

    print(f'Epoch: {epoch+1:2} Loss: {loss.item():10.8f}')

print(f'\nDuration: {time.time() - start_time:.0f} seconds')

# 重新预测
window_size = 12
future = 12
L = len(y)

preds = y_norm[-window_size:].tolist()

model.eval()
for i in range(future):  
    seq = torch.FloatTensor(preds[-window_size:])
    with torch.no_grad():

        model.hidden = (torch.zeros(1,1,model.hidden_size),
                        torch.zeros(1,1,model.hidden_size))  
        preds.append(model(seq).item())


true_predictions = scaler.inverse_transform(np.array(preds).reshape(-1, 1))


x = np.arange('2019-02-01', '2020-02-01', dtype='datetime64[M]').astype('datetime64[D]')

plt.figure(figsize=(12,4))
plt.grid(True)
plt.plot(df['S4248SM144NCEN'])
plt.plot(x,true_predictions[window_size:])
plt.show()

6.完整代码

import torch
import torch.nn as nn

from sklearn.preprocessing import MinMaxScaler
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


# 导入酒精销售数据
df = pd.read_csv('data\Alcohol_Sales.csv',index_col=0,parse_dates=True)
len(df)

df.head()  # 观察数据集,这是一个单变量时间序列

plt.figure(figsize=(12,4))
plt.grid(True)
plt.plot(df['S4248SM144NCEN'])
plt.show()


y = df['S4248SM144NCEN'].values.astype(float)

# print(len(y))  #325条数据

test_size = 12

# 划分训练和测试集,最后12个值作为测试集
train_set = y[:-test_size]  #323条数据
test_set = y[-test_size:] #12条数据

# print(train_set.shape) #(313,) 一位数组

# 归一化至[-1,1]区间,为了获得更好的训练效果
scaler = MinMaxScaler(feature_range=(-1, 1))
#scaler.fit_transform输入必须是二维的,但是train_set却是一个一维,所有实验reshape(-1,1)
train_norm = scaler.fit_transform(train_set.reshape(-1, 1)) #np.reshape(-1, 1)=1,行未知

# print(train_norm.shape) #(313, 1) 这里将一维数据转化为二维

# 转换成 tensor
train_norm = torch.FloatTensor(train_norm).view(-1) 
print(train_norm.shape) #torch.Size([313])


# 定义时间窗口,注意和前面的test size不是一个概念
window_size = 12

# 这个函数的目的是为了从原时间序列中抽取出训练样本,也就是用第一个值到第十二个值作为X输入,预测第十三个值作为y输出,这是一个用于训练的数据点,时间窗口向后滑动以此类推
def input_data(seq,ws):  
    out = []
    L = len(seq)
    for i in range(L-ws):
        window = seq[i:i+ws]
        label = seq[i+ws:i+ws+1]
        out.append((window,label))  #将x和y以tensor格式放入到out列表当中,    
    return out

train_data = input_data(train_norm,window_size)
len(train_data)  # 等于325(原始数据集长度)-12(测试集长度)-12(时间窗口)
   
class LSTMnetwork(nn.Module):
    def __init__(self,input_size=1,hidden_size=100,output_size=1):
        super().__init__()
        self.hidden_size = hidden_size
        # 定义LSTM层
        self.lstm = nn.LSTM(input_size,hidden_size)
        # 定义全连接层
        self.linear = nn.Linear(hidden_size,output_size)
        # 初始化h0,c0
        self.hidden = (torch.zeros(1,1,self.hidden_size),
                       torch.zeros(1,1,self.hidden_size))

    def forward(self,seq):
        # 前向传播的过程是输入->LSTM层->全连接层->输出

        # https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html?highlight=lstm#torch.nn.LSTM
        # 在观察查看LSTM输入的维度,LSTM的第一个输入input_size维度是(L, N, H_in), L是序列长度,N是batch size,H_in是输入尺寸,也就是变量个数
        # LSTM的第二个输入是一个元组,包含了h0,c0两个元素,这两个元素的维度都是(D∗num_layers,N,H_out),D=1表示单向网络,num_layers表示多少个LSTM层叠加,N是batch size,H_out表示隐层神经元个数
        
        '''
        pytorch中LSTM输入为[time_step,batch,feature],这里窗口time_step=12,feature=1[1维数据],batch我们这里设置为1
        所以使用seq.view(len(seq),1,-1)将tensor[12]数据转化为tensor[12,1,1]
        '''
        lstm_out, self.hidden = self.lstm(seq.view(len(seq),1,-1), self.hidden)        
        # print(lstm_out) #torch.Size([12, 1, 100]) [time_step,batch,hidden]        
        # print(lstm_out.view(len(seq),-1))  #[12,100]
        pred = self.linear(lstm_out.view(len(seq),-1)) 
        
        # print(pred) #torch.Size([12, 1])
        return pred[-1]  # 输出只用取最后一个值
    
torch.manual_seed(101)
model = LSTMnetwork()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 100
start_time = time.time()
for epoch in range(epochs):

    for seq, y_train in train_data:
        
        # 每次更新参数前都梯度归零和初始化
        optimizer.zero_grad()
        model.hidden = (torch.zeros(1,1,model.hidden_size),
                        torch.zeros(1,1,model.hidden_size))

        y_pred = model(seq)
        loss = criterion(y_pred, y_train)
        loss.backward()
        optimizer.step()

    print(f'Epoch: {epoch+1:2} Loss: {loss.item():10.8f}')

print(f'\nDuration: {time.time() - start_time:.0f} seconds')

future = 12

# 选取序列最后12个值开始预测
preds = train_norm[-window_size:].tolist()

# 设置成eval模式
model.eval()
# 循环的每一步表示向时间序列向后滑动一格
for i in range(future):
    
    seq = torch.FloatTensor(preds[-window_size:]) #第下一次循环的时候,seq总是能取到后12个数据,因此及时后面用pred.append()也还是每次用到最新的预测数据完成下一次的预测。
    
    with torch.no_grad():
        model.hidden = (torch.zeros(1,1,model.hidden_size),
                        torch.zeros(1,1,model.hidden_size))
        """
        item理解:取出张量具体位置的元素元素值,并且返回的是该位置元素值的高精度值,保持原元素类型不变;必须指定位置
            即:原张量元素为整形,则返回整形,原张量元素为浮点型则返回浮点型,etc.
        """
        # print(model(seq),model(seq).item())  #tensor([0.1027]), tensor([0.1026])
        preds.append(model(seq).item())  #每循环一次,这里会将新的预测值添加到pred中,

# 逆归一化还原真实值
true_predictions = scaler.inverse_transform(np.array(preds[window_size:]).reshape(-1, 1))

# 对比真实值和预测值
plt.figure(figsize=(12,4))
plt.grid(True)
plt.plot(df['S4248SM144NCEN'])
x = np.arange('2018-02-01', '2019-02-01', dtype='datetime64[M]').astype('datetime64[D]')

plt.plot(x,true_predictions)
plt.show()

# 放大看
fig = plt.figure(figsize=(12,4))
plt.grid(True)
fig.autofmt_xdate()

plt.plot(df['S4248SM144NCEN']['2017-01-01':])
plt.plot(x,true_predictions)
plt.show()


# 重新开始训练
epochs = 100
# 切回到训练模式
model.train()
y_norm = scaler.fit_transform(y.reshape(-1, 1))
y_norm = torch.FloatTensor(y_norm).view(-1)
all_data = input_data(y_norm,window_size)


start_time = time.time()

for epoch in range(epochs):

    for seq, y_train in all_data:  

        optimizer.zero_grad()
        model.hidden = (torch.zeros(1,1,model.hidden_size),
                        torch.zeros(1,1,model.hidden_size))

        y_pred = model(seq)

        loss = criterion(y_pred, y_train)
        loss.backward()
        optimizer.step()

    print(f'Epoch: {epoch+1:2} Loss: {loss.item():10.8f}')

print(f'\nDuration: {time.time() - start_time:.0f} seconds')

# 重新预测
window_size = 12
future = 12
L = len(y)

preds = y_norm[-window_size:].tolist()

model.eval()
for i in range(future):  
    seq = torch.FloatTensor(preds[-window_size:])
    with torch.no_grad():

        model.hidden = (torch.zeros(1,1,model.hidden_size),
                        torch.zeros(1,1,model.hidden_size))  
        preds.append(model(seq).item())


true_predictions = scaler.inverse_transform(np.array(preds).reshape(-1, 1))


x = np.arange('2019-02-01', '2020-02-01', dtype='datetime64[M]').astype('datetime64[D]')

plt.figure(figsize=(12,4))
plt.grid(True)
plt.plot(df['S4248SM144NCEN'])
plt.plot(x,true_predictions[window_size:])
plt.show()

参考全部的代码:https://github.com/skywateryang/timeseries101

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐