Teaching a Robot with Reinforcement Learning: DQN (Deep Q Network)
This time I learned about reinforcement learning with DQN (Deep Q Network) from the article below ↓
超シンプルにTensorFlowでDQN (Deep Q Network) を実装してみる 〜導入編〜 | ALGO GEEKS
I then modified the example a little in my own way, to get a better feel for how it actually works.
The system itself is simple: there is a robot, and all it has to do is get close to a target.
The differences from the reference source are:
- The robot moves up, down, left, or right.
- The target is placed at a random position at the start of each episode.
- Exploration starts at 1.0 and is reduced after every training epoch (a small sketch of this schedule follows the list).
- The reward is +1 once the distance between the robot and the target becomes 1.
- The reward is -1 if the robot fails to get close to the target within the allowed time.
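The exploration bullet boils down to a plain linear schedule. As a minimal sketch (the variable names here are my own, not the ones used in the scripts below), the agent picks a random action with probability epsilon, and epsilon shrinks from 1.0 toward 0.1 by a fixed amount after every epoch:

import numpy as np

n_epochs = 15000
eps_start, eps_end = 1.0, 0.1
epsilon = eps_start

def pick_action(q_values, epsilon):
    # with probability epsilon act randomly, otherwise take the greedy action
    if np.random.rand() <= epsilon:
        return np.random.randint(len(q_values))
    return int(np.argmax(q_values))

for epoch in range(n_epochs):
    # ... run one episode, choosing actions with pick_action(...) ...
    epsilon -= (eps_start - eps_end) / n_epochs  # ends at roughly 0.1 after the last epoch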
Below is the result after 15,000 training epochs ↓
Blue circle: target
Triangle: robot
dqn_agent.py

from collections import deque
import os

import numpy as np
import tensorflow as tf


class DQNAgent:
    """
    Multi Layer Perceptron with Experience Replay
    """

    def __init__(self, enable_actions, environment_name):
        # parameters
        self.name = "agent"
        self.environment_name = environment_name
        self.enable_actions = enable_actions
        self.n_actions = len(self.enable_actions)
        self.minibatch_size = 32
        self.replay_memory_size = 1000
        self.learning_rate = 0.001
        self.discount_factor = 0.9
        self.exploration = 1.0  # probability of picking a random action; 1.0 = fully random
        self.model_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "models")
        self.model_name = "{}.ckpt".format(self.environment_name)

        # replay memory
        self.D = deque(maxlen=self.replay_memory_size)

        # model
        self.init_model()

        # variables
        self.current_loss = 0.0

    def init_model(self):
        # input layer (8 x 8)
        self.x = tf.placeholder(tf.float32, [None, 8, 8])

        # flatten (64)
        x_flat = tf.reshape(self.x, [-1, 64])

        # fully connected layer (64)
        W_fc1 = tf.Variable(tf.truncated_normal([64, 64], stddev=0.01))
        b_fc1 = tf.Variable(tf.zeros([64]))
        h_fc1 = tf.nn.relu(tf.matmul(x_flat, W_fc1) + b_fc1)

        # output layer (n_actions)
        W_out = tf.Variable(tf.truncated_normal([64, self.n_actions], stddev=0.01))
        b_out = tf.Variable(tf.zeros([self.n_actions]))
        self.y = tf.matmul(h_fc1, W_out) + b_out

        # loss function
        self.y_ = tf.placeholder(tf.float32, [None, self.n_actions])
        self.loss = tf.reduce_mean(tf.square(self.y_ - self.y))

        # train operation
        optimizer = tf.train.RMSPropOptimizer(self.learning_rate)
        self.training = optimizer.minimize(self.loss)

        # saver
        self.saver = tf.train.Saver()

        # session
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

    def Q_values(self, state):
        # Q(state, action) of all actions
        return self.sess.run(self.y, feed_dict={self.x: [state]})[0]

    def select_action(self, state, epsilon):
        if np.random.rand() <= epsilon:
            # random
            return np.random.choice(self.enable_actions)
        else:
            # max_action Q(state, action)
            return self.enable_actions[np.argmax(self.Q_values(state))]

    def store_experience(self, state, action, reward, state_1, terminal):
        self.D.append((state, action, reward, state_1, terminal))

    def experience_replay(self):
        state_minibatch = []
        y_minibatch = []

        # sample random minibatch
        minibatch_size = min(len(self.D), self.minibatch_size)
        minibatch_indexes = np.random.randint(0, len(self.D), minibatch_size)

        for j in minibatch_indexes:
            state_j, action_j, reward_j, state_j_1, terminal = self.D[j]
            action_j_index = self.enable_actions.index(action_j)

            y_j = self.Q_values(state_j)

            if terminal:
                y_j[action_j_index] = reward_j
            else:
                # reward_j + gamma * max_action' Q(state', action')
                y_j[action_j_index] = reward_j + self.discount_factor * np.max(self.Q_values(state_j_1))  # NOQA

            state_minibatch.append(state_j)
            y_minibatch.append(y_j)

        # training
        self.sess.run(self.training, feed_dict={self.x: state_minibatch, self.y_: y_minibatch})

        # for log
        self.current_loss = self.sess.run(self.loss, feed_dict={self.x: state_minibatch, self.y_: y_minibatch})

    def load_model(self, model_path=None):
        if model_path:
            # load from model_path
            print(" model_path:", model_path)
            self.saver.restore(self.sess, model_path)
        else:
            # load from checkpoint
            checkpoint = tf.train.get_checkpoint_state(self.model_dir)
            if checkpoint and checkpoint.model_checkpoint_path:
                self.saver.restore(self.sess, checkpoint.model_checkpoint_path)
                print(" model_dir:", checkpoint.model_checkpoint_path)

    def save_model(self):
        self.saver.save(self.sess, os.path.join(self.model_dir, self.model_name))
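As a quick sanity check of the agent interface (my own snippet, not part of the original article), DQNAgent can be instantiated on its own and queried with a dummy 8 x 8 state; Q_values should return one value per action, and select_action with epsilon 0 should pick the greedy one:

import numpy as np
from dqn_agent import DQNAgent

# "robot" here is just an illustrative environment name used for the checkpoint file
agent = DQNAgent(enable_actions=(0, 1, 2, 3, 4), environment_name="robot")
dummy_state = np.zeros((8, 8))                # same shape as the environment screen
print(agent.Q_values(dummy_state))            # five Q-values, one per action
print(agent.select_action(dummy_state, 0.0))  # greedy action for this state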
robot.py

import os

import numpy as np
import matplotlib.pyplot as plt


class robot:
    def __init__(self):
        fig, ax = plt.subplots(1, 1)

        # parameters
        self.name = os.path.splitext(os.path.basename(__file__))[0]
        self.fieldX = 8
        self.fieldY = 8
        self.robotX = 0
        self.robotY = 0
        self.initrobotPositionX = 4
        self.initrobotPositionY = 4
        self.robotMove = 1
        self.enable_actions = (0, 1, 2, 3, 4)
        self.frame_rate = 15
        self.targetNum = 1
        self.hit = 0
        self.time = 0
        self.around = 1  # the robot scores a hit once it is within this distance of the target

        # variables
        self.reset()

    def update(self, action):
        """
        action:
            0: do nothing
            1: move up
            2: move right
            3: move down
            4: move left
        """
        # update robot position
        if action == self.enable_actions[1]:
            # move up
            self.robotY = max(0, self.robotY - self.robotMove)
        elif action == self.enable_actions[2]:
            # move right
            self.robotX = min(self.robotX + self.robotMove, self.fieldX - 1)
        elif action == self.enable_actions[3]:
            # move down
            self.robotY = min(self.fieldY - 1, self.robotY + self.robotMove)
        elif action == self.enable_actions[4]:
            # move left
            self.robotX = max(0, self.robotX - self.robotMove)
        else:
            # do nothing
            pass

        # collision detection
        self.hit = self.targetUpdate(self.initrobotPositionX, self.initrobotPositionY)

        # reward reset
        self.reward = 0

        # end flag
        self.terminal = False
        if self.time >= 5 or self.hit:
            self.terminal = True
            if self.hit:
                # reached the target
                self.reward = 1
            else:
                # ran out of time without reaching the target
                self.reward = -1

    def targetUpdate(self, X, Y):
        self.time += 1
        print("time:", self.time)
        for i in range(self.targetNum):
            # hit when the target lies within self.around of the robot on both axes
            if self.robotX - self.around <= self.fannelPointX <= self.robotX + self.around:
                if self.robotY - self.around <= self.fannelPointY <= self.robotY + self.around:
                    self.hit = 1
        print("hit:", self.hit)

        # plot the robot (red triangle) and the target (blue circle)
        plt.clf()
        plt.plot(-2, -2, "ro")
        plt.plot(10, 10, "ro")
        plt.plot(self.robotX, self.robotY, c='red', marker='^')
        plt.plot(self.fannelPointX, self.fannelPointY, c='blue', marker='o')
        plt.pause(1)
        return self.hit

    def draw(self):
        # encode the field as an 8 x 8 grid: 1 = robot, 2 = target
        self.screen = np.zeros((self.fieldX, self.fieldY))
        print("robot_position:", self.robotX, ",", self.robotY)
        self.screen[self.robotX, self.robotY] = 1
        print("target_position:", self.fannelPointX, ",", self.fannelPointY)
        self.screen[self.fannelPointX, self.fannelPointY] = 2
        print(self.screen)
        print("---")

    def observe(self):
        self.draw()
        return self.screen, self.reward, self.terminal

    def execute_action(self, action):
        self.update(action)

    def reset(self):
        print("robot_init")
        self.hit = 0

        # reset other variables
        self.reward = 0
        self.terminal = False
        self.robotX = self.initrobotPositionX
        self.robotY = self.initrobotPositionY

        # place the target at a random position
        self.x = np.random.randint(0, self.fieldX, self.targetNum)
        self.y = np.random.randint(0, self.fieldY, self.targetNum)
        rand = np.random.randint(0, self.fieldX, self.targetNum)
        self.fannelPointX = self.x[0]
        self.fannelPointY = self.y[0]

        # if the target spawned right next to the robot, move it to a corner instead
        if self.robotX - self.around <= self.fannelPointX <= self.robotX + self.around:
            if self.robotY - self.around <= self.fannelPointY <= self.robotY + self.around:
                if rand >= self.fieldX / 2:
                    self.fannelPointX = 0
                    self.fannelPointY = 0
                if rand <= self.fieldX / 2:
                    self.fannelPointX = self.fieldX - 1
                    self.fannelPointY = self.fieldY - 1

        self.time = 0
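The environment can also be exercised on its own. The following is a small sketch of mine (not in the original article) that drives the robot with random actions until the episode ends, using only the methods defined above:

import numpy as np
from robot import robot

env = robot()
env.reset()
state, reward, terminal = env.observe()
while not terminal:
    action = np.random.choice(env.enable_actions)  # random policy, no learning
    env.execute_action(action)
    state, reward, terminal = env.observe()
print("final reward:", reward)  # +1 if the target was reached, otherwise -1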
train.py

import numpy as np

from robot import robot
from dqn_agent import DQNAgent


def train():
    # parameters
    n_epochs = 15000

    # environment, agent
    env = robot()
    agent = DQNAgent(env.enable_actions, env.name)

    # variables
    win = 0

    for e in range(n_epochs):
        # reset
        frame = 0
        loss = 0.0
        Q_max = 0.0
        env.reset()
        state_t_1, reward_t, terminal = env.observe()

        while not terminal:
            state_t = state_t_1

            # execute action in environment
            action_t = agent.select_action(state_t, agent.exploration)
            env.execute_action(action_t)

            # observe environment
            state_t_1, reward_t, terminal = env.observe()

            # store experience
            agent.store_experience(state_t, action_t, reward_t, state_t_1, terminal)

            # experience replay
            agent.experience_replay()

            # for log
            frame += 1
            loss += agent.current_loss
            Q_max += np.max(agent.Q_values(state_t))
            if reward_t == 1:
                win += 1

        print("EPOCH: {:03d}/{:03d} | WIN: {:03d} | LOSS: {:.4f} | Q_MAX: {:.4f}".format(
            e, n_epochs - 1, win, loss / frame, Q_max / frame))

        # anneal exploration linearly from 1.0 toward 0.1 over all epochs
        agent.exploration = agent.exploration - ((1.0 - 0.1) / n_epochs)

        # save the model every 1000 epochs
        if e % 1000 == 0:
            agent.save_model()

    # save model
    agent.save_model()


if __name__ == "__main__":
    train()
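Once training has finished, the saved checkpoint can be replayed with a greedy policy. The test.py below is my own illustration (the file name is assumed; it is not part of the original article) and relies only on load_model, select_action with epsilon set to 0, and the environment API shown above:

test.py

from robot import robot
from dqn_agent import DQNAgent


def test():
    env = robot()
    agent = DQNAgent(env.enable_actions, env.name)
    agent.load_model()  # restore the latest checkpoint from the models/ directory

    env.reset()
    state, reward, terminal = env.observe()
    while not terminal:
        action = agent.select_action(state, 0.0)  # epsilon = 0 -> always the greedy action
        env.execute_action(action)
        state, reward, terminal = env.observe()
    print("reward:", reward)


if __name__ == "__main__":
    test()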
Going forward, this is what I would like to try next ↓
[References]
超シンプルにTensorFlowでDQN (Deep Q Network) を実装してみる 〜導入編〜 | ALGO GEEKS
[Python]強化学習(DQN)を実装しながらKerasに慣れる - Qiita
DQNをKerasとTensorFlowとOpenAI Gymで実装する