一、Q-Learning算法简介

下面仅对Q-Learning算法对简单介绍

Q学习是一种异策略(off-policy)算法。

异策略在学习的过程中,有两种不同的策略:目标策略(target policy)和行为策略(behavior policy)。

目标策略就是我们需要去学习的策略,相当于后方指挥的军师,它不需要直接与环境进行交互

行为策略是探索环境的策略,负责与环境交互,然后将采集的轨迹数据送给目标策略进行学习,而且为送给目标策略的数据中不需要 a t + 1 a_{t+1} at+1,而Sarsa是要有 a t + 1 a_{t+1} at+1的。

Q学习不会管我们下一步去往哪里探索,它只选取奖励最大的策略

1.1 更新公式

Q-Learning的更新公式

Q ( s t , a t ) ← Q ( s t , a t ) + α [ r t + 1 + γ max ⁡ a Q ( s t + 1 , a ) − Q ( s t , a t ) ] Q\left(s_t, a_t\right) \leftarrow Q\left(s_t, a_t\right)+\alpha\left[r_{t+1}+\gamma \max _a Q\left(s_{t+1}, a\right)-Q\left(s_t, a_t\right)\right] Q(st,at)Q(st,at)+α[rt+1+γamaxQ(st+1,a)Q(st,at)]

1.2 预测策略

Q-Learning算法采用 ε \varepsilon ε-贪心搜索的策略(和Sarsa算法一样)

1.3 详细资料

关于更加详细的Q-Learning算法的介绍,请看我之前发的博客:【EasyRL学习笔记】第三章 表格型方法(Q-Table、Sarsa、Q-Learning)

在学习Q-Learning算法前你最好能了解以下知识点:

  • 时序差分方法
  • ε \varepsilon ε-贪心搜索策略
  • Q-Table

二、Python代码实战

2.1 运行前配置

准备好一个RL_Utils.py文件,文件内容可以从我的一篇里博客获取:【RL工具类】强化学习常用函数工具类(Python代码)

这一步很重要,后面需要引入该RL_Utils.py文件

在这里插入图片描述

2.2 主要代码

import argparse
import datetime
import math
import os
import time
import turtle
from collections import defaultdict
import dill
import gym
# 这里需要改成自己的RL_Utils.py文件的路径
from Python.ReinforcementLearning.EasyRL.RL_Utils import *


# 悬崖行走地图
class CliffWalkingWapper(gym.Wrapper):
    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.t = None
        self.unit = 50
        self.max_x = 12
        self.max_y = 4

    def draw_x_line(self, y, x0, x1, color='gray'):
        assert x1 > x0
        self.t.color(color)
        self.t.setheading(0)
        self.t.up()
        self.t.goto(x0, y)
        self.t.down()
        self.t.forward(x1 - x0)

    def draw_y_line(self, x, y0, y1, color='gray'):
        assert y1 > y0
        self.t.color(color)
        self.t.setheading(90)
        self.t.up()
        self.t.goto(x, y0)
        self.t.down()
        self.t.forward(y1 - y0)

    def draw_box(self, x, y, fillcolor='', line_color='gray'):
        self.t.up()
        self.t.goto(x * self.unit, y * self.unit)
        self.t.color(line_color)
        self.t.fillcolor(fillcolor)
        self.t.setheading(90)
        self.t.down()
        self.t.begin_fill()
        for i in range(4):
            self.t.forward(self.unit)
            self.t.right(90)
        self.t.end_fill()

    def move_player(self, x, y):
        self.t.up()
        self.t.setheading(90)
        self.t.fillcolor('red')
        self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)

    def render(self):
        if self.t == None:
            self.t = turtle.Turtle()
            self.wn = turtle.Screen()
            self.wn.setup(self.unit * self.max_x + 100,
                          self.unit * self.max_y + 100)
            self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
                                        self.unit * self.max_y)
            self.t.shape('circle')
            self.t.width(2)
            self.t.speed(0)
            self.t.color('gray')
            for _ in range(2):
                self.t.forward(self.max_x * self.unit)
                self.t.left(90)
                self.t.forward(self.max_y * self.unit)
                self.t.left(90)
            for i in range(1, self.max_y):
                self.draw_x_line(
                    y=i * self.unit, x0=0, x1=self.max_x * self.unit)
            for i in range(1, self.max_x):
                self.draw_y_line(
                    x=i * self.unit, y0=0, y1=self.max_y * self.unit)

            for i in range(1, self.max_x - 1):
                self.draw_box(i, 0, 'black')
            self.draw_box(self.max_x - 1, 0, 'yellow')
            self.t.shape('turtle')

        x_pos = self.s % self.max_x
        y_pos = self.max_y - 1 - int(self.s / self.max_x)
        self.move_player(x_pos, y_pos)


