【深度强化学习】常常使用的pytorch代码
最近又看了一遍《深度强化学习》,和TD3的代码,觉得市面上好多代码写的非常绚丽,但表达的意思,实际的操作确实同一个,再此总结一下这些常见的代码的含义。顺便自己构建一个比较简单易懂的强化学习算法供自己使用。暂时只搭建了部分,欢迎star参考了很多人写的代码,这里先不列举了。torch版本:2.3.1+cu121python版本:3.11.9设计深度强化学习库的思想。
文章目录
前言
最近又看了一遍《深度强化学习》,和TD3的代码,觉得市面上好多代码写的非常绚丽,但表达的意思,实际的操作确实同一个,再此总结一下这些常见的代码的含义。
顺便自己构建一个比较简单易懂的强化学习算法供自己使用。(暂时只搭建了部分,欢迎star)
参考了很多人写的代码,这里先不列举了。
torch版本:2.3.1+cu121
python版本:3.11.9
参考:设计深度强化学习库的思想
tensor张量
基于这样的一个事实,一般环境给出的状态变量的类型为np.float32。
我们需要考虑从numpy数组转换成tensor张量,和张量转换成np数组的最快方式。
【从np.array->tensor张量的最优解】
torch.as_tensor(data_numpy,dtype=torch.float32)
torch.tensor(当数据为浮点时
)默认创建torch.float32类型的张量,且可以指定张量类型,可读性较高。
但是:
使用torch.tensor 会创建一个新的张量,因此会占用更多的内存,torch.as_tensor则会尽可能共享内存,从而实现最快。
## torch.tensor 创建张量
import torch
import numpy as np
# 使用 torch.tensor 创建 float32 类型的张量
tensor = torch.tensor(np.array([1.0, 2.0, 3.0]), dtype=torch.float32)
print(tensor.dtype)
tensor_1d = torch.FloatTensor([1.0, 2.0, 3.0])
print(tensor_1d.dtype)
tensor_as = torch.as_tensor([1.0, 2.0, 3.0], dtype=torch.float32)
print(tensor_as.dtype)
'''
torch.float32
torch.float32
torch.float32
'''
torch.tensor 与torch.as_tensor的区别
data_numpy = np.array([1, 2, 3])
tensor_from_numpy = torch.tensor(data_numpy) ## 当数据不为浮点时,会自动转换为整型
# 使用 torch.as_tensor
tensor_from_numpy_as = torch.as_tensor(data_numpy)
# 检查内存共享
data_numpy[0] = 10
print(tensor_from_numpy) # 输出: tensor([1, 2, 3])
print(tensor_from_numpy_as) # 输出: tensor([10, 2, 3], dtype=torch.int32)
'''
tensor([1, 2, 3], dtype=torch.int32)
tensor([10, 2, 3], dtype=torch.int32)
'''
但是我们实际上确实只需要一份数据,所以这里我们可以采用如下形式
torch.as_tensor(data_numpy,dtype=torch.float32)
实际上在(elegentRL)小雅中认为去掉dtype=torch.float32是最快的,但是在它的库中,实际用的是上述方法,以增加可读性。
【从tensor张量->np.array的最优解】
tensor.detach().cpu().numpy()
这里直接借鉴elegentRL的形式,实际其他代码也有这种写法,下面这种方法最快。
print(tensor.detach().cpu().numpy().dtype) # 输出: float32
tensor.detach().cpu().numpy()
不能用 data,因为这个很旧,功能已被 .detach() 替代
detach() 不让PyTorch框架去追踪张量的梯度,所以在放在最前
cpu() 把张量从GPU显存中传输到CPU内存 numpy() 把张量tensor变成数组array
detach
tensor.detach()
detach 用于使得此张量不参与梯度的运算,一般用于目标网络。
目标网路有一个特点,没有优化器给它。
以下三种等效
tensor = net(tensor.detach()) tensor = net(tensor).detach() with torch.no_grad(): tensor = net(tensor)
这里我会选择第二种,看起来可读性更高,更灵活
调整张量维度reshape、view
reshape、view、transpose、t、permute、faltten、squeeze、unsqueeze
reshape:重塑,无限制,先判断是否连续,会自动调用.contiguous()方法
view:视图调整 ,限制:必须为连续张量
transpose: 转置,限制:必须接受两个参数,交换这两个参数的位置
t:二维转置,限制:必须是二维张量
permute:置换,无限制,根据给定的维度顺序重排张量的维度
flatten:弄平,无限制,压成一维
squeeze:挤压,接受一个参数,去除指定位置 或无参数,去除所有1维维度
unsqueeze:松开 ,限制:必须接受一个参数,在指定位置下加一个一维维度
二维的张量调整
## 调整张量维度
import torch
print('--两维度调整--')
torch_tensor = torch.rand(6, 4) # batch_size X feature_size 6x4
# reshape
new_shape_0 = torch_tensor.reshape(3, -1) # 6x4 -> 3x8
new_shape_1 = torch.reshape(torch_tensor, (3, -1)) # 6x4 -> 3x8
print('reshape:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([3, 8])
# view , view_as
new_shape_0 = torch_tensor.view(3, -1) # 6x4 -> 3x8
new_shape_1 = torch_tensor.view_as(torch.Tensor(3, 8)) # 6x4 -> 3x8 #torch.Tensor(3, 8)为创建一个3x8的张量
print('view:',new_shape_0.shape,'view_as:',new_shape_1.shape) # 输出: torch.Size([3, 8])
# transpose 只能接受两个参数
new_shape_0 = torch_tensor.transpose(0, 1) # 6x4 -> 4x6
new_shape_1 = torch.transpose(torch_tensor, 0, 1) # 6x4 -> 4x6
print('transpose:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([4, 6])
# t 只能用于两维
new_shape_0 = torch_tensor.t() # 6x4 -> 4x6
new_shape_1 = torch.t(torch_tensor) # 6x4 -> 4x6
print('t:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([4, 6])
# permute 改变视图
new_shape_0 = torch_tensor.permute(1, 0) # 6x4 -> 4x6
new_shape_1 = torch.permute(torch_tensor, (1, 0)) # 6x4 -> 4x6
print('permute:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([4, 6])
# flatten 返回新的张量
new_shape_0 = torch_tensor.flatten() # 6x4 -> 24
new_shape_1 = torch.flatten(torch_tensor) # 6x4 -> 24
print('flatten:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([24])
# unsqueeze
new_shape_0 = torch_tensor.unsqueeze(0) # 6x4 -> 1x6x4
new_shape_1 = torch.unsqueeze(torch_tensor, 1) # 6x4 -> 6x1x4
print('unsqueeze:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([1, 6, 4]), torch.Size([6, 1, 4])
# squeeze
new_shape_2 = new_shape_0.squeeze(0) == new_shape_0.squeeze()# 1x6x4 -> 6x4
new_shape_3 = torch.squeeze(new_shape_1) == new_shape_1.squeeze(1) # 6x1x4 -> 6x4
print('squeeze:',new_shape_2.shape,new_shape_3.shape) # 输出: torch.Size([6, 4]), torch.Size([6, 4])
new_tensor = torch.rand(1,1,1)
print(new_tensor,new_tensor.squeeze().shape) # 输出: torch.Size([])
# 尝试对转置后的张量使用 view 函数 --> t(),transpose(),permute()会导致张量不连续 reshape(),flatten(),squeeze(),unsqueeze() 则不会
try:
#tensor_t_view = torch_tensor.t().view(3, 8)
#tensor_t_view = torch_tensor.transpose(0,1).view(3, -1)
tensor_t_view = torch_tensor.permute(1, 0).view(3, -1)
except RuntimeError as e:
print("\nError when using view on non-contiguous tensor:")
print(e)
tensor_t_contiguous = torch_tensor.t().contiguous().view(3, 8)
print(tensor_t_contiguous.shape) # 输出: torch.Size([3, 8])
三维张量调整
print('--三维度调整--')
torch_tensor = torch.rand(6, 3, 4) # batch_size X seq_len X feature_size 6x3x4
# reshape
new_shape_0 = torch_tensor.reshape(2, 2, -1) # 6x3x4 -> 2x2x18
new_shape_1 = torch.reshape(torch_tensor, (2, 3, -1)) # 6x3x4 -> 2x3x12
print('reshape:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([2, 2, 18]), torch.Size([2, 3, 8])
# view , view_as
new_shape_0 = torch_tensor.view(2, 2, -1) # 6x3x4 -> 2x2x18
new_shape_1 = torch_tensor.view_as(torch.Tensor(2, 3, 12)) # 6x3x4 -> 2x3x12 #torch.Tensor(2, 3, 8)为创建一个2x3x8的张量
print('view:',new_shape_0.shape,'view_as:',new_shape_1.shape) # 输出: torch.Size([2, 2, 18]), torch.Size([2, 3, 8])
# transpose
new_shape_0 = torch_tensor.transpose(0, 2) # 6x3x4 -> 4x3x6
new_shape_1 = torch.transpose(torch_tensor, 0, 2) # 6x3x4 -> 4x3x6
print('transpose:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([4, 3, 6]), torch.Size([4, 3, 6])
# permute
new_shape_0 = torch_tensor.permute(2, 1, 0) # 6x3x4 -> 4x3x6
new_shape_1 = torch.permute(torch_tensor, (2, 1, 0)) # 6x3x4 -> 4x3x6
print('permute:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([4, 3, 6]), torch.Size([4, 3, 6])
# flatten
new_shape_0 = torch_tensor.flatten() # 6x3x4 -> 72
new_shape_1 = torch.flatten(torch_tensor) # 6x3x4 -> 72
print('flatten:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([72])
# unsqueeze
new_shape_0 = torch_tensor.unsqueeze(0) # 6x3x4 -> 1x6x3x4
new_shape_1 = torch.unsqueeze(torch_tensor, 1) # 6x3x4 -> 6x1x3x4
print('unsqueeze:',new_shape_0.shape,new_shape_1.shape) # 输出: torch.Size([1, 6, 3, 4]), torch.Size([6, 1, 3, 4])
# squeeze
new_shape_2 = new_shape_0.squeeze(0) == new_shape_0.squeeze()# 1x6x3x4 -> 6x3x4
new_shape_3 = torch.squeeze(new_shape_1) == new_shape_1.squeeze(1) # 6x1x3x4 -> 6x3x4
print('squeeze:',new_shape_2.shape,new_shape_3.shape) # 输出: torch.Size([6, 3, 4]), torch.Size([6, 3, 4])
综上比较:
reshape比view,view_as 更灵活,可以处理非连续的张量,可读性更高,且在性能上几乎一样。
permute vs transpose,t 更灵活,可以处理多个维度
reshape,permute
广播问题
广播原则:官方
一般发生在 target_Q = reward + self.gamma * target_Q * (1 - done)
reward: 4
tagget_Q: 4x1
done: 4
此时两者相加会出现4x4的情况
### 广播原则
import torch
'''
广播原则:从后往前逐个比较两个张量的维度,满足以下条件之一,两个张量才能进行广播
1. 如果张量的维度不同,将维度较小的张量进行扩展,直到两个张量的维度都一样。
2. 对应维度的两个张量,如果某个张量的长度为1,那么可以利用这个张量进行复制来扩展为相同的形状。
'''
A = torch.rand(4, 3)
B = torch.rand(1, 3) # 原则2:对应上下两个张量的维度,A的第一维度4对应B的第一维度1,那么B的第一维度扩展为4
Z = A * B
X = A + B
print(Z.shape,X.shape) # 输出: torch.Size([4, 3])
A = torch.rand(4, 1) # 原则2:在第二维度体现 A->4x3
B = torch.rand(1, 3) # B->4x3
Z = A * B
X = A + B
print(Z.shape,X.shape) # 输出: torch.Size([4, 3])
A = torch.rand(4, 1) # A->4x3
B = torch.rand( 3) # 原则1:B->4x3
Z = A * B # 原则1
X = A + B
print(Z.shape,X.shape) # 输出: torch.Size([4, 3])
A = torch.rand(5, 2, 1, 1) # A->5x2x3x1
B = torch.rand( 1, 3, 1) # 原则1: B->5x2x3x1
Z = A * B
X = A + B
print(Z.shape,X.shape) # 输出: torch.Size([5, 2, 3, 1])
# 无法广播
A = torch.rand(4, 3)
B = torch.rand(2, 3) # 无法广播 这里第一维度2 不是1 则无法广播
try:
Z = A * B
print(Z.shape)
Z = A + B
print(Z.shape)
except RuntimeError as e:
print("\nError when broadcasting tensors:")
print(e)
## 广播问题 : 4x1 * 4 = 4x4
'''
一般发生在 target_Q = reward + self.gamma * target_Q * (1 - done)
reward: 4
tagget_Q: 4x1
done: 4
'''
A = torch.rand(4, 1)
B = torch.rand( 4)
Z = A * B
X = A + B
print(Z.shape,X.shape) # 输出: torch.Size([4, 4])
## 广播问题解决1
A = torch.rand(4, 1).squeeze(1) #.flatten()
B = torch.rand( 4)
Z = A * B
X = A + B
print(Z.shape,X.shape) # 输出: torch.Size([4]), torch.Size([4])
## 广播问题解决2
A = torch.rand(4, 1)
B = torch.rand( 4).reshape(-1, 1)
Z = A * B
X = A + B
print(Z.shape,X.shape) # 输出: torch.Size([4, 1]), torch.Size([4, 1])
'''
这里选择解决2,因为后续算法中如TD3需要2个q值,如果解决1则需要两个squeeze
'''
这里选择解决2,因为后续算法中如TD3需要2个q值,如果解决1则需要两个squeeze。
即将reward的一维张量拓展reshape为2维。
reshape
max,argmax
同理min,argmin
.max(dim = 1)[0] argmax(dim = -1)
max在显式的选择最大动作的价值时会用到
.max(dim = 1)[0]是返回最大值,[1]是返回索引
## max,min,argmax,argmin
import torch
import numpy as np
torch_tensor = torch.rand(6, 4)
# max
max_val_0 = torch.max(torch_tensor) # 返回所有元素中的最大值
max_val_1 = torch_tensor.max() # 返回所有元素中的最大值
print('max:',max_val_0,max_val_1) # 输出: tensor(0.9977), tensor(0.9977)
max_val_0 = torch.max(torch_tensor, dim=1) # 返回每行的最大值和索引
max_val_1 = torch_tensor.max(dim=1) # 返回每行的最大值和索引
print('max:',max_val_1.values,max_val_1.indices)
'''
max_val_1.values 常见为 tensor.max(1)[0] #6x4 -> 6
max_val_1.indices 常见为 tensor.max(dim=1)[1]
'''
# argmax
argmax_val_0 = torch.argmax(torch_tensor) # 返回所有元素中的最大值的索引
argmax_val_1 = torch_tensor.argmax() # 返回所有元素中的最大值的索引
print('argmax:',argmax_val_0,argmax_val_1) # 输出: tensor(13), tensor(13)
argmax_val_0 = torch.argmax(torch_tensor, dim=1) # 返回每行的最大值的索引
argmax_val_1 = torch_tensor.argmax(dim=1) # 返回每行的最大值的索引 #6x4 -> 6
print('argmax:',argmax_val_1,argmax_val_1.shape) # 输出: tensor([1, 3, 0, 1, 1, 3])
argmax_val_1 = torch_tensor.argmax(dim=1,keepdim=True) # 6x4 -> 6x1 #keepdim=True 表示不改变维度
print('argmax:',argmax_val_1,argmax_val_1.shape)
gather
self.agent.Qnet(state).gather(1, action.long())
gather:收集
离散空间下,为了得到当前Q值,所应用到的函数
# gather
'''
gather(input, dim, index, out=None) → Tensor
input (Tensor) – 源张量
dim (int) – 索引的轴
index (LongTensor) – 包含索引的张量
out (Tensor, optional) – 目标张量
即搜集input在dim维度上的index索引的值,返回到out张量上
'''
torch_tensor = torch.rand(6, 4)
print(torch_tensor)
index = torch.LongTensor([0, 2, 3, 1, 1, 3]) # size = 6
gather_val = torch.gather(torch_tensor, 1, index.reshape(-1,1)) # 6x4 -> 6x1
print('gather:',gather_val)
'''
即在6行4列 行代表batch_size 列代表act_dim
在列的维度上取出index对应的值
第一行选择第0列的值,第二行选择第2列的值,第三行选择第3列的值...
'''
gather_val = torch_tensor.gather(1, index.reshape(-1,1)) # 6x4 -> 6x1
'''
注意 index 必须为LongTensor类型
index 张量的形状必须与 input 张量的形状在非收集维度上一致。
index 张量的元素必须在 input 张量的有效索引范围内。
'''
注意 index 必须为LongTensor类型
cat,stack
cat([s,a],dim = 1) stack([s,s],dim= -1)
cat:拼接,默认dim = 0
stack:堆叠 ,默认dim = 0
cat 多见于 在critic网路更新时,对state 和 action 的拼接
stack 多见于 多智能体在同一个维度上的log概率密度求和
# cat
'''
cat(tensors, dim=0) → Tensor
tensors (sequence of Tensors) – 要连接的张量序列
dim (int) – 要连接的维度
'''
torch_tensor = torch.rand(6, 4)
cat_val = torch.cat([torch_tensor, torch_tensor], dim=0) # 6x4 -> 12x4 # 默认dim = 0
print('cat:',cat_val,cat_val.shape)
# stack
'''
stack(tensors, dim=0) → Tensor
tensors (sequence of Tensors) – 要连接的张量序列
dim (int) – 要连接的维度
'''
torch_tensor = torch.rand(6, 4)
stack_val = torch.stack([torch_tensor, torch_tensor], dim=0) # 2x6x4 # 默认dim = 0
print('stack:',stack_val,stack_val.shape)
------2024.9.10更新------
torch.distributions.Normal ,torch.normal
normal_dist = tdist.Normal(mu, sigma)
# torch.distributions.Normal 和 torch.normal 的区别
'''
torch.distributions.Normal 是一个概率分布对象,可以用来生成服从正态分布的随机数样本。
torch.normal 是一个函数,用来生成服从正态分布的随机数样本。
'''
import torch
import torch.distributions as tdist
torch.manual_seed(0)
mu = torch.tensor([0.0, 0.0])
sigma = torch.tensor([1.0, 1.0])
# torch.distributions.Normal
normal_dist = tdist.Normal(mu, sigma)
sample = normal_dist.sample()
print(sample) # tensor([ 1.5410, -0.2934])
# torch.normal
sample = torch.normal(mu, sigma)
print(sample) # tensor([ 1.5410, -0.2934]) 注释掉上两行 则输出相同
## torch.distributions.Normal 独有
log_prob = normal_dist.log_prob(sample)
print(log_prob)
dist_entropy = normal_dist.entropy()
print(dist_entropy)
repeat,expand,expand_as
torch_tensor_2.expand(-1, 24)
# repeat 和 expand 的用法
# repeat
import torch
torch_tensor = torch.rand(6, 4)
repeated_tensor = torch_tensor.repeat(1,2) # 第一维重复1次,第二维重复2次
print(repeated_tensor.shape)
# expand 只在尺寸为1 的维度上有效
torch_tensor_2 = torch.rand(6, 1)
expanded_tensor = torch_tensor_2.expand(-1, 24)
print(expanded_tensor.shape)
# expand_as 只在尺寸为1 的维度上有效
expanded_tensor = torch_tensor_2.expand_as(torch_tensor)
print(expanded_tensor.shape)
'''
torch.Size([6, 8])
torch.Size([6, 24])
torch.Size([6, 4])
'''
由于expand的计算开销更小,这里我选用expand来实现。
其他细节
关于选择动作的神经网络输入
神经网络输入和输出时 尽量选用二维输入,二位输出,以加快网络更新
即这里self.agent.Qnet(ob)
的ob,ob以一维输入也可以,二维输入也可以。
但是pytorch是默认神经网络输入是二维的,且本身pytorch中对矩阵的优化比较好,所以选用二维作为输入,
测试过一次,在使用gpu或cpu的情况下,二维的输入比一维的输入快了1/4倍。(二维所花的时间是1维的3/4。)
所以这里在神经网络输入和输出时 尽量选用二维输入,二位输出,以加快网络更新。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)