强化学习系列文章(二十六):向量化环境Vectorized Environments

OpenAI Gym最近公布了官方API手册,可以趁机学习一下环境运行的并行化技术。

https://www.gymlibrary.ml/pages/vector_api/index

Vectorized Environments

所谓“矢量化环境”,是运行多个(独立)子环境的环境,可以按顺序运行,也可以使用多处理并行运行。矢量化环境将一批action作为输入,并返回一批observation。例如,当策略被定义为对一批obs进行操作的神经网络时,矢量化环境就特别有用了。

Gym 提供两种类型的矢量化环境:

  • gym.vector.SyncVectorEnv,其中的子环境按顺序执行。
  • gym.vector.AsyncVectorEnv,其中的子环境使用多进程并行执行。这将为每个子环境创建一个进程。

gym.make类似,您可以使用gym.vector.make函数运行已注册环境的矢量化版本。这将运行同一环境的多个副本(默认情况下并行运行)。

下面的示例并行运行CartPole-v1环境的3个副本,将3个二值操作(每个子环境一个)的向量作为输入,并返回一个沿第一维度堆叠的3个observation的数组,其中包含每个子环境返回的奖励数组,以及一个指示每个子环境中的episode是否已结束的布尔数组。

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.reset()
>>> actions = np.array([1, 0, 1])
>>> observations, rewards, dones, infos = envs.step(actions)

>>> observations
array([[ 0.00122802,  0.16228443,  0.02521779, -0.23700266],
        [ 0.00788269, -0.17490888,  0.03393489,  0.31735462],
        [ 0.04918966,  0.19421194,  0.02938497, -0.29495203]],
        dtype=float32)
>>> rewards
array([1., 1., 1.])
>>> dones
array([False, False, False])
>>> infos
({}, {}, {})

函数gym.vector.make仅用于基本情况(例如,运行同一注册环境的多个副本)。对于任何其他用例,请使用SyncVectorEnv进行顺序执行,或使用AsyncVectorEnv进行并行执行。这些用例可能包括:

  • 使用不同的参数运行同一环境的多个实例(例如,具有不同重力值的"Pendulum-v0"
  • 运行未注册环境(例如自定义环境)的多个实例
  • 在某些(但不是全部)子环境中使用包装器。

Creating a vectorized environment

若要创建运行多个子环境的矢量化环境,可以将子环境包装在gym.vector.SyncVectorEnv(用于顺序执行)或gym.vector.AsyncVectorEnv(用于并行执行,具有多进程)中。这些创建矢量化环境的API的输入是“一个指定如何创建子环境的可调用对象的列表”。

>>> envs = gym.vector.AsyncVectorEnv([
        lambda: gym.make("CartPole-v1"),
        lambda: gym.make("CartPole-v1"),
        lambda: gym.make("CartPole-v1")
    ])

或者,要创建同一环境的多个副本的矢量化环境,可以使用函数gym.vector.make()

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)  # Equivalent

要启用action和observation的自动批处理,所有子环境必须共享相同的action_spaceobservation_space。但是,所有子环境都不需要是彼此的精确副本。例如,可以使用以下命令在矢量化环境中运行2个具有不同重力值的Pendulum-v0实例:

>>> env = gym.vector.AsyncVectorEnv([
        lambda: gym.make("Pendulum-v0", g=9.81),
        lambda: gym.make("Pendulum-v0", g=1.62)
    ])

关于自动批处理的详细信息,请参考Observation & Action spaces章节。

AsyncVectorEnvspawnforkserver start方法一起使用时,必须使用'if __name__ == '__main__":' 包装包含矢量化环境的代码。有关详细信息,请参阅此文档

if __name__ == "__main__":
    envs = gym.vector.make("CartPole-v1", num_envs=3, context="spawn")

Working with vectorized environments

