侵权投诉
搜索
更多>> 热门搜索:
订阅
纠错
加入自媒体

用深度强化学习玩超级马里奥兄弟

2021-08-19 12:17
磐创AI
关注

介绍

从本文中,你将学习如何使用 Deep Q-Network 和 Double Deep Q-Network(带代码!)玩超级马里奥兄弟。

超级马里奥是任天堂在 1980 年代开发和发行的著名游戏。它是历经多年无需解释的经典游戏名称之一。这是一款2D横向卷轴游戏,让玩家可以控制主角——马里奥。游戏玩法包括从左到右移动马里奥,从反派中生存下来,获得硬币,以及到达旗帜以清除关卡。马里奥最终需要拯救公主。这些有不同的奖励系统、硬币、反派、漏洞和完成时间。游戏环境取自 OpenAI Gym,使用 Nintendo Entertainment System (NES) python 模拟器。在本文中,我将展示如何使用深度 Q 网络 (DQN) 和深度双 Q 网络 (DDQN) 算法和PyTorch 库来实现强化学习算法,以检查它们各自的性能。然后评估对每种算法进行的实验。数据理解和预处理超级马里奥兄弟的原始观察空间是 240 x 256 x 3 的 RGB 图像。动作空间是 256,这意味着能够采取 256 种不同的可能动作。为了加快我们模型的训练时间,我们使用了gym的包装器函数对原始环境应用了某些转换:在 4 帧上重复代理的每个动作并减小视频帧大小,即环境中的每个状态都是 4 x 84 x 84 x 1(4 个连续 84 x 84 灰度像素帧的列表)将像素值归一化到 0 到 1 的范围内将动作次数减少到 5(仅右)、7(简单动作)和 12(复杂动作)理论结果最初,我想使用 Q-learning 执行一个实验,该实验使用二维数组来存储状态和动作对值的所有可能组合。但是,在这种环境设置中,我意识到应用 Q-learning 是不可能的,因为需要存储非常大的 Q-table, 而这是不可行的。因此,本项目使用 DQN 算法作为基线模型。DQN 算法使用  Q-learning 来学习在给定状态下采取的最佳动作,并使用深度神经网络来估计 Q 值函数。我使用的深度神经网络类型是一个 3 层卷积神经网络,后跟两个完全连接的线性层,每个可能的动作都有一个输出。该网络的工作原理类似于 Q-Learning 算法中的 Q-table。我们使用的目标损失函数是 Huber 损失或 Q 值的平滑平均绝对误差。Huber loss 结合了 MSE 和 MAE 来最小化目标函数。我们用来优化目标函数的优化器是 Adam。但是,DQN 网络存在高估的问题。

图 1:说明 DQN 网络如何被高估如图1所示,高估的主要原因有两个。第一个原因是由于用于计算目标值的最大化函数。假设action值为True, 表示为:x(a?) … x(a?)。由 DQN 做出的噪声估计由 Q(s,a?;w), ... Q(s, a?;w) 表示,在数学上,

因此它高估了真实的 Q 值。第二个原因是高估的 Q 值再次被用于通过反向传播更新 Q 网络的权重。这使得高估更加严重。高估的主要缺点是由于 DQN 所做的非均匀高估。直观的感觉是,一个特定的状态、操作对在重放缓冲区中出现的频率越高,对该状态-操作对的高估就越高。为了获得更准确的 Q 值,我们想在我们的问题上使用 DDQN 网络,然后将实验结果与之前的 DQN 网络进行比较。为了减轻由最大化引起的高估,DDQN 使用 2 个 Q 网络,一个用于获取动作,另一个用于通过反向传播更新权重。DDQN Q-learning更新方程为:

Q* 用于更新权重,Q^ 用于获取特定状态的动作。Q^ 只是每 n 步复制 Q* 的值。实验结果使用 2 种算法 DQN 和 DDQN,基于智能体的不同运动进行了 5 次实验。不同的动作是复杂动作、简单动作和仅右动作。参数设置如下:观察空间:4 x 84 x 84 x 1动作空间:12(复杂动作)或7(简单动作)或5(仅右动作)损失函数:HuberLoss,δ = 1优化器:Adam,lr = 0.00025betas  = (0.9, 0.999)批大小 = 64 Dropout = 0.2gamma  = 0.9体验回放的最大内存大小 = 30000对于 epsilon greedy:探索衰减 = 0.99,探索最小值 = 0.05在探索开始时,max = 1,代理将采取随机动作。在每一次动作之后,它将以探索衰减率衰减,直到达到 0.05 的探索最小值。实验一进行的第一个实验是比较 DDQN 和 DQN 算法用于智能体的复杂运动。

