强化学习(六) - 连续空间中的强化学习(RL in Continuous Spaces)及相关实例
6.1离散空间和连续空间在之前的实例中,状态和动作的数量受到限制。使用小的,有限的马尔可夫决策过程(MDP),可以用表,字典或其他有限结构来表示动作值函数。例如,考虑下面的非常小的gridworld。假设世界有四个可能的状态,并且代理有四个可能的操作可供使用(上,下,左,右)。您在前面的课程中了解到,我们可以在表中表示估计的最佳操作值函数,每个状态对应一个行,每个动作对应一个列。我们将此表称为Q表
强化学习(六) - 连续空间中的强化学习
6.1 连续空间中的强化学习
在之前的实例中,状态和动作的数量受到限制。使用小的,有限的马尔可夫决策过程(MDP),可以用表,字典或其他有限结构来表示动作价值函数。
例如,考虑下面的非常小的gridworld。假设世界有四个可能的状态,并且代理有四个可能的操作可供使用(上,下,左,右)。我们可以在表中表示估计的最佳操作价值函数,每个状态对应一个行,每个动作对应一个列。我们将此表称为Q表。
但是,具有更大空间的MDP呢?考虑到Q表的每个状态必须有一行。因此,例如,如果有1000万个可能的状态,则Q表必须具有1000万行。此外,如果状态空间是连续的实数值的集合(无穷大),那么就不可能以有限的结构表示动作值。
强化学习算法通常分为两大类别: 基于模型的方法,例如策略迭代。以及需要已知转换和奖励模型的值迭代。它们本质上通过动态规划并使用该模型, 以迭代方式计算期望的价值函数和最优策略,另一方面 蒙特卡洛方法和时间差分学习等,不基于模型的方法不需要明确的模型。它们通过执行探索性动作对环境抽样,并使用获得的经验直接估计价价值函数。这就是强化学习的简单介绍 当然还有更多内容。
深度强化学习是一个相对较新的术语。 是指使用深度学习(主要是多层神经网络)解决强化学习问题的方法。 强化学习通常包含有限的 MDP,即状态和动作数量是有限的。 但是有太多的问题具有非常大的状态和动作空间,甚至由连续的实数组成。 传统算法使用表格或字典,或其他有限结构来记录状态和动作值,但是不再适合此类问题。 因此,我们首先要考虑的是,如何泛化这些算法以便适合大型连续空间。 这就为开发深度强化学习算法奠定了基础,包括深度 Q 学习等基于值的技巧,以及直接尝试优化策略的方法,例如策略梯度。 最后,会使用到结合这两类方法的更高级方法,即行动者-评论者方法。
6.2 离散空间和连续空间
我们先来看看离散空间和连续空间的含义。回忆下马尔可夫决策流程的定义, 我们假设任何时间的环境状态来自于一组潜在状态,当该组合是有限组合时,我们可以将其称之为离散状态空间。动作也类似,如果有一组有限的动作,则表示环境有一个离散动作空间。
离散空间简化了问题,首先,使我们能够将任何状态和动作函数,表示为字典或查询表。假设有一个状态价值函数 V V V,它是从一组状态到实数的映射。如果将状态表示为整数,则可以将价值函数表示为字典 并将每个状态当做键。类似地, 假设有一个动作价值函数 Q,它将每个状态动作对映射到一个实数。同样 我们可以使用字典或将价值函数存储为表格或矩阵,每行对应一个状态,每列对应一个动作。
离散空间对很多强化学习算法来说也很关键,例如, 在价值迭代中,这个内部 for 循环逐个遍历每个状态,并更新相应的估值 V ( s ) V(s) V(s)。如果状态空间是连续的 则不可能这么操作,循环将永远持续下去,甚至对于有很多状态的离散状态空间来说, 这一流程也很快变得不可行,Q 学习等不基于模型的方法也需要离散空间。这里 我们对状态 S ′ S^′ S′ 的所有潜在动作执行 max 运算。如果有一组有限的动作, 则很轻松。但是如果动作空间是连续的,这个小小的计算步骤本身就会变成完全失败的优化问题。那么连续空间到底是什么意思?
连续是离散的反义词。连续空间并不限定于一组独特的值, 例如整数。相反, 它可以是一定范围的值, 通常是实数。这意味着状态值等量值。例如表示离散情况的条形图,每个状态对应一个长条。现在需要转换成预期范围内的密度图。同一记法也扩展到了环境中,状态不再是单个实数 而是一个此类数字的向量。这样依然称之为连续空间 只是不再是一个维度。
在继续深入讲解之前,我们尝试了解下为何连续状态空间很重要。它们来自何处?思考一个高级决策制定任务, 例如下棋。经常可以将一组潜在状态看做离散状态。每个棋子都在棋盘上的哪个方框内。我们不需要精确地确定,每个棋子处在方框内的哪个位置或朝着哪个方向。虽然我们也可以了解这些细节信息,并思考, 为何你的骑士在瞪着我的王后。但是这些信息与要解决的问题不相关,我们可以从游戏模型中删除这些信息。通常, 网格环境在强化学习中非常热门。它们使我们能够直观地查看智能体在空间环境中的行为如何,但是现实的物理空间并不能始终清晰地划分为网格。
动作也可以是连续的动作。例如玩飞镖的机器人,它必须设置扔飞镖的高度和角度。选择相应级别的扔力。即使这些值出现小小的变化,也会对飞镖最终落在板上的位置有很大的影响。通常。 需要在物理环境中,采取的大多数动作本质上都是连续动作。很明显。 我们需要修改表示法或算法,或者同时修改二者以便处理连续空间。我们将讨论的两个主要策略是,离散化和函数逼近。
6.3 离散化
顾名思义,离散化就是将连续空间转换为离散空间。对于某些环境, 离散化状态空间效果很好,使我们能够几乎不加修改就能使用现有的算法。动作也可以离散化,例如, 角可以拆分为完整度数。甚至按 90 度递增。
如果合适的话,现在假设一个离散化环境中有一些物体, 机器人需要绕过这些障碍物。对于网格表示法,我们只能标记存在物体的单元格 甚至稍微超出范围,称之为占据网格。但是如果我们以较大网格离散化,可能会使智能体以为,没有绕过这些障碍物以抵达目标位置的道路。如果我们能够根据这些障碍物调整网格,那么可以为智能体找到一条潜在的道路。另一种方法是在需要时将网格拆分为更小的单元格,依然是逼近结果。但是可以让我们在需要的位置分配更多的状态表示,这样比将整个状态空间拆分为更小的单元格合适。拆分整个状态空间可能会增加状态的总数,进而增加计算值函数所需的时间。
这种离散化适合网格世界等空间领域,但是其他状态空间呢?我们来看看汽车换挡这个不同领域的问题。如今的大部分汽车都会自动换挡, 汽车如何决定切换到哪一挡以及何时换挡?这个图表简单地描述了对于一辆普通汽车,油耗如何随着不同挡位速度的变化而变化。假设状态仅包含车辆速度,以及当前挡位,奖励与油耗成反比,智能体可以采取的动作包括换到更高挡位或更低挡位。虽然速度是连续值,但是可以离散化为不同的范围。最佳划分方式是一个挡位对应一个速度范围,注意这些范围可以具有不同的长度,即离散化是不均匀的。如果状态空间还有其他维度, 例如油门位置,那么它们也可以不均匀地细分。
实例:小车上山
小车上山(MountainCar-v0)是一个经典的控制问题.如图6-6所示,小车在一段范围内行驶.在任意时刻,在水平方向看小车的位置范围是[-1.2, 0.6],速度的范围是[-0.07, 0.07].在每个时刻,智能体可以对小车施加3种动作中的一种, 向左施力, 不实例, 向右施力.智能体施力和小车的水平位置会共同决定小车 下一个时刻的速度.当某时刻小车的水平位置大于0.5时,控制目标成功达成,回合结束.控制的目标是让小车尽可能少的步骤达到目标.一般认为,如果智能体在连续100个回合中的平均步数 ≤ 100 \leq 100 ≤100,就认为问题解决了.
在绝大多数情况下,智能体简单的向右施力并不足以让小车成功越过目标.
本节假设智能体并不知道环境确定小车位置和速度的数学表达式.实际上,小车位置和速度是有数学表达式的.记 t t t时刻( t = 0 , 1 , 2 , . . . t=0, 1, 2,... t=0,1,2,...),小车的位置为 X t ( X t ∈ [ − 1.2 , 0.6 ] ) X_t(X_t \in [-1.2, 0.6]) Xt(Xt∈[−1.2,0.6]), 速度为 V t ( V t ∈ [ − 0.07 , 0.07 ] ) V_t(V_t \in [-0.07, 0.07]) Vt(Vt∈[−0.07,0.07]),智能体施力为 A t ∈ { 0 , 1 , 2 } A_t \in \{ 0, 1, 2\} At∈{0,1,2}, 初始状态 X 0 ∈ [ − 0.6 , − 0.4 ) , V 0 = 0 X_0 \in [-0.6, -0.4), V_0=0 X0∈[−0.6,−0.4),V0=0.
从
t
t
t时刻到
t
+
1
t+1
t+1时刻的更新式为
X
t
+
1
=
clip
(
X
t
+
V
t
,
−
1.2
,
0.6
)
V
t
+
1
=
clip
(
V
t
+
0.001
(
A
t
−
1
)
−
0.0025
c
o
s
(
3
X
t
)
,
−
0.07
,
0.07
)
\begin{aligned}X_{t+1} &= \text{clip}(X_t +V_t, -1.2, 0.6)\\V_{t+1}&=\text{clip}(V_t+0.001(A_t-1)-0.0025cos(3X_t), -0.07, 0.07)\end{aligned}
Xt+1Vt+1=clip(Xt+Vt,−1.2,0.6)=clip(Vt+0.001(At−1)−0.0025cos(3Xt),−0.07,0.07)
其中clip函数限制了位置和速度的范围
clip
(
x
,
x
m
i
n
,
x
m
a
x
)
=
{
x
m
i
n
,
x
≤
x
m
i
n
x
,
x
m
i
n
<
x
<
x
m
a
x
x
m
a
x
,
x
≥
x
m
a
x
\text{clip}(x, x_{min}, x_{max}) = \begin{cases} x_{min}, & x\leq x_{min} \\ x, & x_{min}< x < x_{max} \\ x_{max}, & x\geq x_{max} \\ \end{cases}
clip(x,xmin,xmax)=⎩⎪⎨⎪⎧xmin,x,xmax,x≤xminxmin<x<xmaxx≥xmax
6.3.1 相关程序
首先我们建立关于Agent的Q学习类
K6_MountainCar_Agent
import numpy as np
# 定义可用于离散空间的等间距网格。
def create_uniform_grid(low, high, bins=(10, 10)):
grid = [np.linspace(low[dim], high[dim], bins[dim] + 1)[1:-1] for dim in range(len(bins))]
print("Uniform grid: [<low>, <high>] / <bins> => <splits>")
for l, h, b, splits in zip(low, high, bins, grid):
print(" [{}, {}] / {} => {}".format(l, h, b, splits))
return grid
# 根据给定的网格离散样本。
def discretize(sample, grid):
return list(int(np.digitize(s, g)) for s, g in zip(sample, grid)) # 返回索引值
class QLearningAgent:
"""Q-Learning agent,,通过离散化可以作用于连续的状态空间。"""
def __init__(self, env, state_grid, alpha=0.02, gamma=0.99,
epsilon=1.0, epsilon_decay_rate=0.9995, min_epsilon=.01, seed=505):
"""初始化变量,创建离散化网格。"""
# Environment info
self.env = env
self.state_grid = state_grid
self.state_size = tuple(len(splits) + 1 for splits in self.state_grid) # n-维状态空间
self.action_size = self.env.action_space.n # 1-维离散动作空间
self.seed = np.random.seed(seed)
print("Environment:", self.env)
print("State space size:", self.state_size)
print("Action space size:", self.action_size)
# 学习模型参数
self.alpha = alpha # 学习率
self.gamma = gamma # 折扣因子
self.epsilon = self.initial_epsilon = epsilon # 初始探索率
self.epsilon_decay_rate = epsilon_decay_rate # epsilon衰减系数
self.min_epsilon = min_epsilon
# 创建Q表
self.q_table = np.zeros(shape=(self.state_size + (self.action_size,)))
print("Q table size:", self.q_table.shape)
def preprocess_state(self, state):
"""将连续状态映射到它的离散表示。"""
return tuple(discretize(state, self.state_grid))
def reset_episode(self, state):
"""为新的事件重置变量."""
# 逐步降低探索率
self.epsilon *= self.epsilon_decay_rate
self.epsilon = max(self.epsilon, self.min_epsilon)
# 决定初始行动
self.last_state = self.preprocess_state(state)
self.last_action = np.argmax(self.q_table[self.last_state])
return self.last_action
def reset_exploration(self, epsilon=None):
"""重置训练时使用的探索率."""
self.epsilon = epsilon if epsilon is not None else self.initial_epsilon
def act(self, state, reward=None, done=None, mode='train'):
"""选择next操作并更新内部Q表 (when mode != 'test')."""
state = self.preprocess_state(state)
if mode == 'test':
# 测试模式:简单地产生一个动作
action = np.argmax(self.q_table[state])
else:
# 训练模式(默认):更新Q表,选择下一步行动
# Note: 我们用当前状态,回报更新最后的状态动作对的Q表条目
self.q_table[self.last_state + (self.last_action,)] += self.alpha * \
(reward + self.gamma * max(self.q_table[state]) -
self.q_table[self.last_state + (self.last_action,)])
# 探索 vs. 利用
do_exploration = np.random.uniform(0, 1) < self.epsilon
if do_exploration:
# 随机选择一个动作
action = np.random.randint(0, self.action_size)
else:
# 从Q表中选择最佳动作
action = np.argmax(self.q_table[state])
# 存储当前状态,下一步操作
self.last_state = state
self.last_action = action
return action
主程序K6_MountainCar
如下
import sys
import gym
import numpy as np
import matplotlib.collections as mc
import pandas as pd
import matplotlib.pyplot as plt
from K6_MountainCar_Agent import QLearningAgent
# 定义可用于离散空间的等间距网格。
def create_uniform_grid(low, high, bins=(10, 10)):
grid = [np.linspace(low[dim], high[dim], bins[dim] + 1)[1:-1] for dim in range(len(bins))]
print("Uniform grid: [<low>, <high>] / <bins> => <splits>")
for l, h, b, splits in zip(low, high, bins, grid):
print(" [{}, {}] / {} => {}".format(l, h, b, splits))
return grid
# 根据给定的网格离散样本。
def discretize(sample, grid):
return list(int(np.digitize(s, g)) for s, g in zip(sample, grid)) # 返回索引值
# 在给定的二维网格上可视化原始的和离散的样本。
def visualize_samples(samples, discretized_samples, grid, low=None, high=None):
fig, ax = plt.subplots(figsize=(10, 10))
# 显示网格
ax.xaxis.set_major_locator(plt.FixedLocator(grid[0]))
ax.yaxis.set_major_locator(plt.FixedLocator(grid[1]))
ax.grid(True)
# 如果指定了边界(低、高),则使用它们来设置轴的限制
if low is not None and high is not None:
ax.set_xlim(low[0], high[0])
ax.set_ylim(low[1], high[1])
else:
# 否则使用第一个、最后一个网格位置为low、high(为了进一步映射离散化的样本)
low = [splits[0] for splits in grid]
high = [splits[-1] for splits in grid]
# 将每个离散的样本(实际上是一个索引)映射到相应网格单元格的中心
grid_extended = np.hstack((np.array([low]).T, grid, np.array([high]).T)) # add low and high ends
grid_centers = (grid_extended[:, 1:] + grid_extended[:, :-1]) / 2 # compute center of each grid cell
locs = np.stack(grid_centers[i, discretized_samples[:, i]] for i in range(len(grid))).T # map discretized samples
ax.plot(samples[:, 0], samples[:, 1], 'o') # 绘制初始样本
ax.plot(locs[:, 0], locs[:, 1], 's') # 绘制离散后的样本
ax.add_collection(mc.LineCollection(list(zip(samples, locs)),
colors='orange')) # 添加一条线连接每个原始离散样本
ax.legend(['original', 'discretized'])
def run(agent, env, num_episodes=20000, mode='train'):
"""给定的强化学习环境中运行agent并返回分数."""
scores = []
max_avg_score = -np.inf
for i_episode in range(1, num_episodes + 1):
# 初始化事件
state = env.reset()
action = agent.reset_episode(state)
total_reward = 0
done = False
# 运行步骤直到完成
while not done:
state, reward, done, info = env.step(action)
total_reward += reward
action = agent.act(state, reward, done, mode)
# 保存最终分数
scores.append(total_reward)
# 输出事件状态
if mode == 'train':
if len(scores) > 100:
avg_score = np.mean(scores[-100:])
if avg_score > max_avg_score:
max_avg_score = avg_score
if i_episode % 100 == 0:
print("\rEpisode {}/{} | Max Average Score: {}".format(i_episode, num_episodes, max_avg_score), end="")
sys.stdout.flush()
return scores
def plot_scores(scores, rolling_window=100):
"""Plot scores and optional rolling mean using specified window."""
plt.plot(scores)
plt.title("Scores")
rolling_mean = pd.Series(scores).rolling(rolling_window).mean()
plt.plot(rolling_mean)
return rolling_mean
def plot_q_table(q_table):
"""Visualize max Q-value for each state and corresponding action."""
q_image = np.max(q_table, axis=2) # max Q-value for each state
q_actions = np.argmax(q_table, axis=2) # best action for each state
fig, ax = plt.subplots(figsize=(10, 10))
cax = ax.imshow(q_image, cmap='jet')
cbar = fig.colorbar(cax)
for x in range(q_image.shape[0]):
for y in range(q_image.shape[1]):
ax.text(x, y, q_actions[x, y], color='white',
horizontalalignment='center', verticalalignment='center')
ax.grid(False)
ax.set_title("Q-table, size: {}".format(q_table.shape))
ax.set_xlabel('position')
ax.set_ylabel('velocity')
# main function
if __name__ == "__main__":
# 创建一个环境并设置随机种子
env = gym.make("MountainCar-v0")
env.seed(505)
# 环境测试
env_test = False
if env_test is True:
state = env.reset()
score = 0
for t in range(200):
action = env.action_space.sample()
env.render()
state, reward, done, _ = env.step(action)
score += reward
if done:
break
print("Final score:", score)
env.close()
# Explore state (observation) space
print("State space:", env.observation_space)
print("- low:", env.observation_space.low)
print("- high:", env.observation_space.high)
print("State space samples:")
print(np.array([env.observation_space.sample() for i in range(10)]))
# action space
print("Action space:", env.action_space)
# 从动作空间生成一些示例
print("Action space samples:")
print(np.array([env.action_space.sample() for i in range(10)]))
state_grid = create_uniform_grid(env.observation_space.low, env.observation_space.high, bins=(10, 10))
q_agent = QLearningAgent(env, state_grid)
# 以不同模式运行,方便测试结果
run_mode = True
if run_mode is True:
q_agent.q_table = np.load('q_table.npy', allow_pickle=True)
state = env.reset()
score = 0
for t in range(200):
action = q_agent.act(state, mode='test')
env.render()
state, reward, done, _ = env.step(action)
score += reward
if done:
break
print('Final score:', score)
env.close()
else:
scores = run(q_agent, env)
# plot data
plt.plot(scores)
plt.title("Scores")
rolling_mean = plot_scores(scores)
plt.show()
test_scores = run(q_agent, env, num_episodes=100, mode='test')
print("[TEST] Completed {} episodes with avg. score = {}".format(len(test_scores), np.mean(test_scores)))
_ = plot_scores(test_scores)
plot_q_table(q_agent.q_table)
plt.show()
np.save('q_table.npy', q_agent.q_table)
6.3.2 程序注解
(1) 环境测试
首先来看一下MountainCar
这个环境条件,在主程序K6_MountainCar
中,可以通过更改env_test
的值,使其为True
来进行环境的测试.输出如下
Final score: -200.0
State space: Box(2,)
- low: [-1.2 -0.07]
- high: [0.6 0.07]
State space samples:
[[-0.00739189 -0.01564005]
[-0.4523059 0.04406008]
[-0.1716276 -0.04345753]
[-0.22001167 0.01072859]
[-1.1200054 -0.05656335]
[-0.91125274 -0.05465209]
[-0.38469937 -0.06611647]
[ 0.15009743 0.03064202]
[-1.1663655 -0.06613734]
[-0.352857 -0.01548356]]
Action space: Discrete(3)
Action space samples:
[1 1 2 2 1 1 1 1 0 0]
(2) 离散化
# 将观测空间离散化,其中bins控制离散精度
state_grid = create_uniform_grid(env.observation_space.low, env.observation_space.high, bins=(10, 10))
这个部分的作用是将观测空间进行离散化,借助create_uniform_grid()
函数
# 定义可用于离散空间的等间距网格。
def create_uniform_grid(low, high, bins=(10, 10)):
grid = [np.linspace(low[dim], high[dim], bins[dim] + 1)[1:-1] for dim in range(len(bins))]
print("Uniform grid: [<low>, <high>] / <bins> => <splits>")
for l, h, b, splits in zip(low, high, bins, grid):
print(" [{}, {}] / {} => {}".format(l, h, b, splits))
return grid
其中bins
定义了离散化的精度
Uniform grid: [<low>, <high>] / <bins> => <splits>
[-1.2000000476837158, 0.6000000238418579] / 10 => [-1.02000004 -0.84000003 -0.66000003 -0.48000002 -0.30000001 -0.12
0.06 0.24000001 0.42000002]
[-0.07000000029802322, 0.07000000029802322] / 10 => [-0.056 -0.042 -0.028 -0.014 0. 0.014 0.028 0.042 0.056]
(3) 模型训练
q_agent = QLearningAgent(env, state_grid)
# 以不同模式运行,方便测试结果
run_mode = False
# 运行测试模式
if run_mode is True:
q_agent.q_table = np.load('q_table.npy', allow_pickle=True)
state = env.reset()
score = 0
for t in range(200):
action = q_agent.act(state, mode='test')
env.render()
state, reward, done, _ = env.step(action)
score += reward
if done:
break
print('Final score:', score)
env.close()
# 训练模式
else:
scores = run(q_agent, env)
# plot data
plt.plot(scores)
plt.title("Scores")
rolling_mean = plot_scores(scores)
plt.show()
test_scores = run(q_agent, env, num_episodes=100, mode='test')
print("[TEST] Completed {} episodes with avg. score = {}".format(len(test_scores), np.mean(test_scores)))
_ = plot_scores(test_scores)
plot_q_table(q_agent.q_table)
plt.show()
在这里提供了两种运行模式,当run_mode = False
时,为训练模式. 此时会进行模型训练,相关训练过程的输出如下
--Agent--
Environment: <TimeLimit<MountainCarEnv<MountainCar-v0>>>
State space size: (10, 10)
Action space size: 3
Q table size: (10, 10, 3)
Episode 20000/20000 | Max Average Score: -130.49[TEST] Completed 100 episodes with avg. score = -146.91
可视化q表的输出:
分数和平均分数输出:
可以看到平均分数随着训练轮数的增加而增加.
同时,我们也可以在test模式下运行模型,来分析和观测所获得的分数,输出如下,
在训练完成之后,程序会将Q表保存到q_table.npy文件中,方便之后的使用.
当run_mode = True
时,程序以运行模式进行.程序会从文件中读取训练后的q_表的值,运行完之后,程序自动退出.
(4) 模型优化
之前在环境介绍中说过,小车上山问题在分数小于100时可以认为解决了.在之前的模型中,我们的平均分数保持在-150左右,我们可以通过细化离散度来获得更好的分数.
我们将bins
更改为(20, 20)
,并将训练轮数调整到50000轮.运行程序
分数和平均分数输出:
test模式输出:
可以看到分数有了一定的提升.
将run_mode
设置成True
即可查看效果.
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)