[Reinforcement Learning] Playing Box2D Games: LunarLander
Using the SAC reinforcement learning algorithm to solve the Box2D game environment LunarLander.
SAC Algorithm + LunarLander Environment
The SAC algorithm was briefly introduced in an earlier project; refer to that project for details.
In this project, we will use the SAC algorithm to solve the LunarLander problem from Box2D.
1. Introduction to the LunarLander Environment
LunarLander is a classic rocket trajectory optimization problem. According to Pontryagin's maximum principle, it is optimal to either fire the engine at full throttle or turn it off, which is why the original environment has discrete actions: engine on or engine off.
LunarLander comes in two versions: discrete and continuous. The landing pad is always at coordinates (0, 0), and these coordinates are the first two numbers in the state vector. Landing outside the pad is possible. Fuel is infinite, so the agent can learn to fly and then land on its first attempt.
In this project we model the continuous version of the environment and train an agent on it.
Action Space
The action space of the continuous LunarLander environment is Box(-1, +1, (2,), dtype=np.float32). The first component controls the throttle of the main engine, and the second controls the throttle of the lateral boosters.
State Space
The state is an 8-dimensional vector: the lander's x and y coordinates, its linear velocities in x and y, its angle, its angular velocity, and two booleans indicating whether each leg is in contact with the ground.
Reward
Moving from the top of the screen to the landing pad and coming to rest there yields roughly 100-140 points. If the lander moves away from the landing pad, it loses reward. If the lander crashes, it receives an additional -100 points. If it comes to rest, it receives an additional +100 points. Each leg in contact with the ground is worth +10 points. Firing the main engine costs -0.3 points per frame, and firing a side engine costs -0.03 points per frame. The environment is considered solved at 200 points, which is the target we aim for.
Termination Conditions
An episode ends when any of the following occurs:
- The lander crashes (the lander body comes into contact with the moon).
- The lander flies outside the viewport (its x coordinate is greater than 1).
- The lander is no longer awake. According to the Box2D documentation, a body that is not awake is one that does not move and does not collide with any other body.
For a more detailed description of LunarLander, see the Gym documentation for lunar_lander.
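To make the action and state spaces above concrete, here is a minimal sketch (assuming gym and Box2D are already installed, as done in Section 2 below; the printed representations may vary slightly across gym versions) that creates the continuous environment and inspects it:
import gym

env = gym.make('LunarLanderContinuous-v2')
print(env.action_space)        # Box(-1.0, 1.0, (2,), float32)
print(env.observation_space)   # Box with shape (8,)

obs = env.reset()
print(obs.shape)               # (8,), the state vector described above
env.close()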
2. Environment Setup
2.1 Install the Box2D Environment
# A persistent install requires a persistent path
!mkdir /home/aistudio/external-libraries
# Upgrade pip
!pip install --upgrade pip
# Install the Box2D environment
!pip install box2d-py -t /home/aistudio/external-libraries
2.2 Import Dependencies
import sys
sys.path.append('/home/aistudio/external-libraries')
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.distribution import Normal
from visualdl import LogWriter
import gym
import matplotlib.pyplot as plt
from matplotlib import animation
from tqdm import tqdm
import numpy as np
import copy
3. Building the Model
3.1 Policy Network
# Clamp bounds for the log standard deviation of the action distribution
LOG_SIG_MAX = 2.0
LOG_SIG_MIN = -20.0

class Actor(paddle.nn.Layer):
    def __init__(self, obs_dim, action_dim):
        super(Actor, self).__init__()
        self.l1 = nn.Linear(obs_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.mean_linear = nn.Linear(256, action_dim)
        self.std_linear = nn.Linear(256, action_dim)

    def forward(self, obs):
        x = F.relu(self.l1(obs))
        x = F.relu(self.l2(x))
        act_mean = self.mean_linear(x)
        # The std head outputs the log standard deviation, clipped to a stable range
        act_std = self.std_linear(x)
        act_log_std = paddle.clip(act_std, min=LOG_SIG_MIN, max=LOG_SIG_MAX)
        return act_mean, act_log_std
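As a quick sanity check, here is a minimal sketch (not part of the original notebook; it assumes the imports from Section 2.2) that runs a random 8-dimensional observation through the Actor and squashes a sampled action with tanh, mirroring what the SAC class does later:
actor = Actor(obs_dim=8, action_dim=2)                      # LunarLander: 8-dim state, 2-dim action
obs = paddle.to_tensor(np.random.randn(1, 8), dtype='float32')
act_mean, act_log_std = actor(obs)                          # each has shape [1, 2]
noise = paddle.normal(shape=act_mean.shape)                 # standard normal noise
action = paddle.tanh(act_mean + act_log_std.exp() * noise)  # squashed into (-1, 1)
print(action.numpy())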
3.2 Value Network
class Critic(paddle.nn.Layer):
    def __init__(self, obs_dim, action_dim):
        super(Critic, self).__init__()
        # Q1 network
        self.l1 = nn.Linear(obs_dim + action_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, 1)
        # Q2 network
        self.l4 = nn.Linear(obs_dim + action_dim, 256)
        self.l5 = nn.Linear(256, 256)
        self.l6 = nn.Linear(256, 1)

    def forward(self, obs, action):
        x = paddle.concat([obs, action], 1)
        # Q1
        q1 = F.relu(self.l1(x))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)
        # Q2
        q2 = F.relu(self.l4(x))
        q2 = F.relu(self.l5(q2))
        q2 = self.l6(q2)
        return q1, q2
3.3 Actor-Critic Model
class ACModel(paddle.nn.Layer):
    def __init__(self, obs_dim, action_dim):
        super(ACModel, self).__init__()
        self.actor_model = Actor(obs_dim, action_dim)
        self.critic_model = Critic(obs_dim, action_dim)

    def policy(self, obs):
        return self.actor_model(obs)

    def value(self, obs, action):
        return self.critic_model(obs, action)

    def get_actor_params(self):
        return self.actor_model.parameters()

    def get_critic_params(self):
        return self.critic_model.parameters()

    # Softly copy this model's parameters into target_model:
    # target = decay * target + (1 - decay) * source
    def sync_weights_to(self, target_model, decay=0.0):
        target_vars = dict(target_model.named_parameters())
        for name, var in self.named_parameters():
            target_data = decay * target_vars[name] + (1 - decay) * var
            target_vars[name] = target_data
        target_model.set_state_dict(target_vars)
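The sync_weights_to method implements a soft (Polyak) update of the target network:
θ_target ← decay · θ_target + (1 − decay) · θ
With decay = 0 (the default, used once when the agent is constructed) the target becomes an exact copy. During training, the SAC class below calls it with decay = 1 − τ, so each update moves the target only a fraction τ toward the online parameters, which stabilizes the bootstrapped critic targets.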
4. Replay Buffer
Define a replay buffer class that stores the agent's experience with the environment (interaction transitions) so that it can be replayed repeatedly to train the agent; a short usage sketch follows the class below.
Experience replay has two benefits:
- It breaks the temporal correlation between consecutive samples.
- It reuses collected experience instead of discarding it after a single use, so the same performance can be reached with fewer environment samples.
class ReplayMemory(object):
    def __init__(self, max_size, obs_dim, act_dim):
        self.max_size = int(max_size)
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        self.obs = np.zeros((max_size, obs_dim), dtype='float32')
        self.action = np.zeros((max_size, act_dim), dtype='float32')
        self.reward = np.zeros((max_size, ), dtype='float32')
        self.terminal = np.zeros((max_size, ), dtype='bool')
        self.next_obs = np.zeros((max_size, obs_dim), dtype='float32')
        self._curr_size = 0
        self._curr_pos = 0

    # Sample a batch of batch_size transitions uniformly at random
    def sample_batch(self, batch_size):
        batch_idx = np.random.randint(self._curr_size, size=batch_size)
        obs = self.obs[batch_idx]
        reward = self.reward[batch_idx]
        action = self.action[batch_idx]
        next_obs = self.next_obs[batch_idx]
        terminal = self.terminal[batch_idx]
        return obs, action, reward, next_obs, terminal

    # Append one transition, overwriting the oldest entry once the buffer is full
    def append(self, obs, act, reward, next_obs, terminal):
        if self._curr_size < self.max_size:
            self._curr_size += 1
        self.obs[self._curr_pos] = obs
        self.action[self._curr_pos] = act
        self.reward[self._curr_pos] = reward
        self.next_obs[self._curr_pos] = next_obs
        self.terminal[self._curr_pos] = terminal
        self._curr_pos = (self._curr_pos + 1) % self.max_size

    def size(self):
        return self._curr_size

    def __len__(self):
        return self._curr_size
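Before wiring the buffer into SAC, here is a minimal usage sketch (illustrative only, with made-up transition values) showing how the training loop in Section 7 interacts with it:
buffer = ReplayMemory(max_size=1000, obs_dim=8, act_dim=2)

# Store a few fake transitions
for _ in range(10):
    obs = np.random.randn(8).astype('float32')
    act = np.random.uniform(-1, 1, size=2).astype('float32')
    next_obs = np.random.randn(8).astype('float32')
    buffer.append(obs, act, reward=0.1, next_obs=next_obs, terminal=False)

# Sample a mini-batch once enough experience has been collected
batch_obs, batch_act, batch_rew, batch_next_obs, batch_done = buffer.sample_batch(4)
print(batch_obs.shape, batch_act.shape, batch_rew.shape)   # (4, 8) (4, 2) (4,)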
5. The SAC Algorithm Class
class SAC():
    def __init__(self, model, gamma=None, tau=None, alpha=None, actor_lr=None, critic_lr=None):
        self.gamma = gamma
        self.tau = tau
        self.alpha = alpha
        self.actor_lr = actor_lr
        self.critic_lr = critic_lr
        self.model = model
        self.target_model = copy.deepcopy(self.model)
        self.actor_optimizer = paddle.optimizer.Adam(
            learning_rate=actor_lr, parameters=self.model.get_actor_params())
        self.critic_optimizer = paddle.optimizer.Adam(
            learning_rate=critic_lr, parameters=self.model.get_critic_params())
        self.load_para()

    def sample(self, obs):
        act_mean, act_log_std = self.model.policy(obs)
        normal = Normal(act_mean, act_log_std.exp())
        # Reparameterization trick (mean + std * N(0, 1))
        x_t = normal.sample([1])
        action = paddle.tanh(x_t)
        # Log-probability of the tanh-squashed action (with the change-of-variables correction)
        log_prob = normal.log_prob(x_t)
        log_prob -= paddle.log((1 - action.pow(2)) + 1e-6)
        log_prob = paddle.sum(log_prob, axis=-1, keepdim=True)
        return action[0], log_prob[0]

    def load_para(self):
        try:
            self.model.actor_model.set_state_dict(paddle.load('net.pdparams'))
        except Exception:
            print("Could not load saved policy network parameters")

    def save(self):
        paddle.save(self.model.actor_model.state_dict(), 'net.pdparams')

    def learn(self, obs, action, reward, next_obs, terminal):
        critic_loss = self._critic_learn(obs, action, reward, next_obs, terminal)
        actor_loss = self._actor_learn(obs)
        self.sync_target()
        return critic_loss, actor_loss

    def _critic_learn(self, obs, action, reward, next_obs, terminal):
        with paddle.no_grad():
            next_action, next_log_pro = self.sample(next_obs)
            q1_next, q2_next = self.target_model.value(next_obs, next_action)
            target_Q = paddle.minimum(q1_next, q2_next) - self.alpha * next_log_pro
            terminal = paddle.cast(terminal, dtype='float32')
            target_Q = reward + self.gamma * (1. - terminal) * target_Q
        cur_q1, cur_q2 = self.model.value(obs, action)
        critic_loss = F.mse_loss(cur_q1, target_Q) + F.mse_loss(cur_q2, target_Q)
        self.critic_optimizer.clear_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        return critic_loss

    def _actor_learn(self, obs):
        act, log_pi = self.sample(obs)
        q1_pi, q2_pi = self.model.value(obs, act)
        min_q_pi = paddle.minimum(q1_pi, q2_pi)
        actor_loss = ((self.alpha * log_pi) - min_q_pi).mean()
        self.actor_optimizer.clear_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        return actor_loss

    def sync_target(self, decay=None):
        if decay is None:
            decay = 1.0 - self.tau
        self.model.sync_weights_to(self.target_model, decay=decay)
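For reference, the target computed inside paddle.no_grad() in _critic_learn is the standard SAC soft Bellman target, with d the terminal flag, α the entropy temperature, and a' sampled from the current policy at s':
y = r + γ · (1 − d) · [ min(Q1_target(s', a'), Q2_target(s', a')) − α · log π(a'|s') ]
Both critics regress toward this target, and _actor_learn minimizes the expectation of α · log π(a|s) − min(Q1(s, a), Q2(s, a)), trading off entropy against Q-value.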
6. The SAC Agent
Define the agent, which includes the following methods:
- __init__: stores the algorithm and synchronizes the target network parameters (an exact copy at construction)
- sample: samples an action for a given observation
- learn: updates the network models from a batch of experience
class SACAgent():
    def __init__(self, algorithm):
        self.alg = algorithm
        self.alg.sync_target(decay=0)

    def sample(self, obs):
        obs = paddle.to_tensor(obs.reshape(1, -1), dtype='float32')
        action, _ = self.alg.sample(obs)
        action_numpy = action.cpu().numpy()[0]
        return action_numpy

    def learn(self, obs, action, reward, next_obs, terminal):
        terminal = np.expand_dims(terminal, -1)
        reward = np.expand_dims(reward, -1)
        obs = paddle.to_tensor(obs, dtype='float32')
        action = paddle.to_tensor(action, dtype='float32')
        reward = paddle.to_tensor(reward, dtype='float32')
        next_obs = paddle.to_tensor(next_obs, dtype='float32')
        terminal = paddle.to_tensor(terminal, dtype='float32')
        critic_loss, actor_loss = self.alg.learn(obs, action, reward, next_obs, terminal)
        return critic_loss, actor_loss
7. Training SAC
7.1 Define the Hyperparameters
WARMUP_STEPS = 5e3
MEMORY_SIZE = int(1e6)
BATCH_SIZE = 128  # 256
GAMMA = 0.99
TAU = 0.005
ACTOR_LR = 3e-4
CRITIC_LR = 3e-4

writer = LogWriter('./logs')
env_name = 'LunarLanderContinuous-v2'
env_seed = 0
alpha = 0.2
train_total_steps = 300   # number of training episodes passed to the training function

env = gym.make(env_name)
env.seed(env_seed)
obs_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]

# Initialize the model, algorithm, agent, and replay buffer
model = ACModel(obs_dim, action_dim)
algorithm = SAC(model, gamma=GAMMA, tau=TAU, alpha=alpha, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
agent = SACAgent(algorithm)
rpm = ReplayMemory(max_size=MEMORY_SIZE, obs_dim=obs_dim, act_dim=action_dim)
7.2 Define the Training Function
- SAC is off-policy: historical experience from the replay buffer is reused repeatedly to update the networks.
- tqdm is used to display training progress.
- The policy network parameters are saved whenever a new best episode reward is reached, for use in the evaluation section.
- The episode rewards collected during training can be plotted with matplotlib (see the sketch after the training call below).
def train_off_policy_agent(env, agent, num_episodes, replay_buffer, WARMUP_STEPS, batch_size):
    return_list = []
    maxre = -1000000
    end_random = False
    episode = 0
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                episode += 1
                action_dim = env.action_space.shape[0]
                obs = env.reset()
                done = False
                episode_reward = 0
                episode_steps = 0
                while not done:
                    episode_steps += 1
                    # Warm up with uniformly random actions before the agent takes over
                    if replay_buffer.size() < WARMUP_STEPS:
                        action = np.random.uniform(-1, 1, size=action_dim)
                    else:
                        if end_random == False:
                            print("Switching from random actions to agent-predicted actions")
                            end_random = True
                        action = agent.sample(obs)
                    next_obs, reward, done, _ = env.step(action)
                    terminal = float(done) if episode_steps < 1000 else 0
                    replay_buffer.append(obs, action, reward, next_obs, terminal)
                    obs = next_obs
                    episode_reward += reward
                    # Update the networks once enough experience has been collected
                    if replay_buffer.size() >= WARMUP_STEPS:
                        batch_obs, batch_action, batch_reward, batch_next_obs, batch_terminal = replay_buffer.sample_batch(batch_size)
                        agent.learn(batch_obs, batch_action, batch_reward, batch_next_obs, batch_terminal)
                writer.add_scalar('Episode_reward', episode_reward, episode)
                # Save the policy whenever a new best episode reward is reached
                if maxre < episode_reward:
                    maxre = episode_reward
                    agent.alg.save()
                return_list.append(episode_reward)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1), 'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list

return_list = train_off_policy_agent(env, agent, train_total_steps, rpm, WARMUP_STEPS, BATCH_SIZE)
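The reward curve can then be drawn with matplotlib from the episode rewards returned by the training function (the same curve is also logged to VisualDL under the Episode_reward tag). A minimal sketch:
episodes = list(range(len(return_list)))
plt.plot(episodes, return_list)
plt.xlabel('Episode')
plt.ylabel('Episode reward')
plt.title('SAC on LunarLanderContinuous-v2')
plt.show()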
Reward curve:
8. Evaluation
We now test the trained agent.
env = gym.make('LunarLanderContinuous-v2')
state = env.reset()
done = False
total_reward = 0
i = 0
while not done:
    action = agent.sample(state)
    next_state, reward, done, _ = env.step(action)
    total_reward += reward
    if i % 20 == 0:
        print(i, " ", reward, done)
    i += 1
    state = next_state
print("Total reward:", total_reward)
env.close()
0 1.1362964249934293 False
20 -0.9152212061630337 False
40 -1.6817496167424224 False
60 0.612567875872121 False
80 2.65160559062017 False
100 3.092850427945321 False
120 2.0211063527009374 False
140 0.8737577052041431 False
160 -1.3234211037221897 False
180 2.062267459228774 False
200 0.8483712984577092 False
220 0.6579809519331685 False
240 -0.009453803122717375 False
260 -8.206880579564313e-07 False
Total reward: 278.31948684657357
A demo of the trained agent solving the LunarLander environment:
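Such a demo can be produced with the animation import from Section 2.2. Below is a minimal sketch, assuming this gym version supports env.render(mode='rgb_array') (frame capture details may differ across gym releases and headless setups):
frames = []
env = gym.make('LunarLanderContinuous-v2')
state = env.reset()
done = False
while not done:
    frames.append(env.render(mode='rgb_array'))   # capture the current frame as an RGB array
    action = agent.sample(state)
    state, reward, done, _ = env.step(action)
env.close()

# Turn the collected frames into a GIF with matplotlib.animation
fig = plt.figure()
im = plt.imshow(frames[0])
plt.axis('off')

def update(frame):
    im.set_data(frame)
    return [im]

anim = animation.FuncAnimation(fig, update, frames=frames, interval=50)
anim.save('lunarlander.gif', writer='pillow')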