From a15a93bc683cf3cf44642ec5cb5649dd8a8f6589 Mon Sep 17 00:00:00 2001
From: vzhuang
Date: Mon, 20 Apr 2020 21:54:07 -0700
Subject: [PATCH] normalize rewards by standard deviation of discounted return
 in MuJoCo

---
 rlpyt/algos/pg/a2c.py | 3 ++-
 rlpyt/algos/pg/base.py | 22 ++++++++++++++-----
 rlpyt/algos/pg/ppo.py | 3 ++-
 .../configs/mujoco/pg/mujoco_a2c.py | 2 ++
 .../configs/mujoco/pg/mujoco_ppo.py | 6 +++--
 .../mujoco/pg/train/mujoco_ff_ppo_gpu.py | 6 ++---
 rlpyt/runners/minibatch_rl.py | 1 +
 rlpyt/samplers/base.py | 1 +
 rlpyt/samplers/buffer.py | 4 ++++
 rlpyt/samplers/collections.py | 2 +-
 rlpyt/samplers/collectors.py | 1 +
 rlpyt/samplers/parallel/cpu/collectors.py | 11 +++++++++-
 rlpyt/samplers/parallel/gpu/collectors.py | 13 +++++++++--
 rlpyt/samplers/serial/sampler.py | 1 +
 14 files changed, 60 insertions(+), 16 deletions(-)

diff --git a/rlpyt/algos/pg/a2c.py b/rlpyt/algos/pg/a2c.py
index 6f3d8556..fae0303e 100644
--- a/rlpyt/algos/pg/a2c.py
+++ b/rlpyt/algos/pg/a2c.py
@@ -28,6 +28,7 @@ def __init__(
             initial_optim_state_dict=None,
             gae_lambda=1,
             normalize_advantage=False,
+            normalize_rewards=False,
             ):
         """Saves the input settings."""
         if optim_kwargs is None:
@@ -84,7 +85,7 @@ def loss(self, samples):
         else:
             dist_info, value = self.agent(*agent_inputs)
         # TODO: try to compute everyone on device.
-        return_, advantage, valid = self.process_returns(samples)
+        return_, advantage, valid = self.process_returns(samples, self.normalize_rewards)
         dist = self.agent.distribution
         logli = dist.log_likelihood(samples.agent.action, dist_info)
diff --git a/rlpyt/algos/pg/base.py b/rlpyt/algos/pg/base.py
index e8b0afca..30c40d11 100644
--- a/rlpyt/algos/pg/base.py
+++ b/rlpyt/algos/pg/base.py
@@ -1,10 +1,12 @@
-# import torch
 from collections import namedtuple
 
+import torch
+
 from rlpyt.algos.base import RlAlgorithm
 from rlpyt.algos.utils import (discount_return, generalized_advantage_estimation,
-    valid_from_done)
+                               valid_from_done)
+from rlpyt.models.running_mean_std import RunningMeanStdModel
 
 # Convention: traj_info fields CamelCase, opt_info fields lowerCamelCase
 OptInfo = namedtuple("OptInfo", ["loss", "gradNorm", "entropy", "perplexity"])
@@ -35,8 +37,9 @@ def initialize(self, agent, n_itr, batch_spec, mid_batch_reset=False,
         self.n_itr = n_itr
         self.batch_spec = batch_spec
         self.mid_batch_reset = mid_batch_reset
+        self.rets_rms = None
 
-    def process_returns(self, samples):
+    def process_returns(self, samples, normalize_rewards=False):
         """
         Compute bootstrapped returns and advantages from a minibatch of
         samples. Uses either discounted returns (if ``self.gae_lambda==1``)
@@ -44,9 +47,18 @@ def process_returns(self, samples):
         according to ``mid_batch_reset`` or for recurrent agent. Optionally,
         normalize advantages.
         """
-        reward, done, value, bv = (samples.env.reward, samples.env.done,
-            samples.agent.agent_info.value, samples.agent.bootstrap_value)
+        reward, done, value, bv, discounted_return = (samples.env.reward, samples.env.done,
+            samples.agent.agent_info.value, samples.agent.bootstrap_value, samples.env.discounted_return)
         done = done.type(reward.dtype)
+        if normalize_rewards:
+            if self.rets_rms is None:
+                self.rets_rms = RunningMeanStdModel(torch.Size([1]))
+            discounted_return = discounted_return.view(-1)
+            discounted_return = discounted_return.unsqueeze(1)
+            self.rets_rms.update(discounted_return)
+            std_dev = torch.sqrt(self.rets_rms.var)
+            reward = torch.div(reward, std_dev)
+
         if self.gae_lambda == 1:  # GAE reduces to empirical discounted.
             return_ = discount_return(reward, done, bv, self.discount)
             advantage = return_ - value
diff --git a/rlpyt/algos/pg/ppo.py b/rlpyt/algos/pg/ppo.py
index d28eed60..ed415855 100644
--- a/rlpyt/algos/pg/ppo.py
+++ b/rlpyt/algos/pg/ppo.py
@@ -37,6 +37,7 @@ def __init__(
             ratio_clip=0.1,
             linear_lr_schedule=True,
             normalize_advantage=False,
+            normalize_rewards=False,
             ):
         """Saves input settings."""
         if optim_kwargs is None:
@@ -72,7 +73,7 @@ def optimize_agent(self, itr, samples):
         agent_inputs = buffer_to(agent_inputs, device=self.agent.device)
         if hasattr(self.agent, "update_obs_rms"):
             self.agent.update_obs_rms(agent_inputs.observation)
-        return_, advantage, valid = self.process_returns(samples)
+        return_, advantage, valid = self.process_returns(samples, self.normalize_rewards)
         loss_inputs = LossInputs(  # So can slice all.
             agent_inputs=agent_inputs,
             action=samples.agent.action,
diff --git a/rlpyt/experiments/configs/mujoco/pg/mujoco_a2c.py b/rlpyt/experiments/configs/mujoco/pg/mujoco_a2c.py
index 58578dbe..69da8ffe 100644
--- a/rlpyt/experiments/configs/mujoco/pg/mujoco_a2c.py
+++ b/rlpyt/experiments/configs/mujoco/pg/mujoco_a2c.py
@@ -12,6 +12,7 @@
         entropy_loss_coeff=0.0,
         value_loss_coeff=0.5,
         normalize_advantage=True,
+        normalize_rewards=True,
     ),
     env=dict(id="Hopper-v3"),
     model=dict(normalize_observation=False),
@@ -24,6 +25,7 @@
         batch_T=100,
         batch_B=8,
         max_decorrelation_steps=1000,
+        discount=0.99,
     ),
 )
diff --git a/rlpyt/experiments/configs/mujoco/pg/mujoco_ppo.py b/rlpyt/experiments/configs/mujoco/pg/mujoco_ppo.py
index 9d668dbc..d32c4b7c 100644
--- a/rlpyt/experiments/configs/mujoco/pg/mujoco_ppo.py
+++ b/rlpyt/experiments/configs/mujoco/pg/mujoco_ppo.py
@@ -8,7 +8,7 @@
     algo=dict(
         discount=0.99,
         learning_rate=3e-4,
-        clip_grad_norm=1e6,
+        clip_grad_norm=1e3,
         entropy_loss_coeff=0.0,
         gae_lambda=0.95,
         minibatches=32,
@@ -16,10 +16,11 @@
         ratio_clip=0.2,
         normalize_advantage=True,
         linear_lr_schedule=True,
+        normalize_rewards=True,
         # bootstrap_timelimit=False,
     ),
     env=dict(id="Hopper-v3"),
-    model=dict(normalize_observation=False),
+    model=dict(normalize_observation=True),
     optim=dict(),
     runner=dict(
         n_steps=1e6,
@@ -29,6 +30,7 @@
         batch_T=2048,
         batch_B=1,
         max_decorrelation_steps=0,
+        discount=0.99,
     ),
 )
diff --git a/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_ppo_gpu.py b/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_ppo_gpu.py
index 3e5549e4..cf42315c 100644
--- a/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_ppo_gpu.py
+++ b/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_ppo_gpu.py
@@ -2,8 +2,8 @@
 import sys
 
 from rlpyt.utils.launching.affinity import affinity_from_code
-from rlpyt.samplers.gpu.parallel_sampler import GpuSampler
-from rlpyt.samplers.gpu.collectors import ResetCollector
+from rlpyt.samplers.parallel.gpu.sampler import GpuSampler
+from rlpyt.samplers.parallel.gpu.collectors import GpuResetCollector
 from rlpyt.envs.gym import make as gym_make
 from rlpyt.algos.pg.ppo import PPO
 from rlpyt.agents.pg.mujoco import MujocoFfAgent
@@ -23,7 +23,7 @@ def build_and_train(slot_affinity_code, log_dir, run_ID, config_key):
     sampler = GpuSampler(
         EnvCls=gym_make,
         env_kwargs=config["env"],
-        CollectorCls=ResetCollector,
+        CollectorCls=GpuResetCollector,
         **config["sampler"]
     )
     algo = PPO(optim_kwargs=config["optim"], **config["algo"])
diff --git a/rlpyt/runners/minibatch_rl.py b/rlpyt/runners/minibatch_rl.py
index 86456f24..9fd1dfe5 100644
--- a/rlpyt/runners/minibatch_rl.py
+++ b/rlpyt/runners/minibatch_rl.py
@@ -265,6 +265,7 @@ def train(self):
     def initialize_logging(self):
         self._traj_infos = deque(maxlen=self.log_traj_window)
         self._new_completed_trajs = 0
+        logger.log("seed %d" % self.seed)
         logger.log(f"Optimizing over {self.log_interval_itrs} iterations.")
         super().initialize_logging()
         self.pbar = ProgBarCounter(self.log_interval_itrs)
diff --git a/rlpyt/samplers/base.py b/rlpyt/samplers/base.py
index 2d3922dd..2016b17f 100644
--- a/rlpyt/samplers/base.py
+++ b/rlpyt/samplers/base.py
@@ -38,6 +38,7 @@ def __init__(
             eval_env_kwargs=None,
             eval_max_steps=None,  # int if using evaluation.
             eval_max_trajectories=None,  # Optional earlier cutoff.
+            discount=1.,  # Discount factor for the sampler-computed discounted returns.
             ):
         eval_max_steps = None if eval_max_steps is None else int(eval_max_steps)
         eval_max_trajectories = (None if eval_max_trajectories is None else
diff --git a/rlpyt/samplers/buffer.py b/rlpyt/samplers/buffer.py
index 0d56baab..b4220136 100644
--- a/rlpyt/samplers/buffer.py
+++ b/rlpyt/samplers/buffer.py
@@ -41,6 +41,7 @@ def build_samples_buffer(agent, env, batch_spec, bootstrap_value=False,
     observation = buffer_from_example(examples["observation"], (T, B), env_shared)
     all_reward = buffer_from_example(examples["reward"], (T + 1, B), env_shared)
+    discounted_return = buffer_from_example(examples["discounted_return"], (T, B), env_shared)
     reward = all_reward[1:]
     prev_reward = all_reward[:-1]  # Writing to reward will populate prev_reward.
     done = buffer_from_example(examples["done"], (T, B), env_shared)
@@ -51,6 +52,7 @@
         prev_reward=prev_reward,
         done=done,
         env_info=env_info,
+        discounted_return=discounted_return,
     )
     samples_np = Samples(agent=agent_buffer, env=env_buffer)
     samples_pyt = torchify_buffer(samples_np)
@@ -65,6 +67,7 @@ def get_example_outputs(agent, env, examples, subprocess=False):
         torch.set_num_threads(1)  # Some fix to prevent MKL hang.
     o = env.reset()
     a = env.action_space.sample()
+    discounted_return = 0.
     o, r, d, env_info = env.step(a)
     r = np.asarray(r, dtype="float32")  # Must match torch float dtype here.
     agent.reset()
@@ -79,3 +82,4 @@
     examples["env_info"] = env_info
     examples["action"] = a  # OK to put torch tensor here, could numpify.
     examples["agent_info"] = agent_info
+    examples["discounted_return"] = r
diff --git a/rlpyt/samplers/collections.py b/rlpyt/samplers/collections.py
index 66e57dbc..8df3493a 100644
--- a/rlpyt/samplers/collections.py
+++ b/rlpyt/samplers/collections.py
@@ -11,7 +11,7 @@
 AgentSamplesBsv = namedarraytuple("AgentSamplesBsv",
     ["action", "prev_action", "agent_info", "bootstrap_value"])
 EnvSamples = namedarraytuple("EnvSamples",
-    ["observation", "reward", "prev_reward", "done", "env_info"])
+    ["observation", "reward", "prev_reward", "done", "env_info", "discounted_return"])
 
 
 class BatchSpec(namedtuple("BatchSpec", "T B")):
diff --git a/rlpyt/samplers/collectors.py b/rlpyt/samplers/collectors.py
index 32410abf..bb6685cb 100644
--- a/rlpyt/samplers/collectors.py
+++ b/rlpyt/samplers/collectors.py
@@ -22,6 +22,7 @@ def __init__(
             step_buffer_np=None,
             global_B=1,
             env_ranks=None,
+            discount=1.,
             ):
         save__init__args(locals())
 
diff --git a/rlpyt/samplers/parallel/cpu/collectors.py b/rlpyt/samplers/parallel/cpu/collectors.py
index b64a60f3..e8aba5ff 100644
--- a/rlpyt/samplers/parallel/cpu/collectors.py
+++ b/rlpyt/samplers/parallel/cpu/collectors.py
@@ -55,6 +55,10 @@ def collect_batch(self, agent_inputs, traj_infos, itr):
                 env_buf.env_info[t, b] = env_info
             agent_buf.action[t] = action
             env_buf.reward[t] = reward
+            if t == 0:
+                env_buf.discounted_return[t] = reward
+            else:
+                env_buf.discounted_return[t] = (env_buf.discounted_return[t-1] * (1 - env_buf.done[t-1])) * self.discount + reward
             if agent_info:
                 agent_buf.agent_info[t] = agent_info
@@ -92,7 +96,7 @@ def __init__(self, *args, **kwargs):
         self.temp_observation = buffer_method(
             self.samples_np.env.observation[0, :len(self.envs)], "copy")
 
-    def collect_batch(self, agent_inputs, traj_infos, itr):
+    def collect_batch(self, agent_inputs, traj_infos, itr, discount=1.):
         # Numpy arrays can be written to from numpy arrays or torch tensors
         # (whereas torch tensors can only be written to from torch tensors).
         agent_buf, env_buf = self.samples_np.agent, self.samples_np.env
@@ -137,6 +141,11 @@
             agent_buf.action[t] = action
             env_buf.reward[t] = reward
             env_buf.done[t] = self.done
+            if t == 0:
+                env_buf.discounted_return[t] = reward
+            else:
+                env_buf.discounted_return[t] = (env_buf.discounted_return[t - 1]) * (
+                    1 - env_buf.done[t - 1]) * self.discount + reward
             if agent_info:
                 agent_buf.agent_info[t] = agent_info
diff --git a/rlpyt/samplers/parallel/gpu/collectors.py b/rlpyt/samplers/parallel/gpu/collectors.py
index 3990fcbb..e150b341 100644
--- a/rlpyt/samplers/parallel/gpu/collectors.py
+++ b/rlpyt/samplers/parallel/gpu/collectors.py
@@ -15,7 +15,7 @@ class GpuResetCollector(DecorrelatingStartCollector):
 
     mid_batch_reset = True
 
-    def collect_batch(self, agent_inputs, traj_infos, itr):
+    def collect_batch(self, agent_inputs, traj_infos, itr, discount=1.):
         """Params agent_inputs and itr unused."""
         act_ready, obs_ready = self.sync.act_ready, self.sync.obs_ready
         step = self.step_buffer_np
@@ -43,6 +43,11 @@
             agent_buf.action[t] = step.action  # OPTIONAL BY SERVER
             env_buf.reward[t] = step.reward
             env_buf.done[t] = step.done
+            if t == 0:
+                env_buf.discounted_return[t] = step.reward
+            else:
+                env_buf.discounted_return[t] = (env_buf.discounted_return[t-1]) * \
+                    (1 - env_buf.done[t-1]) * self.discount + step.reward
             if step.agent_info:
                 agent_buf.agent_info[t] = step.agent_info  # OPTIONAL BY SERVER
             obs_ready.release()  # Ready for server to use/write step buffer.
@@ -67,7 +72,7 @@ def __init__(self, *args, **kwargs):
         # next batch.
         self.temp_observation = buffer_method(self.step_buffer_np.observation, "copy")
 
-    def collect_batch(self, agent_inputs, traj_infos, itr):
+    def collect_batch(self, agent_inputs, traj_infos, itr, discount=1.):
         """Params agent_inputs and itr unused."""
         act_ready, obs_ready = self.sync.act_ready, self.sync.obs_ready
         step = self.step_buffer_np
@@ -108,6 +113,10 @@
             agent_buf.action[t] = step.action  # OPTIONAL BY SERVER
             env_buf.reward[t] = step.reward
             env_buf.done[t] = step.done
+            if t == 0:
+                env_buf.discounted_return[t] = step.reward
+            else:
+                env_buf.discounted_return[t] = (env_buf.discounted_return[t-1] * (1 - env_buf.done[t-1])) * self.discount + step.reward
             if step.agent_info:
                 agent_buf.agent_info[t] = step.agent_info  # OPTIONAL BY SERVER
             obs_ready.release()  # Ready for server to use/write step buffer.
diff --git a/rlpyt/samplers/serial/sampler.py b/rlpyt/samplers/serial/sampler.py
index 0dd69d81..0ea6e2c0 100644
--- a/rlpyt/samplers/serial/sampler.py
+++ b/rlpyt/samplers/serial/sampler.py
@@ -64,6 +64,7 @@ def initialize(
             agent=agent,
             global_B=global_B,
             env_ranks=env_ranks,  # Might get applied redundantly to agent.
+            discount=self.discount,
             )
         if self.eval_n_envs > 0:  # May do evaluation.
             eval_envs = [self.EnvCls(**self.eval_env_kwargs)
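
Note on the scheme (sketch only, not part of the diff): each collector writes a per-step
discounted return into the samples buffer alongside the reward, and process_returns()
divides the batch rewards by the running standard deviation of those returns before
computing returns and advantages, similar in spirit to the reward scaling in OpenAI
Baselines' VecNormalize. The self-contained sketch below mirrors that flow outside rlpyt;
RunningStd is a stand-in for rlpyt's RunningMeanStdModel, and scale_rewards() is a
hypothetical helper name, not code from this patch.

# Illustrative sketch of the reward-normalization scheme above (assumed names).
import numpy as np


class RunningStd:
    """Running variance over successive batches (parallel/Welford update)."""

    def __init__(self, epsilon=1e-4):
        self.mean, self.var, self.count = 0.0, 1.0, epsilon

    def update(self, x):
        x = np.asarray(x, dtype=np.float64)
        b_mean, b_var, b_count = x.mean(), x.var(), x.size
        delta = b_mean - self.mean
        total = self.count + b_count
        self.mean += delta * b_count / total
        m2 = self.var * self.count + b_var * b_count + delta ** 2 * self.count * b_count / total
        self.var, self.count = m2 / total, total


def scale_rewards(reward, done, rets_rms, discount=0.99):
    """reward, done: (T, B) arrays for one sampled batch.

    Builds the per-step discounted return the collectors store (restarting at
    t == 0 of each batch and after each done), updates the running statistics,
    and returns rewards divided by the running std of those returns.
    """
    T = reward.shape[0]
    disc_ret = np.zeros_like(reward)
    disc_ret[0] = reward[0]
    for t in range(1, T):
        disc_ret[t] = disc_ret[t - 1] * (1.0 - done[t - 1]) * discount + reward[t]
    rets_rms.update(disc_ret.reshape(-1))  # Same role as RunningMeanStdModel.update().
    return reward / np.sqrt(rets_rms.var)


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    rms = RunningStd()
    for itr in range(5):  # A few sampler iterations.
        r = rng.normal(size=(128, 8)).astype(np.float32)
        d = (rng.random((128, 8)) < 0.01).astype(np.float32)
        scaled = scale_rewards(r, d, rms, discount=0.99)
    print("running std of returns:", float(np.sqrt(rms.var)))
    print("scaled reward std:", float(scaled.std()))

As in the collectors, the running return restarts at the first step of each sampled batch
and after every done flag, and a single statistics object persists across iterations, so
the reward scale settles as more batches are collected.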