实验二进行的第二个实验是比较 DDQN 和 DQN 算法对于智能体的简单移动。

实验三进行的第三个实验是比较 DDQN 和 DQN 算法仅适用于代理的右运动。

从以上 3 个实验结果可以看出,在所有情况下,DQN 在第 10,000 集的性能与 DDQN 在第 2,000 集的性能大致相同。因此,我们可以得出结论,DDQN 网络有助于消除由 DQN 网络引起的高估问题。使用 DDQN 和 DQN 对 3 种不同运动进行了进一步的实验。实验四进行的第四个实验是在所有 3 个不同的动作上使用 DDQN 算法。

实验五进行的第五个实验是对所有 3 个不同的动作使用 DQN 算法

从以上 2 个实验结果,我们可以得出结论,该网络能够在仅允许代理在仅右运动的动作空间上进行更好的训练。代码import torch

import torch.nn as nn

import random

from nes_py.wrappers import JoypadSpace

import gym_super_mario_bros

from tqdm import tqdm

import pickle 

from gym_super_mario_bros.actions import RIGHT_ONLY, SIMPLE_MOVEMENT, COMPLEX_MOVEMENT

import gym

import numpy as np

import collections 

import cv2

import matplotlib.pyplot as plt

%matplotlib inline

import time

import pylab as pl

from IPython import display

class MaxAndSkipEnv(gym.Wrapper):

   """

    Each action of the agent is repeated over skip frames

       return only every `skip`-th frame

   """

 def __init__(self, env=None, skip=4):

       super(MaxAndSkipEnv, self).__init__(env)
       # most recent raw observations (for max pooling across time steps)
       self._obs_buffer = collections.deque(maxlen=2)
       self._skip = skip
   def step(self, action):
       total_reward = 0.0
       done = None
       for _ in range(self._skip):
           obs, reward, done, info = self.env.step(action)
           self._obs_buffer.append(obs)
           total_reward += reward
           if done:
               break
       max_frame = np.max(np.stack(self._obs_buffer), axis=0)
       return max_frame, total_reward, done, info
   def reset(self):
       """Clear past frame buffer and init to first obs"""
       self._obs_buffer.clear()
       obs = self.env.reset()
       self._obs_buffer.append(obs)
       return obs
class MarioRescale84x84(gym.ObservationWrapper):
   """
   Downsamples/Rescales each frame to size 84x84 with greyscale
   """
   def __init__(self, env=None):
       super(MarioRescale84x84, self).__init__(env)
       self.observation_space = gym.spaces.Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
   def observation(self, obs):
       return MarioRescale84x84.process(obs)
   @staticmethod
   def process(frame):
       if frame.size == 240 * 256 * 3:
           img = np.reshape(frame, [240, 256, 3]).astype(np.float32)
       else:
           assert False, "Unknown resolution."
       # image normalization on RBG
       img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
       resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_AREA)
       x_t = resized_screen[18:102, :]
       x_t = np.reshape(x_t, [84, 84, 1])
       return x_t.astype(np.uint8)
class ImageToPyTorch(gym.ObservationWrapper):
   """
   Each frame is converted to PyTorch tensors
   """
   def __init__(self, env):
       super(ImageToPyTorch, self).__init__(env)
       old_shape = self.observation_space.shape
       self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(old_shape[-1], old_shape[0], old_shape[1]), dtype=np.float32)
   def observation(self, observation):
       return np.moveaxis(observation, 2, 0)
   