虽然标准Gym环境执行单个action并返回单个observation(包括reward和done),但矢量化环境是将一批action作为输入,并返回一批observation,以及一系列奖励和布尔值Done,指示episode是否在每个子环境中结束。

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.reset()
array([[ 0.00198895, -0.00569421, -0.03170966,  0.00126465],
       [-0.02658334,  0.00755256,  0.04376719, -0.00266695],
       [-0.02898625,  0.04779156,  0.02686412, -0.01298284]],
      dtype=float32)

>>> actions = np.array([1, 0, 1])
>>> observations, rewards, dones, infos = envs.step(actions)

>>> observations
array([[ 0.00187507,  0.18986781, -0.03168437, -0.301252  ],
       [-0.02643229, -0.18816885,  0.04371385,  0.3034975 ],
       [-0.02803041,  0.24251814,  0.02660446, -0.29707024]],
      dtype=float32)
>>> rewards
array([1., 1., 1.])
>>> dones
array([False, False, False])
>>> infos
({}, {}, {})

矢量化环境与任何子环境兼容,无论动作和观察空间如何(例如,像gym.spaces.Dict这样的容器空间,或任何任意嵌套的空间)。特别是,矢量化环境可以自动批处理VectorEnv.resetVectorEnv.step返回的任何标准GymSpace定义的observation(例如gym.spaces.Boxgym.spaces.Discretegym.spaces.Dict或其任何嵌套结构)。同样,矢量化环境也可以执行批量action,而这些action可以是任何标准的GymSpace定义的。

>>> class DictEnv(gym.Env):
...     observation_space = gym.spaces.Dict({
...         "position": gym.spaces.Box(-1., 1., (3,), np.float32),
...         "velocity": gym.spaces.Box(-1., 1., (2,), np.float32)
...     })
...     action_space = gym.spaces.Dict({
...         "fire": gym.spaces.Discrete(2),
...         "jump": gym.spaces.Discrete(2),
...         "acceleration": gym.spaces.Box(-1., 1., (2,), np.float32)
...     })
...
...     def reset(self):
...         return self.observation_space.sample()
...
...     def step(self, action):
...         observation = self.observation_space.sample()
...         return (observation, 0., False, {})

>>> envs = gym.vector.AsyncVectorEnv([lambda: DictEnv()] * 3)
>>> envs.observation_space
Dict(position:Box(-1.0, 1.0, (3, 3), float32), velocity:Box(-1.0, 1.0, (3, 2), float32))
>>> envs.action_space
Dict(fire:MultiDiscrete([2 2 2]), jump:MultiDiscrete([2 2 2]), acceleration:Box(-1.0, 1.0, (3, 2), float32))

>>> envs.reset()
>>> actions = {
...     "fire": np.array([1, 1, 0]),
...     "jump": np.array([0, 1, 0]),
...     "acceleration": np.random.uniform(-1., 1., size=(3, 2))
... }
>>> observations, rewards, dones, infos = envs.step(actions)
>>> observations
{"position": array([[-0.5337036 ,  0.7439302 ,  0.41748118],
                    [ 0.9373266 , -0.5780453 ,  0.8987405 ],
                    [-0.917269  , -0.5888639 ,  0.812942  ]], dtype=float32),
"velocity": array([[ 0.23626241, -0.0616814 ],
                   [-0.4057572 , -0.4875375 ],
                   [ 0.26341468,  0.72282314]], dtype=float32)}

矢量化环境中的子环境在episode结束时会自动调用obj:reset。在下面的示例中,第 3 个子环境的episode在2个step后结束(agent掉入一个洞中),子环境被重置(observation0)。

>>> envs = gym.vector.make("FrozenLake-v1", num_envs=3, is_slippery=False)
>>> envs.reset()
array([0, 0, 0])
>>> observations, rewards, dones, infos = envs.step(np.array([1, 2, 2]))
>>> observations, rewards, dones, infos = envs.step(np.array([1, 2, 1]))

>>> dones
array([False, False,  True])
>>> observations
array([8, 2, 0])

Observation & Action spaces

