强化学习系列文章(二十六):向量化环境Vectorized Environments
强化学习系列文章(二十六):向量化环境Vectorized EnvironmentsOpenAI Gym最近公布了官方API手册,可以趁机学习一下环境运行的并行化技术。https://www.gymlibrary.ml/pages/vector_api/indexVectorized Environments所谓“矢量化环境”,是运行多个(独立)子环境的环境,可以按顺序运行,也可以使用多处理并行运
强化学习系列文章(二十六):向量化环境Vectorized Environments
OpenAI Gym最近公布了官方API手册,可以趁机学习一下环境运行的并行化技术。
https://www.gymlibrary.ml/pages/vector_api/index
Vectorized Environments
所谓“矢量化环境”,是运行多个(独立)子环境的环境,可以按顺序运行,也可以使用多处理并行运行。矢量化环境将一批action作为输入,并返回一批observation。例如,当策略被定义为对一批obs进行操作的神经网络时,矢量化环境就特别有用了。
Gym 提供两种类型的矢量化环境:
gym.vector.SyncVectorEnv
,其中的子环境按顺序执行。gym.vector.AsyncVectorEnv
,其中的子环境使用多进程并行执行。这将为每个子环境创建一个进程。
与gym.make
类似,您可以使用gym.vector.make
函数运行已注册环境的矢量化版本。这将运行同一环境的多个副本(默认情况下并行运行)。
下面的示例并行运行CartPole-v1
环境的3个副本,将3个二值操作(每个子环境一个)的向量作为输入,并返回一个沿第一维度堆叠的3个observation的数组,其中包含每个子环境返回的奖励数组,以及一个指示每个子环境中的episode是否已结束的布尔数组。
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.reset()
>>> actions = np.array([1, 0, 1])
>>> observations, rewards, dones, infos = envs.step(actions)
>>> observations
array([[ 0.00122802, 0.16228443, 0.02521779, -0.23700266],
[ 0.00788269, -0.17490888, 0.03393489, 0.31735462],
[ 0.04918966, 0.19421194, 0.02938497, -0.29495203]],
dtype=float32)
>>> rewards
array([1., 1., 1.])
>>> dones
array([False, False, False])
>>> infos
({}, {}, {})
函数gym.vector.make
仅用于基本情况(例如,运行同一注册环境的多个副本)。对于任何其他用例,请使用SyncVectorEnv
进行顺序执行,或使用AsyncVectorEnv
进行并行执行。这些用例可能包括:
- 使用不同的参数运行同一环境的多个实例(例如,具有不同重力值的
"Pendulum-v0"
) - 运行未注册环境(例如自定义环境)的多个实例
- 在某些(但不是全部)子环境中使用包装器。
Creating a vectorized environment
若要创建运行多个子环境的矢量化环境,可以将子环境包装在gym.vector.SyncVectorEnv
(用于顺序执行)或gym.vector.AsyncVectorEnv
(用于并行执行,具有多进程)中。这些创建矢量化环境的API的输入是“一个指定如何创建子环境的可调用对象的列表”。
>>> envs = gym.vector.AsyncVectorEnv([
lambda: gym.make("CartPole-v1"),
lambda: gym.make("CartPole-v1"),
lambda: gym.make("CartPole-v1")
])
或者,要创建同一环境的多个副本的矢量化环境,可以使用函数gym.vector.make()
。
>>> envs = gym.vector.make("CartPole-v1", num_envs=3) # Equivalent
要启用action和observation的自动批处理,所有子环境必须共享相同的action_space
和observation_space
。但是,所有子环境都不需要是彼此的精确副本。例如,可以使用以下命令在矢量化环境中运行2个具有不同重力值的Pendulum-v0
实例:
>>> env = gym.vector.AsyncVectorEnv([
lambda: gym.make("Pendulum-v0", g=9.81),
lambda: gym.make("Pendulum-v0", g=1.62)
])
关于自动批处理的详细信息,请参考Observation & Action spaces
章节。
将AsyncVectorEnv
与spawn
或forkserver start
方法一起使用时,必须使用'if __name__ == '__main__":'
包装包含矢量化环境的代码。有关详细信息,请参阅此文档。
if __name__ == "__main__":
envs = gym.vector.make("CartPole-v1", num_envs=3, context="spawn")
Working with vectorized environments
虽然标准Gym环境执行单个action并返回单个observation(包括reward和done),但矢量化环境是将一批action作为输入,并返回一批observation,以及一系列奖励和布尔值Done,指示episode是否在每个子环境中结束。
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.reset()
array([[ 0.00198895, -0.00569421, -0.03170966, 0.00126465],
[-0.02658334, 0.00755256, 0.04376719, -0.00266695],
[-0.02898625, 0.04779156, 0.02686412, -0.01298284]],
dtype=float32)
>>> actions = np.array([1, 0, 1])
>>> observations, rewards, dones, infos = envs.step(actions)
>>> observations
array([[ 0.00187507, 0.18986781, -0.03168437, -0.301252 ],
[-0.02643229, -0.18816885, 0.04371385, 0.3034975 ],
[-0.02803041, 0.24251814, 0.02660446, -0.29707024]],
dtype=float32)
>>> rewards
array([1., 1., 1.])
>>> dones
array([False, False, False])
>>> infos
({}, {}, {})
矢量化环境与任何子环境兼容,无论动作和观察空间如何(例如,像gym.spaces.Dict
这样的容器空间,或任何任意嵌套的空间)。特别是,矢量化环境可以自动批处理VectorEnv.reset
和VectorEnv.step
返回的任何标准GymSpace
定义的observation(例如gym.spaces.Box
、gym.spaces.Discrete
、gym.spaces.Dict
或其任何嵌套结构)。同样,矢量化环境也可以执行批量action,而这些action可以是任何标准的GymSpace
定义的。
>>> class DictEnv(gym.Env):
... observation_space = gym.spaces.Dict({
... "position": gym.spaces.Box(-1., 1., (3,), np.float32),
... "velocity": gym.spaces.Box(-1., 1., (2,), np.float32)
... })
... action_space = gym.spaces.Dict({
... "fire": gym.spaces.Discrete(2),
... "jump": gym.spaces.Discrete(2),
... "acceleration": gym.spaces.Box(-1., 1., (2,), np.float32)
... })
...
... def reset(self):
... return self.observation_space.sample()
...
... def step(self, action):
... observation = self.observation_space.sample()
... return (observation, 0., False, {})
>>> envs = gym.vector.AsyncVectorEnv([lambda: DictEnv()] * 3)
>>> envs.observation_space
Dict(position:Box(-1.0, 1.0, (3, 3), float32), velocity:Box(-1.0, 1.0, (3, 2), float32))
>>> envs.action_space
Dict(fire:MultiDiscrete([2 2 2]), jump:MultiDiscrete([2 2 2]), acceleration:Box(-1.0, 1.0, (3, 2), float32))
>>> envs.reset()
>>> actions = {
... "fire": np.array([1, 1, 0]),
... "jump": np.array([0, 1, 0]),
... "acceleration": np.random.uniform(-1., 1., size=(3, 2))
... }
>>> observations, rewards, dones, infos = envs.step(actions)
>>> observations
{"position": array([[-0.5337036 , 0.7439302 , 0.41748118],
[ 0.9373266 , -0.5780453 , 0.8987405 ],
[-0.917269 , -0.5888639 , 0.812942 ]], dtype=float32),
"velocity": array([[ 0.23626241, -0.0616814 ],
[-0.4057572 , -0.4875375 ],
[ 0.26341468, 0.72282314]], dtype=float32)}
矢量化环境中的子环境在episode结束时会自动调用obj:reset
。在下面的示例中,第 3 个子环境的episode在2个step后结束(agent掉入一个洞中),子环境被重置(observation0
)。
>>> envs = gym.vector.make("FrozenLake-v1", num_envs=3, is_slippery=False)
>>> envs.reset()
array([0, 0, 0])
>>> observations, rewards, dones, infos = envs.step(np.array([1, 2, 2]))
>>> observations, rewards, dones, infos = envs.step(np.array([1, 2, 1]))
>>> dones
array([False, False, True])
>>> observations
array([8, 2, 0])
Observation & Action spaces
与任何Gym环境一样,矢量化环境包含VectorEnv.observation_space
和VectorEnv.action_space
两个属性,用于指定环境的观察和操作空间。由于矢量化环境在多个子环境中运行,其中所有子环境执行的操作和返回的观测值一起批处理,因此obs和action空间也进行批处理,只要输入action是VectorEnv.action_space
的有效元素,并且observation是VectorEnv.observation_space
的有效元素。
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.observation_space
Box([[-4.8 ...]], [[4.8 ...]], (3, 4), float32)
>>> envs.action_space
MultiDiscrete([2 2 2])
为了在矢量化环境中对obs和action进行适当的批处理,所有子环境的观测值和动作空间必须相同。
>>> envs = gym.vector.AsyncVectorEnv([
... lambda: gym.make("CartPole-v1"),
... lambda: gym.make("MountainCar-v0")
... ])
RuntimeError: Some environments have an observation space different from `Box([-4.8 ...], [4.8 ...], (4,), float32)`. In order to batch observations, the observation spaces from all environments must be equal.
但是,有时访问特定子环境的obs和action空间(而不是批处理的空间)可能很方便。可以使用矢量化环境的属性VectorEnv.single_observation_space
和VectorEnv.single_action_space
访问这些属性。
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.single_observation_space
Box([-4.8 ...], [4.8 ...], (4,), float32)
>>> envs.single_action_space
Discrete(2)
这很方便,例如在实例化一个策略函数的时候。在下面的示例中,使用VectorEnv.single_observation_space
和VectorEnv.single_action_space
来定义线性策略的权重。请注意,由于矢量化环境,我们只需调用一次策略函数,即可将策略直接应用于整批observation。
>>> from gym.spaces.utils import flatdim
>>> from scipy.special import softmax
>>> def policy(weights, observations):
... logits = np.dot(observations, weights)
... return softmax(logits, axis=1)
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> weights = np.random.randn(
... flatdim(envs.single_observation_space),
... envs.single_action_space.n
... )
>>> observations = envs.reset()
>>> actions = policy(weights, observations).argmax(axis=1)
>>> observations, rewards, dones, infos = envs.step(actions)
Intermediate Usage
Shared memory
AsyncVectorEnv
在单个进程内运行每个子环境。在每次调用AsyncVectorEnv.reset
或AsyncVectorEnv.step
时,所有子环境的观察结果都会发送回主进程。为了避免在进程之间进行昂贵的数据传输,特别是对于大型observation(例如图像),AsyncVectorEnv
默认使用共享内存(shared_memory=True
),进程可以以最低的成本写入和读取。这可以提升矢量化环境的整体性能。
>>> env_fns = [lambda: gym.make("BreakoutNoFrameskip-v4")] * 5
>>> envs = gym.vector.AsyncVectorEnv(env_fns, shared_memory=False)
>>> envs.reset()
>>> %timeit envs.step(envs.action_space.sample())
2.23 ms ± 136 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
>>> envs = gym.vector.AsyncVectorEnv(env_fns, shared_memory=True)
>>> envs.reset()
>>> %timeit envs.step(envs.action_space.sample())
1.36 ms ± 15.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Exception handling
由于有时事情可能不会按计划进行,因此在矢量化环境中会重新引发子环境中引发的异常,即使子环境与AsyncVectorEnv
并行运行也是如此。这样,您可以选择如何自己处理这些异常(使用try...except
)。
>>> class ErrorEnv(gym.Env):
... observation_space = gym.spaces.Box(-1., 1., (2,), np.float32)
... action_space = gym.spaces.Discrete(2)
...
... def reset(self):
... return np.zeros((2,), dtype=np.float32)
...
... def step(self, action):
... if action == 1:
... raise ValueError("An error occurred.")
... observation = self.observation_space.sample()
... return (observation, 0., False, {})
>>> envs = gym.vector.AsyncVectorEnv([lambda: ErrorEnv()] * 3)
>>> observations = envs.reset()
>>> observations, rewards, dones, infos = envs.step(np.array([0, 0, 1]))
ERROR: Received the following error from Worker-2: ValueError: An error occurred.
ERROR: Shutting down Worker-2.
ERROR: Raising the last exception back to the main process.
ValueError: An error occurred.
Advanced Usage
Custom spaces
矢量化环境会对来自标准Gym空间(如gym.spaces.Box
、gym.spaces.Discrete
或gym.spaces.Dict
)的元素的action和obs进行批处理。但是,如果您使用自定义的action和/或obs空间(继承自gym.space
)创建自己的环境,矢量化环境不会尝试自动批处理操作/观察,而是从所有子环境中返回元素的原始元组。
在下面的示例中,我们创建了一个新环境SMILESEnv
,其观察结果是表示分子结构的SMILES符号的字符串,具有一个自定义观察空间SMILES
。矢量化环境返回的observation包含在字符串元组中。
>>> class SMILES(gym.Space):
... def __init__(self, symbols):
... super().__init__()
... self.symbols = symbols
...
... def __eq__(self, other):
... return self.symbols == other.symbols
>>> class SMILESEnv(gym.Env):
... observation_space = SMILES("][()CO=")
... action_space = gym.spaces.Discrete(7)
...
... def reset(self):
... self._state = "["
... return self._state
...
... def step(self, action):
... self._state += self.observation_space.symbols[action]
... reward = done = (action == 0)
... return (self._state, float(reward), done, {})
>>> envs = gym.vector.AsyncVectorEnv(
... [lambda: SMILESEnv()] * 3,
... shared_memory=False
... )
>>> envs.reset()
>>> observations, rewards, dones, infos = envs.step(np.array([2, 5, 4]))
>>> observations
('[(', '[O', '[C')
自定义观察和动作空间可能继承自gym.Space
。但是,大多数用例应该可以由现有的空间类(例如gym.spaces.Box
,gym.spaces.Discrete
等)和容器类(gym.spaces.Tuple
和gym.spaces.Dict
)覆盖。此外,强化学习算法的某些实现可能无法正确处理自定义空间。请谨慎使用自定义空间。
如果将AsyncVectorEnv
与自定义观测空间一起使用,则必须将shared_memory=False
,因为共享内存和自动批处理与自定义空间不兼容。通常,如果将自定义空间与AsyncVectorEnv
一起使用,则这些空间的元素必须是pickleable
。
API Reference
VectorEnv
action_space
(批量化的)动作空间。 step
函数的输入动作必须是action_space
的合法元素。
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.action_space
MultiDiscrete([2 2 2])
observation_space
(批量化的)观测空间。reset
和step
返回的observation必须是observation_space
的合法元素。
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.observation_space
Box([[-4.8 ...]], [[4.8 ...]], (3, 4), float32)
single_action_space
子环境的动作空间
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.single_action_space
Discrete(2)
single_observation_space
子环境的观测空间
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.single_action_space
Box([-4.8 ...], [4.8 ...], (4,), float32)
Reset
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.reset()
array([[-0.04456399, 0.04653909, 0.01326909, -0.02099827],
[ 0.03073904, 0.00145001, -0.03088818, -0.03131252],
[ 0.03468829, 0.01500225, 0.01230312, 0.01825218]],
dtype=float32)
Step
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.reset()
>>> actions = np.array([1, 0, 1])
>>> observations, rewards, dones, infos = envs.step(actions)
>>> observations
array([[ 0.00122802, 0.16228443, 0.02521779, -0.23700266],
[ 0.00788269, -0.17490888, 0.03393489, 0.31735462],
[ 0.04918966, 0.19421194, 0.02938497, -0.29495203]],
dtype=float32)
>>> rewards
array([1., 1., 1.])
>>> dones
array([False, False, False])
>>> infos
({}, {}, {})
Seed
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.seed([1, 3, 5])
>>> envs.reset()
array([[ 0.03073904, 0.00145001, -0.03088818, -0.03131252],
[ 0.02281231, -0.02475473, 0.02306162, 0.02072129],
[-0.03742824, -0.02316945, 0.0148571 , 0.0296055 ]],
dtype=float32)
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)