# Q_Learning智能体对象
class Q_Learning:
    def __init__(self, arg_dict):
        # 采样次数
        self.sample_count = 0
        # 动作数
        self.n_actions = arg_dict['n_actions']
        # 学习率
        self.lr = arg_dict['lr']
        # 未来奖励衰减系数
        self.gamma = arg_dict['gamma']
        # 当前的epsilon值
        self.epsilon = arg_dict['epsilon_start']
        # 初始的epsilon值
        self.epsilon_start = arg_dict['epsilon_start']
        # 最后的epsilon值
        self.epsilon_end = arg_dict['epsilon_end']
        # epsilon衰变参数
        self.epsilon_decay = arg_dict['epsilon_decay']
        # 使用嵌套字典表示Q(s,a),这里首先将所有Q(s、a)设置为0
        self.Q_table = defaultdict(lambda: np.zeros(self.n_actions))

    # 训练过程: 用e-greedy policy获取行动
    def sample_action(self, state):
        # 采样数更新
        self.sample_count += 1
        # 计算当前epsilon值
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
                       math.exp(-1. * self.sample_count / self.epsilon_decay)
        # 根据均匀分布获取一个0-1的随机值,如果随机值大于当前epsilon,则按照最大Q值来选择动作,否则随机选择一个动作
        return np.argmax(self.Q_table[str(state)]) if np.random.uniform(0, 1) > self.epsilon else np.random.choice(
            self.n_actions)

    # 测试过程: 用最大Q值获取行动
    def predict_action(self, state):
        return np.argmax(self.Q_table[str(state)])

    # 更新Q表格
    def update(self, state, action, reward, next_state, done):
        # 计算Q估计
        Q_predict = self.Q_table[str(state)][action]
        # 计算Q现实
        if done:
            # 如果回合结束,则直接等于当前奖励
            Q_target = reward
        else:
            # 如果回合每结束,则按照
            Q_target = reward + self.gamma * np.max(self.Q_table[str(next_state)])
        # 根据Q估计和Q现实,差分地更新Q表格
        self.Q_table[str(state)][action] += self.lr * (Q_target - Q_predict)

    # 保存模型
    def save_model(self, path):
        # 如果路径不存在,就自动创建
        Path(path).mkdir(parents=True, exist_ok=True)
        torch.save(
            obj=self.Q_table,
            f=path + "checkpoint.pkl",
            pickle_module=dill
        )

    # 加载模型
    def load_model(self, path):
        self.Q_table = torch.load(f=path + 'checkpoint.pkl', pickle_module=dill)