与任何Gym环境一样,矢量化环境包含VectorEnv.observation_spaceVectorEnv.action_space两个属性,用于指定环境的观察和操作空间。由于矢量化环境在多个子环境中运行,其中所有子环境执行的操作和返回的观测值一起批处理,因此obs和action空间也进行批处理,只要输入action是VectorEnv.action_space的有效元素,并且observation是VectorEnv.observation_space的有效元素。

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.observation_space
Box([[-4.8 ...]], [[4.8 ...]], (3, 4), float32)
>>> envs.action_space
MultiDiscrete([2 2 2])

为了在矢量化环境中对obs和action进行适当的批处理,所有子环境的观测值和动作空间必须相同。

>>> envs = gym.vector.AsyncVectorEnv([
...     lambda: gym.make("CartPole-v1"),
...     lambda: gym.make("MountainCar-v0")
... ])
RuntimeError: Some environments have an observation space different from `Box([-4.8 ...], [4.8 ...], (4,), float32)`. In order to batch observations, the observation spaces from all environments must be equal.

但是,有时访问特定子环境的obs和action空间(而不是批处理的空间)可能很方便。可以使用矢量化环境的属性VectorEnv.single_observation_spaceVectorEnv.single_action_space访问这些属性。

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.single_observation_space
Box([-4.8 ...], [4.8 ...], (4,), float32)
>>> envs.single_action_space
Discrete(2)

这很方便,例如在实例化一个策略函数的时候。在下面的示例中,使用VectorEnv.single_observation_spaceVectorEnv.single_action_space来定义线性策略的权重。请注意,由于矢量化环境,我们只需调用一次策略函数,即可将策略直接应用于整批observation。

>>> from gym.spaces.utils import flatdim
>>> from scipy.special import softmax

>>> def policy(weights, observations):
...     logits = np.dot(observations, weights)
...     return softmax(logits, axis=1)

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> weights = np.random.randn(
...     flatdim(envs.single_observation_space),
...     envs.single_action_space.n
... )
>>> observations = envs.reset()
>>> actions = policy(weights, observations).argmax(axis=1)
>>> observations, rewards, dones, infos = envs.step(actions)

Intermediate Usage

Shared memory

AsyncVectorEnv在单个进程内运行每个子环境。在每次调用AsyncVectorEnv.resetAsyncVectorEnv.step时,所有子环境的观察结果都会发送回主进程。为了避免在进程之间进行昂贵的数据传输,特别是对于大型observation(例如图像),AsyncVectorEnv默认使用共享内存(shared_memory=True),进程可以以最低的成本写入和读取。这可以提升矢量化环境的整体性能。

>>> env_fns = [lambda: gym.make("BreakoutNoFrameskip-v4")] * 5

>>> envs = gym.vector.AsyncVectorEnv(env_fns, shared_memory=False)
>>> envs.reset()
>>> %timeit envs.step(envs.action_space.sample())
2.23 ms ± 136 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

>>> envs = gym.vector.AsyncVectorEnv(env_fns, shared_memory=True)
>>> envs.reset()
>>> %timeit envs.step(envs.action_space.sample())
1.36 ms ± 15.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Exception handling

由于有时事情可能不会按计划进行,因此在矢量化环境中会重新引发子环境中引发的异常,即使子环境与AsyncVectorEnv并行运行也是如此。这样,您可以选择如何自己处理这些异常(使用try...except)。

>>> class ErrorEnv(gym.Env):
...     observation_space = gym.spaces.Box(-1., 1., (2,), np.float32)
...     action_space = gym.spaces.Discrete(2)
...
...     def reset(self):
...         return np.zeros((2,), dtype=np.float32)
...
...     def step(self, action):
...         if action == 1:
...             raise ValueError("An error occurred.")
...         observation = self.observation_space.sample()
...         return (observation, 0., False, {})

>>> envs = gym.vector.AsyncVectorEnv([lambda: ErrorEnv()] * 3)
>>> observations = envs.reset()
>>> observations, rewards, dones, infos = envs.step(np.array([0, 0, 1]))
ERROR: Received the following error from Worker-2: ValueError: An error occurred.
ERROR: Shutting down Worker-2.
ERROR: Raising the last exception back to the main process.
ValueError: An error occurred.

