前言

最近又看了一遍《深度强化学习》,和TD3的代码,觉得市面上好多代码写的非常绚丽,但表达的意思,实际的操作确实同一个,再此总结一下这些常见的代码的含义。
顺便自己构建一个比较简单易懂的强化学习算法供自己使用。(暂时只搭建了部分,欢迎star)
参考了很多人写的代码,这里先不列举了。

torch版本:2.3.1+cu121
python版本:3.11.9

参考:设计深度强化学习库的思想

tensor张量

官方文档

基于这样的一个事实,一般环境给出的状态变量的类型为np.float32。
我们需要考虑从numpy数组转换成tensor张量,和张量转换成np数组的最快方式。

【从np.array->tensor张量的最优解】

torch.as_tensor(data_numpy,dtype=torch.float32)

torch.tensor(当数据为浮点时)默认创建torch.float32类型的张量,且可以指定张量类型,可读性较高。
但是:
在这里插入图片描述
使用torch.tensor 会创建一个新的张量,因此会占用更多的内存,torch.as_tensor则会尽可能共享内存,从而实现最快。

## torch.tensor 创建张量

import torch
import numpy as np
# 使用 torch.tensor 创建 float32 类型的张量
tensor = torch.tensor(np.array([1.0, 2.0, 3.0]), dtype=torch.float32)
print(tensor.dtype)

tensor_1d = torch.FloatTensor([1.0, 2.0, 3.0])
print(tensor_1d.dtype)

tensor_as = torch.as_tensor([1.0, 2.0, 3.0], dtype=torch.float32)
print(tensor_as.dtype)

'''
torch.float32
torch.float32
torch.float32
'''

torch.tensor 与torch.as_tensor的区别

data_numpy = np.array([1, 2, 3])
tensor_from_numpy = torch.tensor(data_numpy) ## 当数据不为浮点时,会自动转换为整型

# 使用 torch.as_tensor
tensor_from_numpy_as = torch.as_tensor(data_numpy)

# 检查内存共享
data_numpy[0] = 10
print(tensor_from_numpy)  # 输出: tensor([1, 2, 3])
print(tensor_from_numpy_as)  # 输出: tensor([10, 2, 3], dtype=torch.int32)

'''
tensor([1, 2, 3], dtype=torch.int32)
tensor([10,  2,  3], dtype=torch.int32)
'''

但是我们实际上确实只需要一份数据,所以这里我们可以采用如下形式

torch.as_tensor(data_numpy,dtype=torch.float32)

实际上在(elegentRL)小雅中认为去掉dtype=torch.float32是最快的,但是在它的库中,实际用的是上述方法,以增加可读性。

【从tensor张量->np.array的最优解】

tensor.detach().cpu().numpy()

这里直接借鉴elegentRL的形式,实际其他代码也有这种写法,下面这种方法最快。

print(tensor.detach().cpu().numpy().dtype)  # 输出: float32

tensor.detach().cpu().numpy()
不能用 data,因为这个很旧,功能已被 .detach() 替代
detach() 不让PyTorch框架去追踪张量的梯度,所以在放在最前
cpu() 把张量从GPU显存中传输到CPU内存 numpy() 把张量tensor变成数组array

detach

tensor.detach()

detach 用于使得此张量不参与梯度的运算,一般用于目标网络。
目标网路有一个特点,没有优化器给它。

以下三种等效

tensor = net(tensor.detach())   
tensor = net(tensor).detach()  
with  torch.no_grad():
    tensor = net(tensor)

这里我会选择第二种,看起来可读性更高,更灵活

调整张量维度reshape、view

reshape、view、transpose、t、permute、faltten、squeeze、unsqueeze

reshape:重塑,无限制,先判断是否连续,会自动调用.contiguous()方法
view:视图调整 ,限制:必须为连续张量
transpose: 转置,限制:必须接受两个参数,交换这两个参数的位置
t:二维转置,限制:必须是二维张量
permute:置换,无限制,根据给定的维度顺序重排张量的维度
flatten:弄平,无限制,压成一维
squeeze:挤压,接受一个参数,去除指定位置 或无参数,去除所有1维维度
unsqueeze:松开 ,限制:必须接受一个参数,在指定位置下加一个一维维度

