Reference: building a DQN with Keras, based on Morvan (莫凡)'s git repo.
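
The update implemented in learn() below is the standard Q-learning target: for each sampled transition (s, a, r, s'), the target for the action actually taken is r + gamma * max_a' Q_target(s', a'), while every other action keeps the eval network's own prediction. A minimal NumPy sketch of how such a batch of targets is assembled (the array names here are illustrative only, not part of the original code):

import numpy as np

# toy batch: 3 transitions, 4 actions
q_eval = np.random.rand(3, 4)        # Q(s, .) from the eval network
q_next = np.random.rand(3, 4)        # Q(s', .) from the target network
a_s    = np.array([0, 2, 1])         # actions actually taken
r_s    = np.array([0.0, 1.0, -1.0])  # rewards received
gamma  = 0.9

q_target = q_eval.copy()
q_target[np.arange(3), a_s] = r_s + gamma * q_next.max(axis=1)
# fitting the eval network to (s, q_target) with an MSE loss only pushes
# the Q-values of the taken actions toward the bootstrapped targets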

#RL_brain_diy.py
from tensorflow import keras
import numpy as np

class DeepQNetwork:
    def __init__(
            self,
            n_actions,
            n_features,
            is_train=True,
            load_model=True,
            learning_rate = .01,
            reward_decay = 0.9,
            e_greedy = 0.9,
            replace_target_iter = 300,
            memory_size = 500,
            batch_size = 32,

            # set this to 1e-3 for the first training run, then back to None afterwards
            e_greedy_increment = None,
            # e_greedy_increment = 1e-3,
            output_graph = False,
            first_layer_neurno = 8,
            second_layer_neurno = 1):
        self.n_actions=n_actions
        self.n_features=n_features
        self.lr = learning_rate
        self.gamma = reward_decay

        self.replace_target_iter=replace_target_iter
        self.learn_step_counter=0  # start at 0 so the first learn() call syncs the target network

        self.memory_size = memory_size
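        # replay buffer: each row stores [s (n_features), a, r, s_ (n_features)]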
        self.memory=np.zeros((memory_size,2+2*n_features))
        self.memory_count=0

        self.batch_size = batch_size
        self.epsilon_max = e_greedy
        self.epsilon = 0 if e_greedy_increment is not None else e_greedy
        self.epsilon_increment=e_greedy_increment
        print(self.epsilon,self.epsilon_increment)

        self.output_graph = output_graph
        self.output_1=first_layer_neurno
        self.output_2=second_layer_neurno
        self.is_train = is_train
        self._build_net()

        if load_model:
            self.load_model()


    def _build_net(self):
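        # evaluation network: trained on every learn() call, outputs Q(s, a) for all actions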
        self.model_eval=keras.Sequential()
        d1=keras.layers.Dense(self.output_1,
                            input_shape=(self.n_features,),
                            activation='relu')
        self.model_eval.add(d1)
        d2 = keras.layers.Dense(self.n_actions)
        self.model_eval.add(d2)

        # note: the Adam learning rate is hard-coded here and overrides self.lr
        optimizer=keras.optimizers.Adam(learning_rate=3e-3)
        # optimizer = keras.optimizers.RMSprop(learning_rate=self.lr)
        self.model_eval.compile(loss='mse',optimizer=optimizer)


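        # target network: same architecture as the eval network; its weights are copied
        # from model_eval every replace_target_iter steps and it is only used for
        # prediction, so it does not need to be compiled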
        self.model_target = keras.Sequential()
        d1=keras.layers.Dense(self.output_1,
                            input_shape=(self.n_features,),
                            activation='relu')
        self.model_target.add(d1)
        d2 = keras.layers.Dense(self.n_actions)
        self.model_target.add(d2)


    def store_transition(self,observation, action, reward, observation_):
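        # ring buffer: once full, the oldest transitions are overwritten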
        index=self.memory_count%self.memory_size
        data=np.hstack((observation, action, reward, observation_))
        self.memory[index]=data
        self.memory_count += 1

    def choose_action(self,observation):
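        # epsilon-greedy: exploit with probability epsilon, otherwise pick a random action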
        if np.random.uniform()<self.epsilon:
            observation = observation[np.newaxis, :]
            actions_value=self.model_eval.predict(observation)

            # action=np.argmax(actions_value)
            # if several actions tie for the maximum Q-value, pick one of them at random
            action=np.random.choice(np.where(actions_value == np.max(actions_value))[1])
            print(actions_value, action, observation)
        else:
            action = np.random.randint(0,self.n_actions)
            print('random>>', action, observation)
        return action

    def learn(self):
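        # periodically copy the eval-network weights into the target network
        # to keep the bootstrapped targets stable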
        if self.learn_step_counter%self.replace_target_iter==0:
            self.model_target.set_weights(self.model_eval.get_weights())
            # print('\t target_params_replaced\n')

        sample_index=np.random.choice(min(self.memory_count,self.memory_size),self.batch_size)
        batch=self.memory[sample_index]
        o_s=batch[:,:self.n_features]
        a_s=batch[:,self.n_features].astype(int)
        r_s=batch[:,1+self.n_features]
        o_s_=batch[:,-self.n_features:]

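        # Q-learning target: q_target(s, a) = r + gamma * max_a' Q_target(s', a');
        # only the entries for the actions actually taken are overwritten, so the
        # MSE loss is zero for all other actions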
        q_next=self.model_target.predict(o_s_ ,batch_size=self.batch_size)
        q_eval = self.model_eval.predict(o_s,batch_size=self.batch_size)
        q_target = q_eval.copy()
        target_part=r_s+self.gamma * np.max(q_next,axis=1)

        q_target[range(self.batch_size),a_s]=target_part
        self.model_eval.train_on_batch(o_s,q_target)

        if self.epsilon < self.epsilon_max:
            self.epsilon +=self.epsilon_increment

        self.learn_step_counter+=1

    def plot_cost(self):
        pass

    def save_model(self):
        self.model_eval.save_weights('model_eval.h5')
        self.model_target.save_weights('model_target.h5')

    def load_model(self):
        try:
            self.model_eval.load_weights('model_eval.h5')
            self.model_target.load_weights('model_target.h5')
        except Exception:
            print('Model weights not found; skipping load')
#run_this.py
from dqn.day2.maze_env import Maze
# from dqn.day2.RL_brain import DeepQNetwork
# from dqn.day2.RL_brain_Keras import DeepQNetwork
from dqn.day2.RL_brain_diy import DeepQNetwork
import numpy as np


def run_maze():
    step = 0
    success,acc=0,.01
    for episode in range(100):
        # initial observation
        observation = env.reset()
        if episode>0:
            acc=success/episode
            print('episode:',episode,'success rate:',acc)
        while True:
            # fresh env
            env.render()

            # RL choose action based on observation
            action = RL.choose_action(observation)

            # RL take action and get next observation and reward
            observation_, reward, done = env.step(action)
            if reward==1:success+=1
            RL.store_transition(observation, action, reward, observation_)

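            # start learning after a 20-step warm-up, then learn every 5 steps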
            if (step > 20) and (step % 5 == 0):
                RL.learn()
            # swap observation
            observation = observation_

            # break while loop when end of this episode
            if done:
                break
            step += 1

    # end of game
    print('game over')
    env.destroy()


if __name__ == "__main__":
    # maze game
    env = Maze()
    RL = DeepQNetwork(env.n_actions, env.n_features,
                      learning_rate=0.01,
                      reward_decay=0.9,
                      e_greedy=0.9,
                      replace_target_iter=200,
                      memory_size=2000,
                      # output_graph=True
                      )
    env.after(100, run_maze)
    env.mainloop()

    RL.save_model()
    RL.plot_cost()

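    # after training, print the learned Q-values over a coarse grid of normalized (x, y) observations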
    for i in range(4):
        for j in range(4):
            x=.25*(i-2)
            y=.25*(j-2)
            z=RL.model_eval.predict(np.array((x,y)).reshape(1,-1))
            print(x,y,z)
#maze_env.py
import numpy as np
import time
import sys
if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk

UNIT = 40   # pixels
MAZE_H = 4  # grid height
MAZE_W = 4  # grid width


class Maze(tk.Tk, object):
    def __init__(self):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'l', 'r']
        self.n_actions = len(self.action_space)
        self.n_features = 2
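        # observation: (dx, dy) offset of the agent from the goal, normalized by the maze size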
        self.title('maze')
        self.geometry('{0}x{1}'.format(MAZE_H * UNIT, MAZE_H * UNIT))
        self._build_maze()

    def _build_maze(self):
        self.canvas = tk.Canvas(self, bg='white',
                           height=MAZE_H * UNIT,
                           width=MAZE_W * UNIT)

        # create grids
        for c in range(0, MAZE_W * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        for r in range(0, MAZE_H * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)

        # create origin
        origin = np.array([20, 20])

        # hell
        hell1_center = origin + np.array([UNIT * 2, UNIT])
        self.hell1 = self.canvas.create_rectangle(
            hell1_center[0] - 15, hell1_center[1] - 15,
            hell1_center[0] + 15, hell1_center[1] + 15,
            fill='black')
        # hell
        # hell2_center = origin + np.array([UNIT, UNIT * 2])
        # self.hell2 = self.canvas.create_rectangle(
        #     hell2_center[0] - 15, hell2_center[1] - 15,
        #     hell2_center[0] + 15, hell2_center[1] + 15,
        #     fill='black')

        # create oval
        oval_center = origin + UNIT * 2
        self.oval = self.canvas.create_oval(
            oval_center[0] - 15, oval_center[1] - 15,
            oval_center[0] + 15, oval_center[1] + 15,
            fill='yellow')

        # create red rect
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')

        # pack all
        self.canvas.pack()

    def reset(self):
        self.update()
        time.sleep(0.1)
        self.canvas.delete(self.rect)
        origin = np.array([20, 20])
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')
        # return observation
        return (np.array(self.canvas.coords(self.rect)[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)

    def step(self, action):
        s = self.canvas.coords(self.rect)
        base_action = np.array([0, 0])
        if action == 0:   # up
            if s[1] > UNIT:
                base_action[1] -= UNIT
        elif action == 1:   # down
            if s[1] < (MAZE_H - 1) * UNIT:
                base_action[1] += UNIT
        elif action == 3:   # right
            if s[0] < (MAZE_W - 1) * UNIT:
                base_action[0] += UNIT
        elif action == 2:   # left
            if s[0] > UNIT:
                base_action[0] -= UNIT

        self.canvas.move(self.rect, base_action[0], base_action[1])  # move agent

        next_coords = self.canvas.coords(self.rect)  # next state

        # reward function
        if next_coords == self.canvas.coords(self.oval):
            reward = 1
            done = True
        elif next_coords in [self.canvas.coords(self.hell1)]:
            reward = -1
            done = True
        else:
            reward = 0
            done = False
        s_ = (np.array(next_coords[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)
        return s_, reward, done

    def render(self):
        # time.sleep(0.01)
        self.update()