Advanced Usage

Custom spaces

矢量化环境会对来自标准Gym空间(如gym.spaces.Boxgym.spaces.Discretegym.spaces.Dict)的元素的action和obs进行批处理。但是,如果您使用自定义的action和/或obs空间(继承自gym.space)创建自己的环境,矢量化环境不会尝试自动批处理操作/观察,而是从所有子环境中返回元素的原始元组。

在下面的示例中,我们创建了一个新环境SMILESEnv,其观察结果是表示分子结构的SMILES符号的字符串,具有一个自定义观察空间SMILES。矢量化环境返回的observation包含在字符串元组中。

>>> class SMILES(gym.Space):
...     def __init__(self, symbols):
...         super().__init__()
...         self.symbols = symbols
...
...     def __eq__(self, other):
...         return self.symbols == other.symbols

>>> class SMILESEnv(gym.Env):
...     observation_space = SMILES("][()CO=")
...     action_space = gym.spaces.Discrete(7)
...
...     def reset(self):
...         self._state = "["
...         return self._state
...
...     def step(self, action):
...         self._state += self.observation_space.symbols[action]
...         reward = done = (action == 0)
...         return (self._state, float(reward), done, {})

>>> envs = gym.vector.AsyncVectorEnv(
...     [lambda: SMILESEnv()] * 3,
...     shared_memory=False
... )
>>> envs.reset()
>>> observations, rewards, dones, infos = envs.step(np.array([2, 5, 4]))
>>> observations
('[(', '[O', '[C')

自定义观察和动作空间可能继承自gym.Space。但是,大多数用例应该可以由现有的空间类(例如gym.spaces.Boxgym.spaces.Discrete等)和容器类(gym.spaces.Tuplegym.spaces.Dict)覆盖。此外,强化学习算法的某些实现可能无法正确处理自定义空间。请谨慎使用自定义空间。

如果将AsyncVectorEnv与自定义观测空间一起使用,则必须将shared_memory=False,因为共享内存和自动批处理与自定义空间不兼容。通常,如果将自定义空间与AsyncVectorEnv一起使用,则这些空间的元素必须是pickleable

API Reference

VectorEnv

action_space
(批量化的)动作空间。 step函数的输入动作必须是action_space的合法元素。

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.action_space
MultiDiscrete([2 2 2])

observation_space
(批量化的)观测空间。resetstep返回的observation必须是observation_space的合法元素。

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.observation_space
Box([[-4.8 ...]], [[4.8 ...]], (3, 4), float32)

single_action_space
子环境的动作空间

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.single_action_space
Discrete(2)

single_observation_space
子环境的观测空间

>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.single_action_space
Box([-4.8 ...], [4.8 ...], (4,), float32)
Reset
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.reset()
array([[-0.04456399,  0.04653909,  0.01326909, -0.02099827],
        [ 0.03073904,  0.00145001, -0.03088818, -0.03131252],
        [ 0.03468829,  0.01500225,  0.01230312,  0.01825218]],
        dtype=float32)
Step
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.reset()
>>> actions = np.array([1, 0, 1])
>>> observations, rewards, dones, infos = envs.step(actions)

>>> observations
array([[ 0.00122802,  0.16228443,  0.02521779, -0.23700266],
        [ 0.00788269, -0.17490888,  0.03393489,  0.31735462],
        [ 0.04918966,  0.19421194,  0.02938497, -0.29495203]],
        dtype=float32)
>>> rewards
array([1., 1., 1.])
>>> dones
array([False, False, False])
>>> infos
({}, {}, {})
Seed
>>> envs = gym.vector.make("CartPole-v1", num_envs=3)
>>> envs.seed([1, 3, 5])
>>> envs.reset()
array([[ 0.03073904,  0.00145001, -0.03088818, -0.03131252],
        [ 0.02281231, -0.02475473,  0.02306162,  0.02072129],
        [-0.03742824, -0.02316945,  0.0148571 ,  0.0296055 ]],
        dtype=float32)
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