レオリョスタジオ
OpenAI Gymのブロック崩しを挑戦します。
OpenAIからお借りしたイメージ
https://gym.openai.com/videos/2019-10-21–mqt8Qj1mwo/Breakout-v0/poster.jpg
今回はKerasを使ってActor-Criticモデルを構築して訓練ます。Kerasはより簡単にディープラーニングを導入するAPI、自分で調整する必要がある変数や関数が少ないので初心者向けいいサンプルだと思います。
Actor-Criticモデルは強化学習モデルの一つです。詳しい説明はTensorflowのブログに載せています。 (https://www.tensorflow.org/tutorials/reinforcement_learning/actor_critic) しかしリンクのCartPole例と違って、今回ブロック崩しをやります。CartPoleのような4つの変数より、ブロック崩し(每フレーム1枚84×84の画像)の方が遥かに多いのでCNNネットワークを使います。詳しい構成は後で説明します。
この記事は7月にGithubに投稿した記事に基づいて作成します。
https://github.com/leolui2004/atari_rl_ac
ゲームプレイと強化学習分かれて説明します。ゲームプレイの部分はOpenAIのGym環境の中にゲームをやります。強化学習の部分はゲームからもらった変数を訓練して、予測したアクションをゲームに反映されます。
import gym import random import numpy as np env = gym.make('Breakout-v4') episode_limit = 5000 random_step = 20 timestep_limit = 100000 #永遠にプレーできないように制限 model_train = 1 #0にすると訓練しない(ただのランダムプレー) log_save = 1 #0にするとログ保存しない log_path = 'atari_ac_log.txt' score_list = [] step_list = [] for episode in range(episode_limit): #毎回プレーする前に環境をリセット observation = env.reset() score = 0 #ボールの位置をランダムさせるために最初のランダムのステップ数で何もしない for _ in range(random.randint(1, random_step)): observation_last = observation observation, _, _, _ = env.step(0) #行動したの観測データをエンコード(後ほど説明) state = encode_initialize(observation, observation_last) for timestep in range(timestep_limit): observation_last = observation #予測するアクションをモデルから取得(後ほど説明) action = action_choose(state[np.newaxis, :], epsilon, episode, action_space) #予測したアクションに基づいて行動 observation, reward, done, _ = env.step(action) #行動したの観測データをエンコード(後ほど説明) state_next = encode(observation, observation_last, state) if model_train == 1: #行動したの観測データをモデルに送って学習させる(後ほど説明) network_learn(state[np.newaxis, :], action, reward, state_next[np.newaxis, :], done) state = state_next score += reward #ゲーム終わりもしくはtimestep_limit到達(強制終了) if done or timestep == timestep_limit - 1: #結果を記録 score_list.append(score) step_list.append(timestep) if log_save == 1: log(log_path, episode, timestep, score) print('Episode {} Timestep {} Score {}'.format(episode + 1, timestep, score)) break #アクションを一定の程度にランダムさせる関数(後ほど説明) epsilon = epsilon_reduce(epsilon, episode) env.close()
エンコードの部分はグレースケール転換、リサイズと4つ連続の84×84の画像(フレーム)を合成します。これによるとよりボールのアクションを記録できて、訓練しやすいということです。
from skimage.color import rgb2gray from skimage.transform import resize frame_length = 4 frame_width = 84 frame_height = 84 def encode_initialize(observation, last_observation): processed_observation = np.maximum(observation, last_observation) processed_observation_resize = np.uint8(resize(rgb2gray(processed_observation), (frame_width, frame_height)) * 255) state = [processed_observation_resize for _ in range(frame_length)] state_encode = np.stack(state, axis=0) return state_encode def encode(observation, last_observation, state): processed_observation = np.maximum(observation, last_observation) processed_observation_resize = np.uint8(resize(rgb2gray(processed_observation), (frame_width, frame_height)) * 255) state_next_return = np.reshape(processed_observation_resize, (1, frame_width, frame_height)) state_encode = np.append(state[1:, :, :], state_next_return, axis=0) return state_encode
from keras import backend as K from keras.layers import Dense, Input, Flatten, Conv2D from keras.models import Model, load_model from keras.optimizers import Adam from keras.utils import plot_model verbose = 0 action_dim = env.action_space.n action_space = [i for i in range(action_dim)] # ['NOOP', 'FIRE', 'RIGHT', 'LEFT'] discount = 0.97 actor_lr = 0.001 #actorの学習率 critic_lr = 0.001 #criticの学習率 pretrain_use = 0 #1にすると訓練されたモデルを使う actor_h5_path = 'atari_ac_actor.h5' critic_h5_path = 'atari_ac_critic.h5' #モデル構築 input = Input(shape=(frame_length, frame_width, frame_height)) delta = Input(shape=[1]) con1 = Conv2D(32, (8, 8), strides=(4, 4), padding='same', activation='relu')(input) con2 = Conv2D(64, (4, 4), strides=(2, 2), padding='same', activation='relu')(con1) fla1 = Flatten()(con2) dense = Dense(128, activation='relu')(fla1) #prob, valueをシェア prob = Dense(action_dim, activation='softmax')(dense) #actor部分 value = Dense(1, activation='linear')(dense) #critic部分 #ロス関数の定義 def custom_loss(y_true, y_pred): out = K.clip(y_pred, 1e-8, 1-1e-8) #限界を設定 log_lik = y_true * K.log(out) #方策勾配 return K.sum(-log_lik * delta) if pretrain_use == 1: #訓練されたモデルを使う actor = load_model(actor_h5_path, custom_objects={'custom_loss': custom_loss}, compile=False) critic = load_model(critic_h5_path) actor = Model(inputs=[input, delta], outputs=[prob]) critic = Model(inputs=[input], outputs=[value]) policy = Model(inputs=[input], outputs=[prob]) actor.compile(optimizer=Adam(lr=actor_lr), loss=custom_loss) critic.compile(optimizer=Adam(lr=critic_lr), loss='mean_squared_error') #アクションを予測 def action_choose(state, epsilon, episode, action_space): #epsilonは最初に1に設定して徐々に下げる #毎回行動する時ランダム数字と比べて #epsilonの方が大きいならランダムアクションを取る if epsilon >= random.random() or episode < initial_replay: action = random.randrange(action_dim) else: probabiliy = policy.predict(state)[0] #予測した結果は4つのアクションに対してそれぞれ確率がある #その確率に沿ってアクションを選ぶ action = np.random.choice(action_space, p=probabiliy) return action #データを学習 def network_learn(state, action, reward, state_next, done): reward_clip = np.sign(reward) critic_value = critic.predict(state) critic_value_next = critic.predict(state_next) target = reward_clip + discount * critic_value_next * (1 - int(done)) delta = target - critic_value actions = np.zeros([1, action_dim]) actions[np.arange(1), action] = 1 actor.fit([state, delta], actions, verbose=verbose) critic.fit(state, target, verbose=verbose)
この部分は他の機能として強化学習との直接関係がないですけどを合わせて書きます。
import matplotlib.pyplot as plt model_save = 1 #0にするとモデルを保存しない score_avg_freq = 50 epsilon_start = 1.0 #epsilon開始時の確率 epsilon_end = 0.1 #epsilon最低の確率(最低でも10%でランダムアクション) epsilon_step = episode_limit epsilon = 1.0 epsilon_reduce_step = (epsilon_start - epsilon_end) / epsilon_step initial_replay = 200 actor_graph_path = 'atari_ac_actor.png' critic_graph_path = 'atari_ac_critic.png' policy_graph_path = 'atari_ac_policy.png' #epsilon下げさせるの関数 def epsilon_reduce(epsilon, episode): if epsilon > epsilon_end and episode >= initial_replay: epsilon -= epsilon_reduce_step return epsilon #ログを書く def log(log_path, episode, timestep, score): logger = open(log_path, 'a') if episode == 0: logger.write('Episode Timestep Score\n') logger.write('{} {} {}\n'.format(episode + 1, timestep, score)) logger.close() if pretrain_use == 1: if model_save == 1: actor.save(actor_h5_path) critic.save(critic_h5_path) else: if model_save == 1: actor.save(actor_h5_path) critic.save(critic_h5_path) #モデル構成を図に出力 plot_model(actor, show_shapes=True, to_file=actor_graph_path) plot_model(critic, show_shapes=True, to_file=critic_graph_path) plot_model(policy, show_shapes=True, to_file=policy_graph_path) #結果を図に出力 xaxis = [] score_avg_list = [] step_avg_list = [] for i in range(1, episode_limit + 1): xaxis.append(i) if i < score_avg_freq: score_avg_list.append(np.mean(score_list[:])) step_avg_list.append(np.mean(step_list[:])) else: score_avg_list.append(np.mean(score_list[i - score_avg_freq:i])) step_avg_list.append(np.mean(step_list[i - score_avg_freq:i])) plt.plot(xaxis, score_avg_list) plt.show() plt.plot(xaxis, step_avg_list) plt.show()