二维的张量调整

## 调整张量维度
import torch

print('--两维度调整--')
torch_tensor = torch.rand(6, 4) # batch_size X  feature_size  6x4
# reshape 
new_shape_0 = torch_tensor.reshape(3, -1) # 6x4 -> 3x8
new_shape_1 = torch.reshape(torch_tensor, (3, -1)) # 6x4 -> 3x8
print('reshape:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([3, 8])
# view , view_as
new_shape_0 = torch_tensor.view(3, -1) # 6x4 -> 3x8
new_shape_1 = torch_tensor.view_as(torch.Tensor(3, 8)) # 6x4 -> 3x8  #torch.Tensor(3, 8)为创建一个3x8的张量
print('view:',new_shape_0.shape,'view_as:',new_shape_1.shape)  # 输出: torch.Size([3, 8])
# transpose 只能接受两个参数
new_shape_0 = torch_tensor.transpose(0, 1) # 6x4 -> 4x6
new_shape_1 = torch.transpose(torch_tensor, 0, 1) # 6x4 -> 4x6
print('transpose:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([4, 6])
# t 只能用于两维
new_shape_0 = torch_tensor.t() # 6x4 -> 4x6
new_shape_1 = torch.t(torch_tensor) # 6x4 -> 4x6
print('t:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([4, 6])
# permute 改变视图
new_shape_0 = torch_tensor.permute(1, 0) # 6x4 -> 4x6
new_shape_1 = torch.permute(torch_tensor, (1, 0)) # 6x4 -> 4x6
print('permute:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([4, 6])
# flatten 返回新的张量
new_shape_0 = torch_tensor.flatten() # 6x4 -> 24
new_shape_1 = torch.flatten(torch_tensor) # 6x4 -> 24
print('flatten:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([24])
# unsqueeze
new_shape_0 = torch_tensor.unsqueeze(0) # 6x4 -> 1x6x4
new_shape_1 = torch.unsqueeze(torch_tensor, 1) # 6x4 -> 6x1x4
print('unsqueeze:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([1, 6, 4]), torch.Size([6, 1, 4])
# squeeze
new_shape_2 = new_shape_0.squeeze(0) == new_shape_0.squeeze()# 1x6x4 -> 6x4
new_shape_3 = torch.squeeze(new_shape_1) ==  new_shape_1.squeeze(1) # 6x1x4 -> 6x4
print('squeeze:',new_shape_2.shape,new_shape_3.shape)  # 输出: torch.Size([6, 4]), torch.Size([6, 4])

new_tensor = torch.rand(1,1,1)
print(new_tensor,new_tensor.squeeze().shape)  # 输出: torch.Size([])

# 尝试对转置后的张量使用 view 函数 --> t(),transpose(),permute()会导致张量不连续 reshape(),flatten(),squeeze(),unsqueeze() 则不会
try:
    #tensor_t_view = torch_tensor.t().view(3, 8)
    #tensor_t_view = torch_tensor.transpose(0,1).view(3, -1)
    tensor_t_view = torch_tensor.permute(1, 0).view(3, -1)
except RuntimeError as e:
    print("\nError when using view on non-contiguous tensor:")
    print(e)
tensor_t_contiguous = torch_tensor.t().contiguous().view(3, 8)
print(tensor_t_contiguous.shape)  # 输出: torch.Size([3, 8])

三维张量调整

print('--三维度调整--')
torch_tensor = torch.rand(6, 3, 4) # batch_size X seq_len X feature_size  6x3x4
# reshape
new_shape_0 = torch_tensor.reshape(2, 2, -1) # 6x3x4 -> 2x2x18
new_shape_1 = torch.reshape(torch_tensor, (2, 3, -1)) # 6x3x4 -> 2x3x12
print('reshape:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([2, 2, 18]), torch.Size([2, 3, 8])
# view , view_as
new_shape_0 = torch_tensor.view(2, 2, -1) # 6x3x4 -> 2x2x18
new_shape_1 = torch_tensor.view_as(torch.Tensor(2, 3, 12)) # 6x3x4 -> 2x3x12  #torch.Tensor(2, 3, 8)为创建一个2x3x8的张量
print('view:',new_shape_0.shape,'view_as:',new_shape_1.shape)  # 输出: torch.Size([2, 2, 18]), torch.Size([2, 3, 8])
# transpose
new_shape_0 = torch_tensor.transpose(0, 2) # 6x3x4 -> 4x3x6
new_shape_1 = torch.transpose(torch_tensor, 0, 2) # 6x3x4 -> 4x3x6
print('transpose:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([4, 3, 6]), torch.Size([4, 3, 6])
# permute
new_shape_0 = torch_tensor.permute(2, 1, 0) # 6x3x4 -> 4x3x6
new_shape_1 = torch.permute(torch_tensor, (2, 1, 0)) # 6x3x4 -> 4x3x6
print('permute:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([4, 3, 6]), torch.Size([4, 3, 6])
# flatten
new_shape_0 = torch_tensor.flatten() # 6x3x4 -> 72
new_shape_1 = torch.flatten(torch_tensor) # 6x3x4 -> 72
print('flatten:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([72])
# unsqueeze
new_shape_0 = torch_tensor.unsqueeze(0) # 6x3x4 -> 1x6x3x4
new_shape_1 = torch.unsqueeze(torch_tensor, 1) # 6x3x4 -> 6x1x3x4
print('unsqueeze:',new_shape_0.shape,new_shape_1.shape)  # 输出: torch.Size([1, 6, 3, 4]), torch.Size([6, 1, 3, 4])
# squeeze
new_shape_2 = new_shape_0.squeeze(0) == new_shape_0.squeeze()# 1x6x3x4 -> 6x3x4
new_shape_3 = torch.squeeze(new_shape_1) ==  new_shape_1.squeeze(1) # 6x1x3x4 -> 6x3x4
print('squeeze:',new_shape_2.shape,new_shape_3.shape)  # 输出: torch.Size([6, 3, 4]), torch.Size([6, 3, 4])

综上比较:

reshape比view,view_as 更灵活,可以处理非连续的张量,可读性更高,且在性能上几乎一样。
permute vs transpose,t 更灵活,可以处理多个维度

reshape,permute

广播问题

广播原则:官方
一般发生在 target_Q = reward + self.gamma * target_Q * (1 - done)
reward: 4
tagget_Q: 4x1
done: 4
此时两者相加会出现4x4的情况

### 广播原则
import torch
'''
广播原则:从后往前逐个比较两个张量的维度,满足以下条件之一,两个张量才能进行广播
1. 如果张量的维度不同,将维度较小的张量进行扩展,直到两个张量的维度都一样。
2. 对应维度的两个张量,如果某个张量的长度为1,那么可以利用这个张量进行复制来扩展为相同的形状。

'''
A = torch.rand(4, 3)
B = torch.rand(1, 3) # 原则2:对应上下两个张量的维度,A的第一维度4对应B的第一维度1,那么B的第一维度扩展为4
Z = A * B   
X = A + B
print(Z.shape,X.shape)  # 输出: torch.Size([4, 3])

A = torch.rand(4, 1) # 原则2:在第二维度体现 A->4x3
B = torch.rand(1, 3) # B->4x3
Z = A * B 
X = A + B
print(Z.shape,X.shape)  # 输出: torch.Size([4, 3])

A = torch.rand(4, 1) # A->4x3
B = torch.rand(   3) # 原则1:B->4x3
Z = A * B # 原则1
X = A + B
print(Z.shape,X.shape)  # 输出: torch.Size([4, 3])

A = torch.rand(5, 2, 1, 1) # A->5x2x3x1
B = torch.rand(   1, 3, 1) # 原则1: B->5x2x3x1
Z = A * B
X = A + B
print(Z.shape,X.shape)  # 输出: torch.Size([5, 2, 3, 1])

# 无法广播
A = torch.rand(4, 3)
B = torch.rand(2, 3) # 无法广播 这里第一维度2 不是1 则无法广播
try:
    Z = A * B
    print(Z.shape)
    Z = A + B
    print(Z.shape)
except RuntimeError as e:
    print("\nError when broadcasting tensors:")
    print(e)
 
## 广播问题 : 4x1 * 4 = 4x4
'''
一般发生在 target_Q = reward + self.gamma * target_Q * (1 - done)
reward: 4
tagget_Q: 4x1
done: 4
'''

A = torch.rand(4, 1)
B = torch.rand(   4)    

Z = A * B
X = A + B
print(Z.shape,X.shape)  # 输出: torch.Size([4, 4])

## 广播问题解决1
A = torch.rand(4, 1).squeeze(1) #.flatten()
B = torch.rand(   4)

Z = A * B
X = A + B
print(Z.shape,X.shape)  # 输出: torch.Size([4]), torch.Size([4])

## 广播问题解决2
A = torch.rand(4, 1)
B = torch.rand(   4).reshape(-1, 1)
Z = A * B
X = A + B
print(Z.shape,X.shape)  # 输出: torch.Size([4, 1]), torch.Size([4, 1])

'''
这里选择解决2,因为后续算法中如TD3需要2个q值,如果解决1则需要两个squeeze
'''

这里选择解决2,因为后续算法中如TD3需要2个q值,如果解决1则需要两个squeeze。
即将reward的一维张量拓展reshape为2维。

reshape

max,argmax

同理min,argmin

.max(dim = 1)[0] argmax(dim = -1)

max在显式的选择最大动作的价值时会用到
.max(dim = 1)[0]是返回最大值,[1]是返回索引

## max,min,argmax,argmin
import torch
import numpy as np

torch_tensor = torch.rand(6, 4)
# max
max_val_0 = torch.max(torch_tensor) # 返回所有元素中的最大值
max_val_1 = torch_tensor.max() # 返回所有元素中的最大值
print('max:',max_val_0,max_val_1)  # 输出: tensor(0.9977), tensor(0.9977)
max_val_0 = torch.max(torch_tensor, dim=1) # 返回每行的最大值和索引
max_val_1 = torch_tensor.max(dim=1) # 返回每行的最大值和索引
print('max:',max_val_1.values,max_val_1.indices)  
'''
max_val_1.values 常见为 tensor.max(1)[0] #6x4 -> 6
max_val_1.indices 常见为 tensor.max(dim=1)[1]
'''

# argmax
argmax_val_0 = torch.argmax(torch_tensor) # 返回所有元素中的最大值的索引
argmax_val_1 = torch_tensor.argmax() # 返回所有元素中的最大值的索引
print('argmax:',argmax_val_0,argmax_val_1)  # 输出: tensor(13), tensor(13)
argmax_val_0 = torch.argmax(torch_tensor, dim=1) # 返回每行的最大值的索引
argmax_val_1 = torch_tensor.argmax(dim=1) # 返回每行的最大值的索引 #6x4 -> 6
print('argmax:',argmax_val_1,argmax_val_1.shape)  # 输出: tensor([1, 3, 0, 1, 1, 3]) 

argmax_val_1 = torch_tensor.argmax(dim=1,keepdim=True) #  6x4 -> 6x1 #keepdim=True 表示不改变维度
print('argmax:',argmax_val_1,argmax_val_1.shape)  

gather

self.agent.Qnet(state).gather(1, action.long())

gather:收集
离散空间下,为了得到当前Q值,所应用到的函数

# gather
'''
gather(input, dim, index, out=None) → Tensor
input (Tensor) – 源张量
dim (int) – 索引的轴
index (LongTensor) – 包含索引的张量
out (Tensor, optional) – 目标张量
即搜集input在dim维度上的index索引的值,返回到out张量上
'''
torch_tensor = torch.rand(6, 4)
print(torch_tensor)
index = torch.LongTensor([0, 2, 3, 1, 1, 3]) # size = 6
gather_val = torch.gather(torch_tensor, 1, index.reshape(-1,1)) # 6x4 -> 6x1
print('gather:',gather_val)  
'''
即在6行4列 行代表batch_size 列代表act_dim
在列的维度上取出index对应的值
第一行选择第0列的值,第二行选择第2列的值,第三行选择第3列的值...
'''
gather_val = torch_tensor.gather(1, index.reshape(-1,1)) # 6x4 -> 6x1
'''
注意 index 必须为LongTensor类型
index 张量的形状必须与 input 张量的形状在非收集维度上一致。
index 张量的元素必须在 input 张量的有效索引范围内。
'''

注意 index 必须为LongTensor类型

cat,stack

cat([s,a],dim = 1) stack([s,s],dim= -1)

cat:拼接,默认dim = 0
stack:堆叠 ,默认dim = 0
cat 多见于 在critic网路更新时,对state 和 action 的拼接
stack 多见于 多智能体在同一个维度上的log概率密度求和

# cat
'''
cat(tensors, dim=0) → Tensor
tensors (sequence of Tensors) – 要连接的张量序列
dim (int) – 要连接的维度
'''
torch_tensor = torch.rand(6, 4)
cat_val = torch.cat([torch_tensor, torch_tensor], dim=0) # 6x4 -> 12x4 # 默认dim = 0
print('cat:',cat_val,cat_val.shape)

# stack
'''
stack(tensors, dim=0) → Tensor
tensors (sequence of Tensors) – 要连接的张量序列
dim (int) – 要连接的维度
'''
torch_tensor = torch.rand(6, 4)
stack_val = torch.stack([torch_tensor, torch_tensor], dim=0) # 2x6x4 # 默认dim = 0
print('stack:',stack_val,stack_val.shape)

------2024.9.10更新------

torch.distributions.Normal ,torch.normal

normal_dist = tdist.Normal(mu, sigma)

# torch.distributions.Normal 和 torch.normal 的区别
'''
torch.distributions.Normal 是一个概率分布对象,可以用来生成服从正态分布的随机数样本。
torch.normal 是一个函数,用来生成服从正态分布的随机数样本。
'''
import torch
import torch.distributions as tdist
torch.manual_seed(0)
mu = torch.tensor([0.0, 0.0])
sigma = torch.tensor([1.0, 1.0])
# torch.distributions.Normal
normal_dist = tdist.Normal(mu, sigma)
sample = normal_dist.sample()
print(sample) # tensor([ 1.5410, -0.2934])

# torch.normal
sample = torch.normal(mu, sigma) 
print(sample) # tensor([ 1.5410, -0.2934]) 注释掉上两行 则输出相同

## torch.distributions.Normal 独有
log_prob = normal_dist.log_prob(sample)
print(log_prob)

dist_entropy = normal_dist.entropy()
print(dist_entropy)

repeat,expand,expand_as

torch_tensor_2.expand(-1, 24)

# repeat 和 expand 的用法

# repeat
import torch
torch_tensor = torch.rand(6, 4)
repeated_tensor = torch_tensor.repeat(1,2) # 第一维重复1次,第二维重复2次
print(repeated_tensor.shape)

# expand 只在尺寸为1 的维度上有效
torch_tensor_2 = torch.rand(6, 1)
expanded_tensor = torch_tensor_2.expand(-1, 24)
print(expanded_tensor.shape)

# expand_as 只在尺寸为1 的维度上有效
expanded_tensor = torch_tensor_2.expand_as(torch_tensor)
print(expanded_tensor.shape)
'''
torch.Size([6, 8])
torch.Size([6, 24])
torch.Size([6, 4])
'''

repeat 会
由于expand的计算开销更小,这里我选用expand来实现。

其他细节

关于选择动作的神经网络输入

神经网络输入和输出时 尽量选用二维输入,二位输出,以加快网络更新

即这里self.agent.Qnet(ob)的ob,ob以一维输入也可以,二维输入也可以。
但是pytorch是默认神经网络输入是二维的,且本身pytorch中对矩阵的优化比较好,所以选用二维作为输入,
测试过一次,在使用gpu或cpu的情况下,二维的输入比一维的输入快了1/4倍。(二维所花的时间是1维的3/4。)

所以这里在神经网络输入和输出时 尽量选用二维输入,二位输出,以加快网络更新。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