class BufferWrapper(gym.ObservationWrapper):
   """
   Only every k-th frame is collected by the buffer
   """
   def __init__(self, env, n_steps, dtype=np.float32):
       super(BufferWrapper, self).__init__(env)
       self.dtype = dtype
       old_space = env.observation_space
       self.observation_space = gym.spaces.Box(old_space.low.repeat(n_steps, axis=0),
                                               old_space.high.repeat(n_steps, axis=0), dtype=dtype)
   def reset(self):
       self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
       return self.observation(self.env.reset())
   def observation(self, observation):
       self.buffer[:-1] = self.buffer[1:]
       self.buffer[-1] = observation
       return self.buffer
class PixelNormalization(gym.ObservationWrapper):
   """
   Normalize pixel values in frame --> 0 to 1
   """
   def observation(self, obs):
       return np.array(obs).astype(np.float32) / 255.0
def create_mario_env(env):
   env = MaxAndSkipEnv(env)
   env = MarioRescale84x84(env)
   env = ImageToPyTorch(env)
   env = BufferWrapper(env, 4)
   env = PixelNormalization(env)
   return JoypadSpace(env, SIMPLE_MOVEMENT)
 
class DQNSolver(nn.Module):
   """
   Convolutional Neural Net with 3 conv layers and two linear layers
   """
   def __init__(self, input_shape, n_actions):
       super(DQNSolver, self).__init__()
       self.conv = nn.Sequential(
           nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
           nn.ReLU(),
           nn.Conv2d(32, 64, kernel_size=4, stride=2),
           nn.ReLU(),
           nn.Conv2d(64, 64, kernel_size=3, stride=1),
           nn.ReLU()
       )
       conv_out_size = self._get_conv_out(input_shape)
       self.fc = nn.Sequential(
           nn.Linear(conv_out_size, 512),
           nn.ReLU(),
           nn.Linear(512, n_actions)
       )
   
   def _get_conv_out(self, shape):
       o = self.conv(torch.zeros(1, *shape))
       return int(np.prod(o.size()))
   def forward(self, x):
       conv_out = self.conv(x).view(x.size()[0], -1)
       return self.fc(conv_out)