# 训练函数
def train(arg_dict, env, agent):
    # 开始计时
    startTime = time.time()
    print(f"环境名: {arg_dict['env_name']}, 算法名: {arg_dict['algo_name']}, Device: {arg_dict['device']}")
    print("开始训练智能体......")
    # 记录每个epoch的奖励
    rewards = []
    # 记录每个epoch的智能体到达终点用的步数
    steps = []
    for epoch in range(arg_dict['train_eps']):
        # 每个epoch的总奖励
        ep_reward = 0
        # 每个epoch的步数记录器
        ep_step = 0
        # 重置环境,并获取初始状态
        state = env.reset()
        while True:
            # 画图
            if arg_dict['train_render']:
                env.render()
            # 根据e-贪心策略获取动作
            action = agent.sample_action(state)
            # 执行动作,获得下一个状态、奖励和是否结束当前回合的标志,并更新环境
            next_state, reward, done, _ = env.step(action)
            # 智能体更新,根据当前状态和动作、下一个状态和奖励,改进Q函数
            agent.update(state, action, reward, next_state, done)
            # 更新当前状态为下一时刻状态
            state = next_state
            # 累加记录奖励
            ep_reward += reward
            # 步数+1
            ep_step += 1
            # 如果当前回合结束,则跳出循环
            if done:
                break
        # 记录奖励、步数信息
        rewards.append(ep_reward)
        steps.append(ep_step)
        # 每隔10次迭代就输出一次
        if (epoch + 1) % 10 == 0:
            print(
                f'Epoch: {epoch + 1}/{arg_dict["train_eps"]}, Reward: {ep_reward:.2f}, Steps:{ep_step}, Epislon: {agent.epsilon:.3f}')
    print("智能体训练结束 , 用时: " + str(time.time() - startTime) + " s")
    return {'epochs': range(len(rewards)), 'rewards': rewards, 'steps': steps}


# 测试函数
def test(arg_dict, env, agent):
    startTime = time.time()
    print("开始测试智能体......")
    print(f"环境名: {arg_dict['env_name']}, 算法名: {arg_dict['algo_name']}, Device: {arg_dict['device']}")
    # 记录每个epoch的奖励
    rewards = []
    # 记录每个epoch的智能体到达终点用的步数
    steps = []
    for epoch in range(arg_dict['test_eps']):
        # 每个epoch的总奖励
        ep_reward = 0
        # 每个epoch的步数记录器
        ep_step = 0
        # 重置环境,并获取初始状态
        state = env.reset()
        while True:
            # 画图
            if arg_dict['test_render']:
                env.render()
            # 根据最大Q值选择动作
            action = agent.predict_action(state)
            # 执行动作,获得下一个状态、奖励和是否结束当前回合的标志,并更新环境
            next_state, reward, done, _ = env.step(action)
            # 更新当前状态为下一时刻状态
            state = next_state
            # 累加记录奖励
            ep_reward += reward
            # 步数+1
            ep_step += 1
            # 如果当前回合结束,则跳出循环
            if done:
                break
        # 记录奖励、步数信息
        rewards.append(ep_reward)
        steps.append(ep_step)
        # 输出测试信息
        print(f"Epochs: {epoch + 1}/{arg_dict['test_eps']}, Steps:{ep_step}, Reward: {ep_reward:.2f}")
    print("测试结束 , 用时: " + str(time.time() - startTime) + " s")
    return {'episodes': range(len(rewards)), 'rewards': rewards, 'steps': steps}


# 创建环境和智能体
def create_env_agent(arg_dict):
    # 创建环境
    env = gym.make(arg_dict['env_name'])
    env = CliffWalkingWapper(env)
    # 设置随机种子
    all_seed(env, seed=arg_dict["seed"])
    # 获取状态数
    try:
        n_states = env.observation_space.n
    except AttributeError:
        n_states = env.observation_space.shape[0]
    # 获取动作数
    n_actions = env.action_space.n
    print(f"状态数: {n_states}, 动作数: {n_actions}")
    # 将状态数和动作数加入算法参数字典
    arg_dict.update({"n_states": n_states, "n_actions": n_actions})
    # 实例化智能体对象
    agent = Q_Learning(arg_dict)
    # 返回环境,智能体
    return env, agent


