多智能体强化学习:基本概念,通信方式,IPPO,MADDPG
1,基本概念1.1,简介单个RL智能体通过与外界的交互来学习知识,具体过程是根据当前环境的状态,智能体通过策略给出的动作来对环境进行响应,相应地,智能体会得到一个奖励值以反馈动作的好坏程度。RL最重要的目标就是学习到能够使奖励最大化的策略,并且与监督学习的不同是这种奖励在很多情况下存在延迟。大多数RL的成功应用都是在单智能体场景下,无须建模和预测环境中的其他智能体。但是有很多重要的应用场景涉及多个
1,基本概念
1.1,简介
单个RL智能体通过与外界的交互来学习知识,具体过程是根据当前环境的状态,智能体通过策略给出的动作来对环境进行响应,相应地,智能体会得到一个奖励值以反馈动作的好坏程度。RL最重要的目标就是学习到能够使奖励最大化的策略,并且与监督学习的不同是这种奖励在很多情况下存在延迟。
大多数RL的成功应用都是在单智能体场景下,无须建模和预测环境中的其他智能体。但是有很多重要的应用场景涉及多个智能体之间的交互,问题会变得很复杂。比如,多个机器人协同、多玩家游戏等,这些都是多智能体的场景。另外,多智能体自我博弈也是一个有效的RL方式。因此,把RL从单智能体成功地扩展到多智能体环境对于设计能够与人类或者智能体之间互相交互的智能体系统非常重要。
多智能体系系统往往是在不可预测的动态环境中进行问题求解,所以集中控制机制无法很好地预测每个个体下一步的行为。为了解决这个问题,主要有三种解决方案:
- 设计有效约束多智能体系统的规则,规范智能体行为的选择,避免冲突。
- 利用通信手段,使得智能体之间能通过有效的交流避免冲突并增进协作。
- 增加学习机制,让智能体能够在执行动作和交互中学习,并且越学越聪明。
1.2,环境设置
POMDP可以被看成是MDP的扩展,它的状态空间包括所有所有定义在对应的MDP的状态集合上的概率分布,这些概率分布表现了信念状态。POMDP将MDP问题延伸到系统状态不能完全观察的情况下。MDP的这种扩展极大地增加了POMDP的复杂性。为了采取有效的动作,智能体可能需要考虑所有以往的观察和行动历时,而不仅仅是当前所处的状态。
通常,用一个七元组 来描述POMDP,其中 与MDP中的定义一致,另外还有:
- :一组观察结果集,比如机器人的传感器获得的环境数据。在MDP中,由于智能体完全了解系统状态,因此 。而在部分可见环境中,观察仅在概率上取决于潜在的环境状态。因为在不同的环境状态中可以得到相同的观察,因此确定智能体所处的状态变得困难。
- : 是一个观察函数,表明系统状态和观察值之间的关系。具体是在智能体在执行动作 进入环境状态 后得到观察值的概率 。
设:有 个智能体, 表示智能体状态, 表示第 个智能体。
状态转移概率:,下一状态 会受到所有Agent动作的影响,Agent之间会相互影响,并非独立。
奖励值: 表示第 个智能体获得的奖励。 不仅取决于它自己的动作 ,还受到其他智能体动作 的影响。例如:足球中的乌龙球。
- Fully cooperative:
- Fully comperative:
回报:用 表示第 个智能体在第 时刻获得的奖励, 依赖于状态 和 , 因为状态转移概率具有随机性,而 因为网络输出是动作的概率分布也具有随机性,因此 具有随机性。用 表示第 时刻获得的回报。 用 或 表示折扣回报。
策略网络:每个智能体都有自己的策略网络来近似策略函数,策略网络的输入是状态 ,输出是动作的概率值,策略网络用 表示。
- 有情况下,可以使用相同的策略网络参数可以互换 ,如同一款无人车的策略网络相同。
- 大部分情况下,策略网络结构可以相同,但参数不能相同,如在足球中,球员有的进攻,有的防守,还有守门员。
状态价值函数:用 表示第 个智能体在状态 下的状态价值函数。其中取期望是为了消除 外的所有的状态,消除掉所有Agent的动作,只依赖于状态 。动作是随机的 ,所以第 个网络会影响到第 个智能体的动作选择,进而影响到 ,影响到状态价值函数 ,尽管 。也就是说:一个智能体的策略会影响到所有智能体的状态价值 ,仅优化自己的策略不一定会让 变大,因为其他人的策略都在发生变化(牵一发而动全身)。
智能体关系:
- Fully cooperative(完全合作关系):Agent利益一致,获得的奖励相同,有共同的目标。
- Fully comperative(完全竞争关系):一方获得的奖励是另一方的损失。比如比赛场上的两个机器人。
- Mixed Cooperative & competitive(混合关系):既有竞争,也有合作。例如:足球机器人,两支球队是竞争关系,每个队伍内部是合作关系。
- Self-interested (利己主义):一个Agent的动作会改变环境的状态,此动作可能让其他Agent受益或受损,但是它不在乎,只在乎自身的利益最大化。
1.3,收敛问题
收敛:无法通过改进策略,获得更大的期望回报。即:如果所有Agent都找不到更好的策略,就说明已经收敛,可以终止训练。
单智能体设定下的策略学习:只有一个Agent,所以只有一个策略网络 ,它的参数是 ,把状态价值函数记做 ,它只依赖于当前状态 和策略网络参数 。对 关于状态 求期望,把状态 给消掉,把期望记作 ,可以评价网络的好坏,策略越好, 越大。
由于只有一个Agent,所以 只取决于这一个 Agent 的策略,通过改进策略,可以让 变大,也就是说我们希望找到参数 ,使得 最大化。如果 停止增长,说明无法让策略变得更好,说明已经收敛。
多智能体设定下的策略学习:采用纳什均衡(当其余所有Agent都不改变策略的情况下,一个Agent单独改变策略,不会让自己获得更高的回报)判断是否收敛。有多个智能体参与博弈,一个Agent指定策略的时候,要考虑到其他各方的策略,在纳什均衡的情况下,每一个Agent都在以最优的方式来应对其他各方的策略。如果所有Agent都是理性的,在纳什均衡的情况下,谁也没有动机去改变自己的策略,因为改变策略不会增加自己的收益,这样就达到一种平衡状态,所有Agent都找不到最好的策略,这种平衡状态就算是收敛。
1.4,MARL&RL对比
对于多智能体强化学习直接套用单智能体强化学习算法效果不好,可能会不收敛。例如直接将但智能体策略梯度应用到多智能体中:系统中有 个Agent,独立跟环境交互,每个Agent都能观察到环境的状态 ,能接收到环境反馈的奖励 ,每个智能体都能自主决策,算出动作 然后执行 ,每个Agent独立使用 和 以及动作 来计算策略梯度,然后更新自己的策略网络,跟单智能体策略梯度完全相同,自始至终这些Agent都不通信,不知道其他Agent的信息。
设第 个智能体策略网络 ,第 个智能体的状态价值函数 ,把 关于状态 求期望: 消除掉 ,为了获得最大的回报,通过改进自己的策略参数 使得 变大,意味着期望回报变大。
第1个智能体:
第2个智能体:
......
第n个智能体:
这些 各不相同,各个Agent更新自己的参数 ,这种计算方法那么可能此算法永远不能收敛,一个Agent更新策略会导致其他Agent的目标函数发生改变,大家的目标函数都在不停的变化。
假如第 个Agent已经找到了最优策略,即:,它无法通过改进策略使得自己获得更高的回报,这时候另外一个Agent改变了策略,这时候 发生了改变, 不再是最优策略,因此还需要继续更新策略,重新寻找最优策略。这时候又会影响其他Agent的目标函数,这就会导致所有的Agent都在寻找最优策略。
2,Agent的通信方式
Fully decentralized(去中心化):每个agent独立和环境交互并且用自己的观测和奖励来更新自己的策略,Agent是独立的个体,它们彼此之间不沟通交流。
Fully centralized(完全中心化):所有Agent都把信息传送给中央控制器,中央控制器知道所有Agent的观测、动作、以及奖励,Agent上没有策略网络,Agent自己不做决策,决策都是由中央做的,Agent只是执行指令。
Centralized training with decentralized execution(中心化训练,去中心化执行):Agent各自有各自的策略网络,训练的时候有一个中央控制器,它会收集所有Agent的观测、动作、以及奖励,中央控制器帮助Agent训练策略网络,训练结束之后,就不再用中央控制器了,每个Agent根据自己的观测,用自己的策略网络做决策,不需要跟中央控制器通信。
2.1,部分可观测
Multi-agent强化学习通常假设partial observation(不完全观测),这是因为Agent往往只能观测到它局部的状态,而看不到全局的状态 ,把第 个Agent的观测记作 ,它是对状态 的局部观测,满足 。而不同的Agent有通常有不同的局部观测,完全观测的意思是每个Agent都能看到全局的状态 。
2.2,Full decentralized(去中心化)
Fully decentralized(去中心化):每个agent独立和环境交互并且用自己的观测和奖励来更新自己的策略,Agent是独立的个体,它们彼此之间不沟通交流。
Agent获取观测值 和奖励 ,不知道别人的观测和动作,每个Agent都有自己独立的策略网络,Agent独立训练自己的策略网络,跟单智能体强化学习完全一样,训练结束之后,每个Agent用自己的策略网络来做决策,把观测的 输入策略网络,策略网络输出概率分布,抽样得到 ,然后执行动作 。
不论是训练还是执行Agent之间都没有沟通交流,这种去中心化的本质是单智能体强化学习,而不是真正的多智能体强化学习。这种方法的缺点在于:认为agent之间完全独立,忽视了Agent相互之间的影响,这是不合理的。
2.3,Fully centralized(完全中心化)
Fully centralized(完全中心化):所有Agent都把信息传送给中央控制器,中央控制器知道所有Agent的观测、动作、以及奖励,Agent上没有策略网络,Agent自己不做决策,决策都是由中央做的,Agent只是执行指令。
有 个Agent跟环境交互,每个Agent动作都会改变环境,进而影响其他智能体。智能体受控于中央控制器,Agent上没有策略网络,因此不能做决策,都需要听中央控制器安排。训练的时候Agent将自己的观测 和奖励 向中央报告,策略网络在中央,中央来进行决策,中央把决策 传达到Agent,每个Agent都按照中央的指示来做动作跟换进交互,训练在中央做,中央控制器用所有的观察,奖励以及动作来训练策略网络。
中央控制器训练出 个策略网络 ,不同网络结构可能相同,但是参数不同。 表示第 个策略网络的参数,策略网络的输入是 ,决定第 个智能体的动作 。决策只能由中央来做,这是因为策略网络用到所有Agent的观测,一个Agent只能得到自己的观测 ,无法得到全部观测。
这种方法的缺点在于:因为要收集完所有agent的信息后才能决策,因此整体速度要根据最慢的成员来决定,算法速度慢是其缺点,此算法一般无法做到实时决策。
2.4,Centralized training with decentralized execution(中心化训练,去中心化执行)
Centralized training with decentralized execution(中心化训练,去中心化执行):Agent各自有各自的策略网络,训练的时候有一个中央控制器,它会收集所有Agent的观测、动作、以及奖励,中央控制器帮助Agent训练策略网络,训练结束之后,就不再用中央控制器了,每个Agent根据自己的观测,用自己的策略网络做决策,不需要跟中央控制器通信。
系统利用 个Agent,每个Agent(Actor)上有一个策略网络,Agent跟环境交互,从环境中获得 ,Agent自己做决策得到动作 ,然后执行 ,训练的时候需要中央控制器,Agent跟中央控制器通信,把动作,观测和奖励发送给中央处理器。
中央控制器有所有Agent的信息:。中央控制器有 个价值网络 ,每个价值网络都对应一个Agent,在中央网络训练价值网络 ,使用TD算法更新 。
训练结束之后不需要中央控制器了,每个Agent独立跟环境交互,Agent从局部环境观测到 ,每个Agent都有自己的决策网络 ,将观察作为输入,策略网络会输出一个动作概率分布,根据概率分布选择动作 ,每个Agent执行自己的动作 ,然后环境会改变状态。
3, IPPO算法
3.1,算法流程
IPPO:一个完全去中心化的算法,此算法被称为独立学习。由于对每个智能体使用单智能体算法 PPO 进行训练,因此这个算法叫作独立PPO(IPPO)。其中使用的PPO算法版本为PPO-截断,算法流程:
对于N个智能体,为每个智能体初始化各自的策略以及价值函数 for 训练轮数 k=0,1,2... do 所有智能体在环境中交互分别获得各自的一条轨迹数据 对每个智能体,基于当前的价值函数用GAE计算优势函数的估计 对每个智能体,通过最大化其PPO-截断的目标来更细策略 对每个智能体,通过均方误差损失函数优化其价值函数 end for
3.2,算法实践
使用到的多智能体环境:ma_gym库中的Combat环境(https://github.com/boyu-ai/ma-gym.git)。Combat是一个在二维的格子世界上进行的两个队伍的对战模拟游戏,每个智能体的动作集合为:向周围移动1格,攻击周围3*3格范围内的其他敌对智能体或者不采取任何动作。起初每个智能体有3点生命值,如果智能体在敌人的攻击范围被击到了,则会扣1点生命值,生命值掉为0则死亡,最后存活的队伍获胜。每个智能体的攻击又一轮的冷却时间。
在游戏中,我们能够控制一个队伍的所有智能体与另一个队伍的智能体对战。另一个队伍的智能体使用固定的算法:攻击范围内最近的敌人,如果攻击范围内没有敌人,则向敌人靠近。
import torch import torch.nn.functional as F import numpy as np import rl_utils from tqdm import tqdm import matplotlib.pyplot as plt from magym.ma_gym.envs.combat.combat import Combat class PolicyNet(torch.nn.Module): def __init__(self, state_dim, hidden_dim, action_dim): super(PolicyNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) self.fc3 = torch.nn.Linear(hidden_dim, action_dim) def forward(self, x): x = F.relu(self.fc2(F.relu(self.fc1(x)))) return F.softmax(self.fc3(x), dim=1) class ValueNet(torch.nn.Module): def __init__(self, state_dim, hidden_dim): super(ValueNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) self.fc3 = torch.nn.Linear(hidden_dim, 1) def forward(self, x): x = F.relu(self.fc2(F.relu(self.fc1(x)))) return self.fc3(x) class PPO: ''' PPO算法,采用截断方式 ''' def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, eps, gamma, device): self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device) self.critic = ValueNet(state_dim, hidden_dim).to(device) self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),lr=actor_lr) self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),lr=critic_lr) self.gamma = gamma self.lmbda = lmbda self.eps = eps # PPO中截断范围的参数 self.device = device def take_action(self, state): state = torch.tensor([state], dtype=torch.float).to(self.device) probs = self.actor(state) action_dist = torch.distributions.Categorical(probs) action = action_dist.sample() return action.item() def update(self, transition_dict): states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device) actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device) rewards = torch.tensor(transition_dict['rewards'],dtype=torch.float).view(-1, 1).to(self.device) next_states = torch.tensor(transition_dict['next_states'],dtype=torch.float).to(self.device) dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device) td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones) td_delta = td_target - self.critic(states) advantage = rl_utils.compute_advantage(self.gamma, self.lmbda,td_delta.cpu()).to(self.device) old_log_probs = torch.log(self.actor(states).gather(1,actions)).detach() log_probs = torch.log(self.actor(states).gather(1, actions)) ratio = torch.exp(log_probs - old_log_probs) surr1 = ratio * advantage surr2 = torch.clamp(ratio, 1 - self.eps,1 + self.eps) * advantage # 截断 actor_loss = torch.mean(-torch.min(surr1, surr2)) # PPO损失函数 critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach())) self.actor_optimizer.zero_grad() self.critic_optimizer.zero_grad() actor_loss.backward() critic_loss.backward() self.actor_optimizer.step() self.critic_optimizer.step() actor_lr = 3e-4 critic_lr = 1e-3 num_episodes = 100000 hidden_dim = 64 gamma = 0.99 lmbda = 0.97 eps = 0.2 device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") team_size = 2 grid_size = (15, 15) #创建Combat环境,格子世界的大小为15x15,己方智能体和敌方智能体数量都为2 env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size) state_dim = env.observation_space[0].shape[0] action_dim = env.action_space[0].n #两个智能体共享同一个策略 agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, eps, gamma, device) win_list = [] for i in range(10): with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar: for i_episode in range(int(num_episodes / 10)): transition_dict_1 = { 'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': [] } transition_dict_2 = { 'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': [] } s = env.reset() terminal = False while not terminal: a_1 = agent.take_action(s[0]) a_2 = agent.take_action(s[1]) next_s, r, done, info = env.step([a_1, a_2]) transition_dict_1['states'].append(s[0]) transition_dict_1['actions'].append(a_1) transition_dict_1['next_states'].append(next_s[0]) transition_dict_1['rewards'].append( r[0] + 100 if info['win'] else r[0] - 0.1) transition_dict_1['dones'].append(False) transition_dict_2['states'].append(s[1]) transition_dict_2['actions'].append(a_2) transition_dict_2['next_states'].append(next_s[1]) transition_dict_2['rewards'].append( r[1] + 100 if info['win'] else r[1] - 0.1) transition_dict_2['dones'].append(False) s = next_s terminal = all(done) win_list.append(1 if info["win"] else 0) agent.update(transition_dict_1) agent.update(transition_dict_2) if (i_episode + 1) % 100 == 0: pbar.set_postfix({ 'episode': '%d' % (num_episodes / 10 * i + i_episode + 1), 'return': '%.3f' % np.mean(win_list[-100:]) }) pbar.update(1) win_array = np.array(win_list) #每100条轨迹取一次平均 win_array = np.mean(win_array.reshape(-1, 100), axis=1) episodes_list = np.arange(win_array.shape[0]) * 100 plt.plot(episodes_list, win_array) plt.xlabel('Episodes') plt.ylabel('Win rate') plt.title('IPPO on Combat') plt.show()
4,MADDPG
4.1,MADDPG算法
在多智能体环境(Multi-Agent environments)中,智能体之间对资源的恶性竞争现象无疑是通往通用人工智能路上的一块绊脚石。多智能体环境具有两大实用的特性:
- 首先,它提供了一个原生的课程(Natural curriculum)——这里环境的困难程度取决于竞争对手的能力(而如果你是与自身的克隆进行竞争,则该环境与你的能力等级是相当匹配的)。
- 其次,多智能体环境不具有稳定的平衡:因为无论智能体多么聪明,总是存在着更大压力使得它更加聪明。这些环境与传统环境有着非常大的不同,因此还有更多的研究有待进行。
中心化训练去中心化执行(CTDE)是指在训练的时候使用一些单智能体看不到的全局信息以达到更好的训练效果,而在执行时不使用这些信息,每个智能体完全根据自己的策略直接动作以达到去中心化执行的效果。中心化训练去中心化执行算法能够在训练时有效地利用全局信息以达到去中心化执行效果,同时在进行策略模型推断时可以仅利用局部信息,使得算法具有一定的扩展性。CTDE可以类比成一个足球队的训练和比赛过程:在训练时,11个球员可以直接获得教练的指导从而完成球队的整体配合,而教练本身掌握着比赛全局信息,教练的指导也是从整支队伍、整场比赛的角度进行的;而训练好的11个球员在上场比赛时,则根据场上的实时情况直接做出决策,不再有教练的指导。
CTDE算法主要分为两种:一种是基于值函数的方法,例如VDN、QMIX等;另一种是基于Actor-Critic的方法,例如MADDPG和COMA等。
CTDE算法的应用场景通常可以被建模为一个部分可观测马尔可夫博弈:用 代表 个智能体所有可能的状态空间,这是全局的信息。对于每个智能体 ,其动作空间 ,观测空间为 ,每个智能体的策略 是一个概率分布,用来表示智能体在每个观测下采取各个动作的概率。环境的状态空间转移函数为 。每个智能体的奖励函数为 ,每个智能体从全局状态得到的部分观测信息为 ,初始状态分布为 。每个智能体的目标是最大化其期望积累奖励 。
多智能体DDPG(multi-agent DDPG)本质上是对每个智能体实现一个DDPG的算法。所有智能体共享一个中心化的Critic网络,该Critic网络在训练的过程中同时对每个智能体的Actor网络给出指导,而执行时每个智能体的Actor网络则完全独立做出动作,即去中心化地执行。该算法将模拟中的每个智能体视为一个“Actor”,并且每个Actor将从“Critic”那儿获得建议,这些建议可以帮助Actor在训练过程中决定哪些行为是需要加强的。通常而言,Critic试图预测在某一特定状态下的行动所带来的价值(比如,我们期望能够获得的奖励),而这一价值将被智能体(Actor)用于更新它的行动策略。这么做比起直接使用奖励来的更加稳定,因为直接使用奖励可能出现较大的差异变动。另外,为了使训练按全局协调方式行动的多个智能体变得可行,OpenAI的研究员还增强了Critic的级别,以便于它们可以获取所有智能体的行为和观察。
每个智能体用 Actor-Critic的方法训练,但不同于传统单智能体的情况,在MADDPG中每个智能体的Critic部分都能够获得其他智能体的策略信息。MADDPG中的智能体在测试期间不需要访问中央的Critic,智能体们将根据自己的观察和对其它代理行为的预测而行动。
具体来说,考虑一个有 个智能体的博弈,每个智能体的策略参数为 ,记 为所有智能体的策略集合,那么可以写出随机性策略情况下每个智能体的期望收益的策略梯度:
其中, 就是一个中心化的动作价值函数。一般来说 包含所有智能体的观测,另外 也需要输入所有智能体此刻的动作,因此 工作的前提就是所有智能体要同时给出自己的观测和相应的动作。
对于确定性策略,考虑现在有 个连续的策略 ,可以得到DDPG的梯度公式:
其中, 是用来存储数据的经验回放池,它存储的每一个数据为
而在MADDPG中,中心化动作价值函数可以按照如下损失函数来更新:
其中 是更新价值函数中使用的目标策略的集合,它们有着延迟更新的参数。
4.2,算法实现
import torch import torch.nn.functional as F import numpy as np import matplotlib.pyplot as plt import random import rl_utils
使用的环境为多智能体粒子环境(MPE),它是一个面向多智能体交互的环境组合,在这个环境中,粒子智能体可以移动、通信、观察其他智能体,也可以和固定位置的地标交互。由于MPE官方仓库代码已经不再维护了,而其依赖于gym的旧版本,因此需要重新安装gym库。
git clone https://github.com/boyu-ai/multiagent-particle-envs.git #详情见:https://github.com/openai/multiagent-particle-envs pip install -e multiagent-particle-envs pip install --upgrade gym==0.10.5
import gym from multiagent.environment import MultiAgentEnv import multiagent.scenarios as scenarios def make_env(scenario_name): # 从环境文件脚本中创建环境 scenario = scenarios.load(scenario_name + ".py").Scenario() world = scenario.make_world() env = MultiAgentEnv(world, scenario.reset_world, scenario.reward,scenario.observation) return env
MPE中的simple_adversary环境为代码实践:该环境中有一个红色的对抗智能体(adversary), 个蓝色的正常智能体,以及 个地点(通常为2),这 个地点中有一个是目标地点(绿色)。这 个正常智能体知道那个地点是目标地点,但对抗智能体不知道。正常智能体之间是合作关系:它们其中任意一个距离目标地点足够近,则每个正常智能体都能获得相同的奖励。对抗智能体如果距离目标地点足够近,也能获得奖励,但它需要猜测哪一个才是目标地点。因此,正常智能体需要进行合作,分散到不同的坐标地点,以此欺骗对抗智能体。
MPE环境中的每个智能体的动作空间是离散的。DDPG算法本身需要使智能体的动作对于其策略参数可导,这队连续空间来说是成立的,但是对于离散的动作空间并不成立。但这并不意味着当前的任务不能使用MADDPG算法求解,因为可以使用Gumbel-Softmax来将离散分布近似采样。
【问题】假设有一个随机变量 服从某个离散分布 ,其中 表示 且满足 。当我们希望按照这个分布即 进行采样时,可以发现这个采样并不是可导的。
【解决】引入一个重参数因子 ,它是一个采样自 的噪声:
Gumbel-Softmax采样可以写成:
此时,如果通过 计算离散值,该离散值就近似等价于离散采样 的值。更进一步,采样结果 中自然地引入了对于 的梯度。 被称作分布的温度参数 。通过调整它可以控制生成的 Gumbel-Softmax 分布与离散分布的近似程度: 越小,生成的分布越趋向于 的结果; 越大,生成的分布趋向于均匀分布。
def onehot_from_logits(logits, eps=0.01): ''' 生成最优动作的独热(one-hot)形式 ''' argmax_acs = (logits == logits.max(1, keepdim=True)[0]).float() # 生成随机动作,转换成独热形式 rand_acs = torch.autograd.Variable(torch.eye(logits.shape[1])[[np.random.choice(range(logits.shape[1]), size=logits.shape[0])]],requires_grad=False).to(logits.device) # 通过epsilon-贪婪算法来选择用哪个动作 return torch.stack([argmax_acs[i] if r > eps else rand_acs[i]for i, r in enumerate(torch.rand(logits.shape[0]))]) def sample_gumbel(shape, eps=1e-20, tens_type=torch.FloatTensor): """从Gumbel(0,1)分布中采样""" U = torch.autograd.Variable(tens_type(*shape).uniform_(),requires_grad=False) return -torch.log(-torch.log(U + eps) + eps) def gumbel_softmax_sample(logits, temperature): """ 从Gumbel-Softmax分布中采样""" y = logits + sample_gumbel(logits.shape, tens_type=type(logits.data)).to(logits.device) return F.softmax(y / temperature, dim=1) def gumbel_softmax(logits, temperature=1.0): """从Gumbel-Softmax分布中采样,并进行离散化""" y = gumbel_softmax_sample(logits, temperature) y_hard = onehot_from_logits(y) y = (y_hard.to(logits.device) - y).detach() + y # 返回一个y_hard的独热量,但是它的梯度是y,我们既能够得到一个与环境交互的离散动作,又可以 # 正确地反传梯度 return y
class TwoLayerFC(torch.nn.Module): def __init__(self, num_in, num_out, hidden_dim): super().__init__() self.fc1 = torch.nn.Linear(num_in, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) self.fc3 = torch.nn.Linear(hidden_dim, num_out) def forward(self, x): x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) return self.fc3(x) class DDPG: ''' DDPG算法 ''' def __init__(self, state_dim, action_dim, critic_input_dim, hidden_dim, actor_lr, critic_lr, device): self.actor = TwoLayerFC(state_dim, action_dim, hidden_dim).to(device) self.target_actor = TwoLayerFC(state_dim, action_dim,hidden_dim).to(device) self.critic = TwoLayerFC(critic_input_dim, 1, hidden_dim).to(device) self.target_critic = TwoLayerFC(critic_input_dim, 1, hidden_dim).to(device) self.target_critic.load_state_dict(self.critic.state_dict()) self.target_actor.load_state_dict(self.actor.state_dict()) self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),lr=actor_lr) self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),lr=critic_lr) def take_action(self, state, explore=False): action = self.actor(state) if explore: action = gumbel_softmax(action) else: action = onehot_from_logits(action) return action.detach().cpu().numpy()[0] def soft_update(self, net, target_net, tau): for param_target, param in zip(target_net.parameters(),net.parameters()): param_target.data.copy_(param_target.data * (1.0 - tau) + param.data * tau)
class MADDPG: def __init__(self, env, device, actor_lr, critic_lr, hidden_dim,state_dims, action_dims, critic_input_dim, gamma, tau): self.agents = [] for i in range(len(env.agents)): self.agents.append( DDPG(state_dims[i], action_dims[i], critic_input_dim,hidden_dim, actor_lr, critic_lr, device)) self.gamma = gamma self.tau = tau self.critic_criterion = torch.nn.MSELoss() self.device = device @property def policies(self): return [agt.actor for agt in self.agents] @property def target_policies(self): return [agt.target_actor for agt in self.agents] def take_action(self, states, explore): states = [ torch.tensor([states[i]], dtype=torch.float, device=self.device) for i in range(len(env.agents)) ] return [ agent.take_action(state, explore) for agent, state in zip(self.agents, states) ] def update(self, sample, i_agent): obs, act, rew, next_obs, done = sample cur_agent = self.agents[i_agent] cur_agent.critic_optimizer.zero_grad() all_target_act = [onehot_from_logits(pi(_next_obs)) for pi, _next_obs in zip(self.target_policies, next_obs)] target_critic_input = torch.cat((*next_obs, *all_target_act), dim=1) target_critic_value = rew[i_agent].view(-1, 1) + self.gamma * cur_agent.target_critic(target_critic_input) * (1 - done[i_agent].view(-1, 1)) critic_input = torch.cat((*obs, *act), dim=1) critic_value = cur_agent.critic(critic_input) critic_loss = self.critic_criterion(critic_value,target_critic_value.detach()) critic_loss.backward() cur_agent.critic_optimizer.step() cur_agent.actor_optimizer.zero_grad() cur_actor_out = cur_agent.actor(obs[i_agent]) cur_act_vf_in = gumbel_softmax(cur_actor_out) all_actor_acs = [] for i, (pi, _obs) in enumerate(zip(self.policies, obs)): if i == i_agent: all_actor_acs.append(cur_act_vf_in) else: all_actor_acs.append(onehot_from_logits(pi(_obs))) vf_in = torch.cat((*obs, *all_actor_acs), dim=1) actor_loss = -cur_agent.critic(vf_in).mean() actor_loss += (cur_actor_out**2).mean() * 1e-3 actor_loss.backward() cur_agent.actor_optimizer.step() def update_all_targets(self): for agt in self.agents: agt.soft_update(agt.actor, agt.target_actor, self.tau) agt.soft_update(agt.critic, agt.target_critic, self.tau)
num_episodes = 5000 episode_length = 25 # 每条序列的最大长度 buffer_size = 100000 hidden_dim = 64 actor_lr = 1e-2 critic_lr = 1e-2 gamma = 0.95 tau = 1e-2 batch_size = 1024 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") update_interval = 100 minimal_size = 4000 env_id = "simple_adversary" env = make_env(env_id) replay_buffer = rl_utils.ReplayBuffer(buffer_size) state_dims = [] action_dims = [] for action_space in env.action_space: action_dims.append(action_space.n) for state_space in env.observation_space: state_dims.append(state_space.shape[0]) critic_input_dim = sum(state_dims) + sum(action_dims) maddpg = MADDPG(env, device, actor_lr, critic_lr, hidden_dim, state_dims, action_dims, critic_input_dim, gamma, tau)
def evaluate(env_id, maddpg, n_episode=10, episode_length=25): # 对学习的策略进行评估,此时不会进行探索 env = make_env(env_id) returns = np.zeros(len(env.agents)) for _ in range(n_episode): obs = env.reset() for t_i in range(episode_length): actions = maddpg.take_action(obs, explore=False) obs, rew, done, info = env.step(actions) rew = np.array(rew) returns += rew / n_episode return returns.tolist() return_list = [] # 记录每一轮的回报(return) total_step = 0 for i_episode in range(num_episodes): state = env.reset() # ep_returns = np.zeros(len(env.agents)) for e_i in range(episode_length): actions = maddpg.take_action(state, explore=True) next_state, reward, done, _ = env.step(actions) replay_buffer.add(state, actions, reward, next_state, done) state = next_state total_step += 1 if replay_buffer.size( ) >= minimal_size and total_step % update_interval == 0: sample = replay_buffer.sample(batch_size) def stack_array(x): rearranged = [[sub_x[i] for sub_x in x] for i in range(len(x[0]))] return [ torch.FloatTensor(np.vstack(aa)).to(device) for aa in rearranged ] sample = [stack_array(x) for x in sample] for a_i in range(len(env.agents)): maddpg.update(sample, a_i) maddpg.update_all_targets() if (i_episode + 1) % 100 == 0: ep_returns = evaluate(env_id, maddpg, n_episode=100) return_list.append(ep_returns) print(f"Episode: {i_episode+1}, {ep_returns}")
return_array = np.array(return_list) for i, agent_name in enumerate(["adversary_0", "agent_0", "agent_1"]): plt.figure() plt.plot( np.arange(return_array.shape[0]) * 100, rl_utils.moving_average(return_array[:, i], 9)) plt.xlabel("Episodes") plt.ylabel("Returns") plt.title(f"{agent_name} by MADDPG")
可以看到,正常智能体 agent_0和agent_1的回报结果完全一致,这是因为它们的奖励函数完全一致。正常智能体最终保持了正向的回报,说明它们通过合作成功地占领了两个不同的地点,进而让对抗智能体无法知道哪个地点是目标地点。另外,MADDPG的收敛速度和稳定性都比较不错。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)