class DQNAgent:
   def __init__(self, state_space, action_space, max_memory_size, batch_size, gamma, lr,
                dropout, exploration_max, exploration_min, exploration_decay, double_dqn, pretrained):
       # Define DQN Layers
       self.state_space = state_space
       self.action_space = action_space
       self.double_dqn = double_dqn
       self.pretrained = pretrained
       self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
       
       # Double DQN network
       if self.double_dqn:  
           self.local_net = DQNSolver(state_space, action_space).to(self.device)
           self.target_net = DQNSolver(state_space, action_space).to(self.device)
           
           if self.pretrained:
               self.local_net.load_state_dict(torch.load("DQN1.pt", map_location=torch.device(self.device)))
               self.target_net.load_state_dict(torch.load("DQN2.pt", map_location=torch.device(self.device)))
                   
           self.optimizer = torch.optim.Adam(self.local_net.parameters(), lr=lr)
           self.copy = 5000  # Copy the local model weights into the target network every 5000 steps
           self.step = 0
       # DQN network
       else:  
           self.dqn = DQNSolver(state_space, action_space).to(self.device)
           
           if self.pretrained:
               self.dqn.load_state_dict(torch.load("DQN.pt", map_location=torch.device(self.device)))
           self.optimizer = torch.optim.Adam(self.dqn.parameters(), lr=lr)
       # Create memory
       self.max_memory_size = max_memory_size
       if self.pretrained:
           self.STATE_MEM = torch.load("STATE_MEM.pt")
           self.ACTION_MEM = torch.load("ACTION_MEM.pt")
           self.REWARD_MEM = torch.load("REWARD_MEM.pt")
           self.STATE2_MEM = torch.load("STATE2_MEM.pt")
           self.DONE_MEM = torch.load("DONE_MEM.pt")
           with open("ending_position.pkl", 'rb') as f:
               self.ending_position = pickle.load(f)
           with open("num_in_queue.pkl", 'rb') as f:
               self.num_in_queue = pickle.load(f)
       else:
           self.STATE_MEM = torch.zeros(max_memory_size, *self.state_space)
           self.ACTION_MEM = torch.zeros(max_memory_size, 1)
           self.REWARD_MEM = torch.zeros(max_memory_size, 1)
           self.STATE2_MEM = torch.zeros(max_memory_size, *self.state_space)
           self.DONE_MEM = torch.zeros(max_memory_size, 1)
           self.ending_position = 0
           self.num_in_queue = 0
       
       self.memory_sample_size = batch_size
       
       # Learning parameters
       self.gamma = gamma
       self.l1 = nn.SmoothL1Loss().to(self.device) # Also known as Huber loss
       self.exploration_max = exploration_max
       self.exploration_rate = exploration_max
       self.exploration_min = exploration_min
       self.exploration_decay = exploration_decay
   def remember(self, state, action, reward, state2, done):
       """Store the experiences in a buffer to use later"""
       self.STATE_MEM[self.ending_position] = state.float()
       self.ACTION_MEM[self.ending_position] = action.float()
       self.REWARD_MEM[self.ending_position] = reward.float()
       self.STATE2_MEM[self.ending_position] = state2.float()
       self.DONE_MEM[self.ending_position] = done.float()
       self.ending_position = (self.ending_position + 1) % self.max_memory_size  # FIFO tensor
       self.num_in_queue = min(self.num_in_queue + 1, self.max_memory_size)
   
   def batch_experiences(self):
       """Randomly sample 'batch size' experiences"""
       idx = random.choices(range(self.num_in_queue), k=self.memory_sample_size)
       STATE = self.STATE_MEM[idx]
       ACTION = self.ACTION_MEM[idx]
       REWARD = self.REWARD_MEM[idx]
       STATE2 = self.STATE2_MEM[idx]
       DONE = self.DONE_MEM[idx]      
       return STATE, ACTION, REWARD, STATE2, DONE
   
   def act(self, state):
       """Epsilon-greedy action"""
       if self.double_dqn:
           self.step += 1
       if random.random() < self.exploration_rate:  
           return torch.tensor([[random.randrange(self.action_space)]])
       if self.double_dqn:
           # Local net is used for the policy
           return torch.argmax(self.local_net(state.to(self.device))).unsqueeze(0).unsqueeze(0).cpu()
       else:
           return torch.argmax(self.dqn(state.to(self.device))).unsqueeze(0).unsqueeze(0).cpu()
   
   def copy_model(self):
       """Copy local net weights into target net for DDQN network"""
       self.target_net.load_state_dict(self.local_net.state_dict())
   
   def experience_replay(self):
       """Use the double Q-update or Q-update equations to update the network weights"""
       if self.double_dqn and self.step % self.copy == 0:
           self.copy_model()
       if self.memory_sample_size > self.num_in_queue:
           return
   
       # Sample a batch of experiences
       STATE, ACTION, REWARD, STATE2, DONE = self.batch_experiences()
       STATE = STATE.to(self.device)
       ACTION = ACTION.to(self.device)
       REWARD = REWARD.to(self.device)
       STATE2 = STATE2.to(self.device)
       DONE = DONE.to(self.device)
       
       self.optimizer.zero_grad()
       if self.double_dqn:
           # Double Q-Learning target is Q*(S, A) <- r + γ max_a Q_target(S', a)
           target = REWARD + torch.mul((self.gamma * self.target_net(STATE2).max(1).values.unsqueeze(1)),  1 - DONE)
           current = self.local_net(STATE).gather(1, ACTION.long()) # Local net approximation of Q-value
       else:
           # Q-Learning target is Q*(S, A) <- r + γ max_a Q(S', a)
           target = REWARD + torch.mul((self.gamma * self.dqn(STATE2).max(1).values.unsqueeze(1)), 1 - DONE)
               
           current = self.dqn(STATE).gather(1, ACTION.long())
       
       loss = self.l1(current, target)
       loss.backward() # Compute gradients
       self.optimizer.step() # Backpropagate error
       self.exploration_rate *= self.exploration_decay
       
       # Makes sure that exploration rate is always at least 'exploration min'
       self.exploration_rate = max(self.exploration_rate, self.exploration_min)
       
def show_state(env, ep=0, info=""):
   """While testing show the mario playing environment"""
   plt.figure(3)
   plt.clf()
   plt.imshow(env.render(mode='rgb_array'))
   plt.title("Episode: %d %s" % (ep, info))
   plt.axis('off')
   display.clear_output(wait=True)
   display.display(plt.gcf())
   
