Training a Robot with DQN (Deep Q Network) Reinforcement Learning

This time I learned about DQN (Deep Q Network) reinforcement learning from the article below:
超シンプルにTensorFlowでDQN (Deep Q Network) を実装してみる 〜導入編〜  | ALGO GEEKS
Then, to get a better sense of how it works, I modified it a bit in my own way.

The system is simple: there is a robot, and the goal is for it to get close to a target.
The differences from the reference source are as follows (a small sketch of the exploration and reward rules comes right after this list):
・The robot moves up, down, left, and right
・The target is placed at a random position at the start of each episode
・exploration starts at 1.0 and is decreased after every training epoch
・Reward = 1 once the distance between the robot and the target is 1
・Reward = -1 if the robot cannot get close to the target within the time limit
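
To make the exploration and reward rules above concrete, here is a minimal sketch. It is only for illustration and is not the actual implementation; the names epsilon, reward_for, and time_limit are mine (in the code further down, the time limit lives in robot.update and the hit test in robot.targetUpdate):

import numpy as np

def reward_for(robot_xy, target_xy, elapsed_steps, time_limit=5):
    # reward = 1 once the robot is within 1 cell of the target in both x and y,
    # reward = -1 if the time limit passes first, otherwise 0
    distance = np.max(np.abs(np.array(robot_xy) - np.array(target_xy)))
    if distance <= 1:
        return 1
    if elapsed_steps >= time_limit:
        return -1
    return 0

n_epochs = 15000
epsilon = 1.0                              # exploration: start fully random
for epoch in range(n_epochs):
    # ... run one episode, acting randomly with probability epsilon ...
    epsilon -= (1.0 - 0.1) / n_epochs      # linear decay from 1.0 down to 0.1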

Below is the result after 15,000 training epochs:
Blue circle: target
Triangle: robot
f:id:weekendproject9:20171129002539g:plain

dqn_agent.py

from collections import deque
import os

import numpy as np
import tensorflow as tf


class DQNAgent:
    """
    Multi Layer Perceptron with Experience Replay
    """

    def __init__(self, enable_actions, environment_name):
        # parameters
        self.name = "agent"#os.path.splitext(os.path.basename(__file__))[0]
        self.environment_name = environment_name
        self.enable_actions = enable_actions
        self.n_actions = len(self.enable_actions)
        self.minibatch_size = 32
        self.replay_memory_size = 1000
        self.learning_rate = 0.001
        self.discount_factor = 0.9
        self.exploration = 1.0  # exploration rate: probability of taking a random action; starts fully random
        self.model_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "models")
        self.model_name = "{}.ckpt".format(self.environment_name)

        # replay memory
        self.D = deque(maxlen=self.replay_memory_size)

        # model
        self.init_model()

        # variables
        self.current_loss = 0.0

    def init_model(self):
        # input layer (8 x 8 field)
        self.x = tf.placeholder(tf.float32, [None, 8, 8])

        # flatten (64)
        x_flat = tf.reshape(self.x, [-1, 64])

        # fully connected layer (64 units)
        W_fc1 = tf.Variable(tf.truncated_normal([64, 64], stddev=0.01))
        b_fc1 = tf.Variable(tf.zeros([64]))
        h_fc1 = tf.nn.relu(tf.matmul(x_flat, W_fc1) + b_fc1)

        # output layer (n_actions)
        W_out = tf.Variable(tf.truncated_normal([64, self.n_actions], stddev=0.01))
        b_out = tf.Variable(tf.zeros([self.n_actions]))
        self.y = tf.matmul(h_fc1, W_out) + b_out

        # loss function
        self.y_ = tf.placeholder(tf.float32, [None, self.n_actions])
        self.loss = tf.reduce_mean(tf.square(self.y_ - self.y))

        # train operation
        optimizer = tf.train.RMSPropOptimizer(self.learning_rate)
        self.training = optimizer.minimize(self.loss)

        # saver
        self.saver = tf.train.Saver()

        # session
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

    def Q_values(self, state):
        # Q(state, action) of all actions
        return self.sess.run(self.y, feed_dict={self.x: [state]})[0]

    def select_action(self, state, epsilon):
        if np.random.rand() <= epsilon:
            # random
            return np.random.choice(self.enable_actions)
        else:
            # max_action Q(state, action)
            return self.enable_actions[np.argmax(self.Q_values(state))]

    def store_experience(self, state, action, reward, state_1, terminal):
        self.D.append((state, action, reward, state_1, terminal))

    def experience_replay(self):
        state_minibatch = []
        y_minibatch = []

        # sample random minibatch
        minibatch_size = min(len(self.D), self.minibatch_size)
        minibatch_indexes = np.random.randint(0, len(self.D), minibatch_size)

        for j in minibatch_indexes:
            state_j, action_j, reward_j, state_j_1, terminal = self.D[j]
            action_j_index = self.enable_actions.index(action_j)

            y_j = self.Q_values(state_j)

            if terminal:
                y_j[action_j_index] = reward_j
            else:
                # reward_j + gamma * max_action' Q(state', action')
                y_j[action_j_index] = reward_j + self.discount_factor * np.max(self.Q_values(state_j_1))  # NOQA

            state_minibatch.append(state_j)
            y_minibatch.append(y_j)

        # training
        self.sess.run(self.training, feed_dict={self.x: state_minibatch, self.y_: y_minibatch})

        # for log
        self.current_loss = self.sess.run(self.loss, feed_dict={self.x: state_minibatch, self.y_: y_minibatch})

    def load_model(self, model_path=None):
        if model_path:
            # load from model_path
            print(" model_path:", model_path)
            self.saver.restore(self.sess, model_path)
        else:
            # load from checkpoint
            checkpoint = tf.train.get_checkpoint_state(self.model_dir)
            if checkpoint and checkpoint.model_checkpoint_path:
                self.saver.restore(self.sess, checkpoint.model_checkpoint_path)
                print(" model_dir:", checkpoint.model_checkpoint_path)

    def save_model(self):
        # create the model directory on the first save
        if not os.path.exists(self.model_dir):
            os.makedirs(self.model_dir)
        self.saver.save(self.sess, os.path.join(self.model_dir, self.model_name))

robot.py

import os
import numpy as np
import matplotlib.pyplot as plt
import math
import copy

class robot:
    def __init__(self):
        # figure used to visualize the field
        fig, ax = plt.subplots(1, 1)

        # parameters
        self.name = os.path.splitext(os.path.basename(__file__))[0]
        self.fieldX = 8                 # field size (x)
        self.fieldY = 8                 # field size (y)
        self.robotX = 0
        self.robotY = 0
        self.initrobotPositionX = 4     # robot starts at the center of the field
        self.initrobotPositionY = 4
        self.robotMove = 1              # cells moved per action

        self.enable_actions = (0, 1, 2, 3, 4)
        self.frame_rate = 15

        self.targetNum = 1              # number of targets
        self.hit = 0
        self.time = 0

        self.around = 1                 # distance that counts as reaching the target

        # variables
        self.reset()

    def update(self, action):
        """
        action:
            0: do nothing
            1: move up
            2: move right
            3: move down
            4: move left
        """
        # update player position
        if action == self.enable_actions[1]:
            # move up
            self.robotY = max(0, self.robotY - self.robotMove)
        elif action == self.enable_actions[2]:
            # move right
            self.robotX = min(self.robotX + self.robotMove , self.fieldX-1)
        elif action == self.enable_actions[3]:
            # move down
            self.robotY = min(self.fieldY-1, self.robotY + self.robotMove)
        elif action == self.enable_actions[4]:
            # move left
            self.robotX = max(0, self.robotX - self.robotMove)
        else:
            # do nothing
            pass

        # update the hit state and redraw the field
        self.hit = self.targetUpdate(self.initrobotPositionX, self.initrobotPositionY)

        # collision detection
        # reset reward
        self.reward = 0
        # end flag
        self.terminal = False
        if self.time >= 5 or self.hit:
            self.terminal = True
            if self.hit:
                # hit: the robot reached the target
                self.reward = 1
            else:
                # no hit within the time limit
                self.reward = -1

    def targetUpdate(self, X, Y):
        self.time += 1
        print("time:", self.time)

        for i in range(self.targetNum):
            # (disabled) the target could also be moved randomly each step here

            # hit when the target is within `around` cells of the robot in both x and y
            if abs(self.robotX - self.fannelPointX) <= self.around and \
               abs(self.robotY - self.fannelPointY) <= self.around:
                self.hit = 1

            print("hit:", self.hit)

            # redraw the field; the two fixed corner points keep the axis range stable
            plt.clf()
            plt.plot(-2, -2, "ro")
            plt.plot(10, 10, "ro")
            plt.plot(self.robotX, self.robotY, c='red', marker='^')
            plt.plot(self.fannelPointX, self.fannelPointY, c='blue', marker='o')
            plt.pause(1)

        return self.hit

    def draw(self):
        # 8x8 field seen by the agent: 0 = empty, 1 = robot, 2 = target
        self.screen = np.zeros((self.fieldX, self.fieldY))
        print("robot_position:", self.robotX, ",", self.robotY)
        self.screen[self.robotX, self.robotY] = 1
        print("target_position:", self.fannelPointX, ",", self.fannelPointY)
        self.screen[self.fannelPointX, self.fannelPointY] = 2
        print(self.screen)
        print("---")

    def observe(self):
        self.draw()
        return self.screen, self.reward, self.terminal

    def execute_action(self, action):
        self.update(action)

    def reset(self):
        print("robot_init")
        self.hit = 0

        # reset other variables
        self.reward = 0
        self.terminal = False

        # put the robot back at the center and the target at a random cell
        self.robotX = self.initrobotPositionX
        self.robotY = self.initrobotPositionY
        self.x = np.random.randint(0, self.fieldX, self.targetNum)
        self.y = np.random.randint(0, self.fieldY, self.targetNum)
        rand = np.random.randint(0, self.fieldX, self.targetNum)
        i = 0

        self.fannelPointX = self.x[i]
        self.fannelPointY = self.y[i]

        # if the target spawns right next to the robot, push it to a corner
        if abs(self.robotX - self.fannelPointX) <= self.around and \
           abs(self.robotY - self.fannelPointY) <= self.around:
            if rand[i] > self.fieldX / 2:
                self.fannelPointX = 0
                self.fannelPointY = 0
            else:
                self.fannelPointX = self.fieldX - 1
                self.fannelPointY = self.fieldY - 1

        self.time = 0

train.py

import numpy as np

from robot import robot
from dqn_agent import DQNAgent


def train():
    # parameters
    n_epochs = 15000

    # environment, agent
    env = robot()
    agent = DQNAgent(env.enable_actions, env.name)

    # variables
    win = 0

    for e in range(n_epochs):
        # reset
        frame = 0
        loss = 0.0
        Q_max = 0.0
        env.reset()
        state_t_1, reward_t, terminal = env.observe()

        while not terminal:
            state_t = state_t_1

            # execute action in environment
            action_t = agent.select_action(state_t, agent.exploration)
            env.execute_action(action_t)

            # observe environment
            state_t_1, reward_t, terminal = env.observe()

            # store experience
            agent.store_experience(state_t, action_t, reward_t, state_t_1, terminal)

            # experience replay
            agent.experience_replay()

            # for log
            frame += 1
            loss += agent.current_loss
            Q_max += np.max(agent.Q_values(state_t))
            if reward_t == 1:
                win += 1

        print("EPOCH: {:03d}/{:03d} | WIN: {:03d} | LOSS: {:.4f} | Q_MAX: {:.4f}".format(
            e, n_epochs - 1, win, loss / frame, Q_max / frame))

        # linearly decay exploration from 1.0 toward 0.1 over the training run
        agent.exploration = agent.exploration - ((1.0 - 0.1) / n_epochs)

        # save a checkpoint every 1000 epochs
        if e % 1000 == 0:
            agent.save_model()

    # save model
    agent.save_model()


if __name__ == "__main__":
    train()
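
After training, the saved checkpoint can be loaded back and the robot run with the greedy policy (exploration = 0) to watch how it behaves. The evaluation script below is only a sketch built from the classes above; the 20-episode loop and the print format are my own choices, not part of the original article:

from robot import robot
from dqn_agent import DQNAgent


def test(n_episodes=20):
    # same setup as train.py, but load the trained weights and never explore
    env = robot()
    agent = DQNAgent(env.enable_actions, env.name)
    agent.load_model()  # restore the latest checkpoint from the models directory

    for e in range(n_episodes):
        env.reset()
        state_t_1, reward_t, terminal = env.observe()

        while not terminal:
            state_t = state_t_1
            action_t = agent.select_action(state_t, 0.0)  # epsilon = 0: always greedy
            env.execute_action(action_t)
            state_t_1, reward_t, terminal = env.observe()

        print("EPISODE: {:03d} | REWARD: {}".format(e, reward_t))


if __name__ == "__main__":
    test()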

 
Going forward, this is the direction I'd like to take it:
f:id:weekendproject9:20171129002636p:plain

[References]
超シンプルにTensorFlowでDQN (Deep Q Network) を実装してみる 〜導入編〜  | ALGO GEEKS
[Python]強化学習(DQN)を実装しながらKerasに慣れる - Qiita
DQNをKerasとTensorFlowとOpenAI Gymで実装する