if __name__ == '__main__':
    # 防止报错 OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized.
    os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
    # 获取当前路径
    curr_path = os.path.dirname(os.path.abspath(__file__))
    # 获取当前时间
    curr_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
    # 相关参数设置
    parser = argparse.ArgumentParser(description="hyper parameters")
    parser.add_argument('--algo_name', default='Q-learning', type=str, help="name of algorithm")
    parser.add_argument('--env_name', default='CliffWalking-v0', type=str, help="name of environment")
    parser.add_argument('--train_eps', default=400, type=int, help="episodes of training")
    parser.add_argument('--test_eps', default=20, type=int, help="episodes of testing")
    parser.add_argument('--gamma', default=0.90, type=float, help="discounted factor")
    parser.add_argument('--epsilon_start', default=0.95, type=float, help="initial value of epsilon")
    parser.add_argument('--epsilon_end', default=0.01, type=float, help="final value of epsilon")
    parser.add_argument('--epsilon_decay', default=300, type=int, help="decay rate of epsilon")
    parser.add_argument('--lr', default=0.1, type=float, help="learning rate")
    parser.add_argument('--device', default='cpu', type=str, help="cpu or cuda")
    parser.add_argument('--seed', default=520, type=int, help="seed")
    parser.add_argument('--show_fig', default=False, type=bool, help="if show figure or not")
    parser.add_argument('--save_fig', default=True, type=bool, help="if save figure or not")
    parser.add_argument('--train_render', default=False, type=bool,
                        help="Whether to render the environment during training")
    parser.add_argument('--test_render', default=True, type=bool,
                        help="Whether to render the environment during testing")
    args = parser.parse_args()
    default_args = {'result_path': f"{curr_path}/outputs/{args.env_name}/{curr_time}/results/",
                    'model_path': f"{curr_path}/outputs/{args.env_name}/{curr_time}/models/",
                    }
    # 将参数转化为字典 type(dict)
    arg_dict = {**vars(args), **default_args}
    print("算法参数字典:", arg_dict)

    # 创建环境和智能体
    env, agent = create_env_agent(arg_dict)
    # 传入算法参数、环境、智能体,然后开始训练
    res_dic = train(arg_dict, env, agent)
    print("算法返回结果字典:", res_dic)
    # 保存相关信息
    agent.save_model(path=arg_dict['model_path'])
    save_args(arg_dict, path=arg_dict['result_path'])
    save_results(res_dic, tag='train', path=arg_dict['result_path'])
    plot_rewards(res_dic['rewards'], arg_dict, path=arg_dict['result_path'], tag="train")

    # =================================================================================================
    # 创建新环境和智能体用来测试
    print("=" * 300)
    env, agent = create_env_agent(arg_dict)
    # 加载已保存的智能体
    agent.load_model(path=arg_dict['model_path'])
    res_dic = test(arg_dict, env, agent)
    save_results(res_dic, tag='test', path=arg_dict['result_path'])
    plot_rewards(res_dic['rewards'], arg_dict, path=arg_dict['result_path'], tag="test")

2.3 运行结果展示

由于有些输出太长,下面只展示部分输出