def run(training_mode, pretrained, double_dqn, num_episodes=1000, exploration_max=1):
 
   env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')
   env = create_mario_env(env)  # Wraps the environment so that frames are grayscale
   observation_space = env.observation_space.shape
   action_space = env.action_space.n
   agent = DQNAgent(state_space=observation_space,
                    action_space=action_space,
                    max_memory_size=30000,
                    batch_size=32,
                    gamma=0.90,
                    lr=0.00025,
                    dropout=0.2,
                    exploration_max=1.0,
                    exploration_min=0.02,
                    exploration_decay=0.99,
                    double_dqn=double_dqn,
                    pretrained=pretrained)
   
   # Restart the enviroment for each episode
   num_episodes = num_episodes
   env.reset()
   
   total_rewards = []
   if training_mode and pretrained:
       with open("total_rewards.pkl", 'rb') as f:
           total_rewards = pickle.load(f)
   
   for ep_num in tqdm(range(num_episodes)):
       state = env.reset()
       state = torch.Tensor([state])
       total_reward = 0
       steps = 0
       while True:
           if not training_mode:
               show_state(env, ep_num)
           action = agent.act(state)
           steps += 1
           
           state_next, reward, terminal, info = env.step(int(action[0]))
           total_reward += reward
           state_next = torch.Tensor([state_next])
           reward = torch.tensor([reward]).unsqueeze(0)
           
           terminal = torch.tensor([int(terminal)]).unsqueeze(0)
           
           if training_mode:
               agent.remember(state, action, reward, state_next, terminal)
               agent.experience_replay()
           
           state = state_next
           if terminal:
               break
       
       total_rewards.append(total_reward)
       
       if ep_num != 0 and ep_num % 100 == 0:
           print("Episode {} score = {}, average score = {}".format(ep_num + 1, total_rewards[-1], np.mean(total_rewards)))
       num_episodes += 1  
   print("Episode {} score = {}, average score = {}".format(ep_num + 1, total_rewards[-1], np.mean(total_rewards)))
   
   # Save the trained memory so that we can continue from where we stop using 'pretrained' = True
   if training_mode:
       with open("ending_position.pkl", "wb") as f:
           pickle.dump(agent.ending_position, f)
       with open("num_in_queue.pkl", "wb") as f:
           pickle.dump(agent.num_in_queue, f)
       with open("total_rewards.pkl", "wb") as f:
           pickle.dump(total_rewards, f)
       if agent.double_dqn:
           torch.save(agent.local_net.state_dict(), "DQN1.pt")
           torch.save(agent.target_net.state_dict(), "DQN2.pt")
       else:
           torch.save(agent.dqn.state_dict(), "DQN.pt")  
       torch.save(agent.STATE_MEM,  "STATE_MEM.pt")
       torch.save(agent.ACTION_MEM, "ACTION_MEM.pt")
       torch.save(agent.REWARD_MEM, "REWARD_MEM.pt")
       torch.save(agent.STATE2_MEM, "STATE2_MEM.pt")
       torch.save(agent.DONE_MEM,   "DONE_MEM.pt")
   
   env.close()

# For training

run(training_mode=True, pretrained=False, double_dqn=True, num_episodes=1, exploration_max = 1)

# For Testing

run(training_mode=False, pretrained=True, double_dqn=True, num_episodes=1, exploration_max = 0.05)

结论与DQN相比,DDQN需要的训练片段要少得多。因此,DDQN网络有助于消除DQN网络中存在的高估问题。DQN和DDQN网络相比简单和复杂的运动动作空间,都能更好地进行仅右运动训练。

声明: 本文由入驻维科号的作者撰写,观点仅代表作者本人,不代表OFweek立场。如有侵权或其他问题,请联系举报。

发表评论

0条评论,0人参与

请输入评论内容...

请输入评论/评论长度6~500个字

您提交的评论过于频繁,请输入验证码继续

暂无评论

暂无评论

    通信 猎头职位 更多
    文章纠错
    x
    *文字标题:
    *纠错内容:
    联系邮箱:
    *验 证 码:

    粤公网安备 44030502002758号