状态数: 48, 动作数: 4
环境名: CliffWalking-v0, 算法名: Q-learning, Device: cpu
开始训练智能体......
Epoch: 10/400, Reward: -182.00, Steps:182, Epislon: 0.010
Epoch: 20/400, Reward: -63.00, Steps:63, Epislon: 0.010
Epoch: 30/400, Reward: -48.00, Steps:48, Epislon: 0.010
Epoch: 40/400, Reward: -54.00, Steps:54, Epislon: 0.010
Epoch: 50/400, Reward: -51.00, Steps:51, Epislon: 0.010
Epoch: 60/400, Reward: -194.00, Steps:95, Epislon: 0.010
Epoch: 70/400, Reward: -46.00, Steps:46, Epislon: 0.010
Epoch: 80/400, Reward: -57.00, Steps:57, Epislon: 0.010
Epoch: 90/400, Reward: -37.00, Steps:37, Epislon: 0.010
Epoch: 100/400, Reward: -68.00, Steps:68, Epislon: 0.010
Epoch: 110/400, Reward: -33.00, Steps:33, Epislon: 0.010
Epoch: 120/400, Reward: -47.00, Steps:47, Epislon: 0.010
Epoch: 130/400, Reward: -23.00, Steps:23, Epislon: 0.010
Epoch: 140/400, Reward: -16.00, Steps:16, Epislon: 0.010
Epoch: 150/400, Reward: -53.00, Steps:53, Epislon: 0.010
Epoch: 160/400, Reward: -42.00, Steps:42, Epislon: 0.010
Epoch: 170/400, Reward: -15.00, Steps:15, Epislon: 0.010
Epoch: 180/400, Reward: -28.00, Steps:28, Epislon: 0.010
Epoch: 190/400, Reward: -22.00, Steps:22, Epislon: 0.010
Epoch: 200/400, Reward: -14.00, Steps:14, Epislon: 0.010
Epoch: 210/400, Reward: -26.00, Steps:26, Epislon: 0.010
Epoch: 220/400, Reward: -16.00, Steps:16, Epislon: 0.010
Epoch: 230/400, Reward: -28.00, Steps:28, Epislon: 0.010
Epoch: 240/400, Reward: -15.00, Steps:15, Epislon: 0.010
Epoch: 250/400, Reward: -16.00, Steps:16, Epislon: 0.010
Epoch: 260/400, Reward: -25.00, Steps:25, Epislon: 0.010
Epoch: 270/400, Reward: -18.00, Steps:18, Epislon: 0.010
Epoch: 280/400, Reward: -113.00, Steps:14, Epislon: 0.010
Epoch: 290/400, Reward: -14.00, Steps:14, Epislon: 0.010
Epoch: 300/400, Reward: -127.00, Steps:28, Epislon: 0.010
Epoch: 310/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 320/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 330/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 340/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 350/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 360/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 370/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 380/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 390/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 400/400, Reward: -13.00, Steps:13, Epislon: 0.010
智能体训练结束 , 用时: 0.38664937019348145 s
============================================================================================================================================================================================================================================================================================================
状态数: 48, 动作数: 4
开始测试智能体......
环境名: CliffWalking-v0, 算法名: Q-learning, Device: cpu
Epochs: 1/20, Steps:13, Reward: -13.00
Epochs: 2/20, Steps:13, Reward: -13.00
Epochs: 3/20, Steps:13, Reward: -13.00
Epochs: 4/20, Steps:13, Reward: -13.00
Epochs: 5/20, Steps:13, Reward: -13.00
Epochs: 6/20, Steps:13, Reward: -13.00
Epochs: 7/20, Steps:13, Reward: -13.00
Epochs: 8/20, Steps:13, Reward: -13.00
Epochs: 9/20, Steps:13, Reward: -13.00
Epochs: 10/20, Steps:13, Reward: -13.00
Epochs: 11/20, Steps:13, Reward: -13.00
Epochs: 12/20, Steps:13, Reward: -13.00
Epochs: 13/20, Steps:13, Reward: -13.00
Epochs: 14/20, Steps:13, Reward: -13.00
Epochs: 15/20, Steps:13, Reward: -13.00
Epochs: 16/20, Steps:13, Reward: -13.00
Epochs: 17/20, Steps:13, Reward: -13.00
Epochs: 18/20, Steps:13, Reward: -13.00
Epochs: 19/20, Steps:13, Reward: -13.00
Epochs: 20/20, Steps:13, Reward: -13.00
测试结束 , 用时: 16.472306728363037 s

奖励曲线:

在这里插入图片描述

测试阶段的寻路过程可视化

在这里插入图片描述

可以看出Q-Learning算法相比Sarsa算法而言就比较激进了,Sarsa算法只敢离悬崖远一点走,而Q-Learning算法却敢贴着悬崖边走,说明了off-policy在探险方面更具优势,具有更强的探险能力。

2.4 关于可视化寻路过程的设置

如果你觉得寻路过程可视化比较耗时,你可以进行设置,取消可视化。
或者你想看看训练过程的可视化,也可以进行相关设置(当然我看过了,前期很无聊,智能体一直在乱走)

在这里插入图片描述

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