Normalize rewards by standard deviation of discounted return in MuJoCo #149

Open · wants to merge 1 commit into base: master
3 changes: 2 additions & 1 deletion rlpyt/algos/pg/a2c.py
@@ -28,6 +28,7 @@ def __init__(
initial_optim_state_dict=None,
gae_lambda=1,
normalize_advantage=False,
normalize_rewards=False,
):
"""Saves the input settings."""
if optim_kwargs is None:
@@ -84,7 +85,7 @@ def loss(self, samples):
else:
dist_info, value = self.agent(*agent_inputs)
# TODO: try to compute everyone on device.
return_, advantage, valid = self.process_returns(samples)
return_, advantage, valid = self.process_returns(samples, self.normalize_rewards)

dist = self.agent.distribution
logli = dist.log_likelihood(samples.agent.action, dist_info)
22 changes: 17 additions & 5 deletions rlpyt/algos/pg/base.py
@@ -1,10 +1,12 @@

# import torch
from collections import namedtuple

import torch

from rlpyt.algos.base import RlAlgorithm
from rlpyt.algos.utils import (discount_return, generalized_advantage_estimation,
valid_from_done)
valid_from_done)
from rlpyt.models.running_mean_std import RunningMeanStdModel

# Convention: traj_info fields CamelCase, opt_info fields lowerCamelCase
OptInfo = namedtuple("OptInfo", ["loss", "gradNorm", "entropy", "perplexity"])
@@ -35,19 +37,29 @@ def initialize(self, agent, n_itr, batch_spec, mid_batch_reset=False,
self.n_itr = n_itr
self.batch_spec = batch_spec
self.mid_batch_reset = mid_batch_reset
self.rets_rms = None

def process_returns(self, samples):
def process_returns(self, samples, normalize_rewards=False):
"""
Compute bootstrapped returns and advantages from a minibatch of
samples. Uses either discounted returns (if ``self.gae_lambda==1``)
or generalized advantage estimation. Mask out invalid samples
according to ``mid_batch_reset`` or for recurrent agent. Optionally
normalize advantages, and optionally normalize rewards by the running
standard deviation of the discounted return.
"""
reward, done, value, bv = (samples.env.reward, samples.env.done,
samples.agent.agent_info.value, samples.agent.bootstrap_value)
reward, done, value, bv, discounted_return = (samples.env.reward, samples.env.done,
samples.agent.agent_info.value, samples.agent.bootstrap_value, samples.env.discounted_return)
done = done.type(reward.dtype)

if normalize_rewards:
    # Normalize rewards by the running std of the per-step discounted return.
    if self.rets_rms is None:
        self.rets_rms = RunningMeanStdModel(torch.Size([1]))
    discounted_return = discounted_return.view(-1, 1)
    self.rets_rms.update(discounted_return)
    std_dev = torch.sqrt(self.rets_rms.var)
    reward = reward / std_dev

if self.gae_lambda == 1: # GAE reduces to empirical discounted.
return_ = discount_return(reward, done, bv, self.discount)
advantage = return_ - value
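
For reference, a minimal standalone sketch of the normalization applied in `process_returns` above: keep a running variance of the per-step discounted returns and divide rewards by its square root. It does not use rlpyt's `RunningMeanStdModel`; the parallel-variance update below is only assumed to match its behavior.

```python
# Standalone sketch (assumption: rlpyt's RunningMeanStdModel uses an
# equivalent parallel-variance update). Not part of the PR.
import torch


class RunningStd:
    """Tracks a running mean/variance over batches of values."""

    def __init__(self):
        self.mean = torch.zeros(1)
        self.var = torch.ones(1)
        self.count = 0.

    def update(self, x):  # x: [N, 1]
        batch_mean = x.mean(dim=0)
        batch_var = x.var(dim=0, unbiased=False)
        batch_count = x.shape[0]
        delta = batch_mean - self.mean
        total = self.count + batch_count
        m2 = (self.var * self.count + batch_var * batch_count
              + delta.pow(2) * self.count * batch_count / total)
        self.mean = self.mean + delta * batch_count / total
        self.var = m2 / total
        self.count = total


rets_rms = RunningStd()
reward = torch.randn(100, 8)             # [T, B] rewards from the sampler
discounted_return = torch.randn(100, 8)  # [T, B] per-step discounted returns
rets_rms.update(discounted_return.view(-1, 1))
# Divide rewards by the running std (epsilon guards against zero variance).
reward = reward / torch.sqrt(rets_rms.var + 1e-8)
```
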
3 changes: 2 additions & 1 deletion rlpyt/algos/pg/ppo.py
@@ -37,6 +37,7 @@ def __init__(
ratio_clip=0.1,
linear_lr_schedule=True,
normalize_advantage=False,
normalize_rewards=False,
):
"""Saves input settings."""
if optim_kwargs is None:
@@ -72,7 +73,7 @@ def optimize_agent(self, itr, samples):
agent_inputs = buffer_to(agent_inputs, device=self.agent.device)
if hasattr(self.agent, "update_obs_rms"):
self.agent.update_obs_rms(agent_inputs.observation)
return_, advantage, valid = self.process_returns(samples)
return_, advantage, valid = self.process_returns(samples, self.normalize_rewards)
loss_inputs = LossInputs( # So can slice all.
agent_inputs=agent_inputs,
action=samples.agent.action,
2 changes: 2 additions & 0 deletions rlpyt/experiments/configs/mujoco/pg/mujoco_a2c.py
@@ -12,6 +12,7 @@
entropy_loss_coeff=0.0,
value_loss_coeff=0.5,
normalize_advantage=True,
normalize_rewards=True,
),
env=dict(id="Hopper-v3"),
model=dict(normalize_observation=False),
@@ -24,6 +25,7 @@
batch_T=100,
batch_B=8,
max_decorrelation_steps=1000,
discount=0.99,
),
)

6 changes: 4 additions & 2 deletions rlpyt/experiments/configs/mujoco/pg/mujoco_ppo.py
@@ -8,18 +8,19 @@
algo=dict(
discount=0.99,
learning_rate=3e-4,
clip_grad_norm=1e6,
clip_grad_norm=1e3,
entropy_loss_coeff=0.0,
gae_lambda=0.95,
minibatches=32,
epochs=10,
ratio_clip=0.2,
normalize_advantage=True,
linear_lr_schedule=True,
normalize_rewards=True,
# bootstrap_timelimit=False,
),
env=dict(id="Hopper-v3"),
model=dict(normalize_observation=False),
model=dict(normalize_observation=True),
optim=dict(),
runner=dict(
n_steps=1e6,
@@ -29,6 +30,7 @@
batch_T=2048,
batch_B=1,
max_decorrelation_steps=0,
discount=0.99,
),
)

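
For context, a hypothetical wiring of the two knobs this PR adds, `normalize_rewards` on the algorithm and `discount` on the sampler, using rlpyt's standard serial entry points. This is a usage sketch, not part of the diff; argument values mirror the PPO config above.

```python
# Usage sketch (assumes rlpyt's usual serial entry points; not part of the PR).
from rlpyt.samplers.serial.sampler import SerialSampler
from rlpyt.envs.gym import make as gym_make
from rlpyt.algos.pg.ppo import PPO
from rlpyt.agents.pg.mujoco import MujocoFfAgent
from rlpyt.runners.minibatch_rl import MinibatchRl

sampler = SerialSampler(
    EnvCls=gym_make,
    env_kwargs=dict(id="Hopper-v3"),
    batch_T=2048,
    batch_B=1,
    max_decorrelation_steps=0,
    discount=0.99,  # new sampler kwarg: used for the per-step discounted return
)
algo = PPO(discount=0.99, normalize_rewards=True)  # new algo kwarg from this PR
agent = MujocoFfAgent()
runner = MinibatchRl(algo=algo, agent=agent, sampler=sampler, n_steps=1e6)
# runner.train()
```
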
@@ -2,8 +2,8 @@
import sys

from rlpyt.utils.launching.affinity import affinity_from_code
from rlpyt.samplers.gpu.parallel_sampler import GpuSampler
from rlpyt.samplers.gpu.collectors import ResetCollector
from rlpyt.samplers.parallel.gpu.sampler import GpuSampler
from rlpyt.samplers.parallel.gpu.collectors import GpuResetCollector
from rlpyt.envs.gym import make as gym_make
from rlpyt.algos.pg.ppo import PPO
from rlpyt.agents.pg.mujoco import MujocoFfAgent
@@ -23,7 +23,7 @@ def build_and_train(slot_affinity_code, log_dir, run_ID, config_key):
sampler = GpuSampler(
EnvCls=gym_make,
env_kwargs=config["env"],
CollectorCls=ResetCollector,
CollectorCls=GpuResetCollector,
**config["sampler"]
)
algo = PPO(optim_kwargs=config["optim"], **config["algo"])
1 change: 1 addition & 0 deletions rlpyt/runners/minibatch_rl.py
@@ -265,6 +265,7 @@ def train(self):
def initialize_logging(self):
self._traj_infos = deque(maxlen=self.log_traj_window)
self._new_completed_trajs = 0
logger.log("seed %d" % self.seed)
logger.log(f"Optimizing over {self.log_interval_itrs} iterations.")
super().initialize_logging()
self.pbar = ProgBarCounter(self.log_interval_itrs)
1 change: 1 addition & 0 deletions rlpyt/samplers/base.py
@@ -38,6 +38,7 @@ def __init__(
eval_env_kwargs=None,
eval_max_steps=None, # int if using evaluation.
eval_max_trajectories=None, # Optional earlier cutoff.
discount=1., # discount factor for discounted return of trajectories
):
eval_max_steps = None if eval_max_steps is None else int(eval_max_steps)
eval_max_trajectories = (None if eval_max_trajectories is None else
4 changes: 4 additions & 0 deletions rlpyt/samplers/buffer.py
@@ -41,6 +41,7 @@ def build_samples_buffer(agent, env, batch_spec, bootstrap_value=False,

observation = buffer_from_example(examples["observation"], (T, B), env_shared)
all_reward = buffer_from_example(examples["reward"], (T + 1, B), env_shared)
discounted_return = buffer_from_example(examples["discounted_return"], (T, B), env_shared)
reward = all_reward[1:]
prev_reward = all_reward[:-1] # Writing to reward will populate prev_reward.
done = buffer_from_example(examples["done"], (T, B), env_shared)
@@ -51,6 +52,7 @@
prev_reward=prev_reward,
done=done,
env_info=env_info,
discounted_return=discounted_return,
)
samples_np = Samples(agent=agent_buffer, env=env_buffer)
samples_pyt = torchify_buffer(samples_np)
@@ -65,6 +67,7 @@ def get_example_outputs(agent, env, examples, subprocess=False):
torch.set_num_threads(1) # Some fix to prevent MKL hang.
o = env.reset()
a = env.action_space.sample()
discounted_return = 0.
o, r, d, env_info = env.step(a)
r = np.asarray(r, dtype="float32") # Must match torch float dtype here.
agent.reset()
@@ -79,3 +82,4 @@
examples["env_info"] = env_info
examples["action"] = a # OK to put torch tensor here, could numpify.
examples["agent_info"] = agent_info
examples["discounted_return"] = r
2 changes: 1 addition & 1 deletion rlpyt/samplers/collections.py
@@ -11,7 +11,7 @@
AgentSamplesBsv = namedarraytuple("AgentSamplesBsv",
["action", "prev_action", "agent_info", "bootstrap_value"])
EnvSamples = namedarraytuple("EnvSamples",
["observation", "reward", "prev_reward", "done", "env_info"])
["observation", "reward", "prev_reward", "done", "env_info", "discounted_return"])


class BatchSpec(namedtuple("BatchSpec", "T B")):
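
The extended `EnvSamples` behaves like any other namedarraytuple: the new `discounted_return` field is just another [T, B] buffer, indexed the same way the collectors index `reward` and `done`. A small sketch follows; the buffer shapes are illustrative, not taken from the diff.

```python
# Illustrative only: shows that the new field is indexed like reward/done.
import numpy as np
from rlpyt.utils.collections import namedarraytuple

EnvSamples = namedarraytuple("EnvSamples",
    ["observation", "reward", "prev_reward", "done", "env_info", "discounted_return"])

T, B = 4, 2
env_buf = EnvSamples(
    observation=np.zeros((T, B, 3), dtype=np.float32),
    reward=np.zeros((T, B), dtype=np.float32),
    prev_reward=np.zeros((T, B), dtype=np.float32),
    done=np.zeros((T, B), dtype=np.float32),
    env_info=None,
    discounted_return=np.zeros((T, B), dtype=np.float32),
)
env_buf.discounted_return[0] = env_buf.reward[0]  # same pattern the collectors use at t == 0
```
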
1 change: 1 addition & 0 deletions rlpyt/samplers/collectors.py
@@ -22,6 +22,7 @@ def __init__(
step_buffer_np=None,
global_B=1,
env_ranks=None,
discount=1.,
):
save__init__args(locals())

11 changes: 10 additions & 1 deletion rlpyt/samplers/parallel/cpu/collectors.py
@@ -55,6 +55,10 @@ def collect_batch(self, agent_inputs, traj_infos, itr):
env_buf.env_info[t, b] = env_info
agent_buf.action[t] = action
env_buf.reward[t] = reward
# Running discounted return; resets where the previous step ended an episode.
if t == 0:
    env_buf.discounted_return[t] = reward
else:
    env_buf.discounted_return[t] = (env_buf.discounted_return[t - 1]
        * (1 - env_buf.done[t - 1])) * self.discount + reward
if agent_info:
agent_buf.agent_info[t] = agent_info

Expand Down Expand Up @@ -92,7 +96,7 @@ def __init__(self, *args, **kwargs):
self.temp_observation = buffer_method(
self.samples_np.env.observation[0, :len(self.envs)], "copy")

def collect_batch(self, agent_inputs, traj_infos, itr):
# Numpy arrays can be written to from numpy arrays or torch tensors
# (whereas torch tensors can only be written to from torch tensors).
agent_buf, env_buf = self.samples_np.agent, self.samples_np.env
Expand Down Expand Up @@ -137,6 +141,11 @@ def collect_batch(self, agent_inputs, traj_infos, itr):
agent_buf.action[t] = action
env_buf.reward[t] = reward
env_buf.done[t] = self.done
if t == 0:
    env_buf.discounted_return[t] = reward
else:
    env_buf.discounted_return[t] = (env_buf.discounted_return[t - 1]
        * (1 - env_buf.done[t - 1])) * self.discount + reward
if agent_info:
agent_buf.agent_info[t] = agent_info

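
A quick numeric check of the running-return recursion the collectors use, `ret[t] = ret[t-1] * (1 - done[t-1]) * discount + reward[t]`: the accumulator carries the discounted sum forward and resets right after a step that ended an episode.

```python
# Toy check of the collectors' recursion (standalone; not part of the PR).
import numpy as np

discount = 0.99
reward = np.ones(5, dtype=np.float32)
done = np.array([0., 0., 1., 0., 0.], dtype=np.float32)

ret = np.zeros_like(reward)
for t in range(len(reward)):
    if t == 0:
        ret[t] = reward[t]
    else:
        ret[t] = ret[t - 1] * (1 - done[t - 1]) * discount + reward[t]

print(ret)  # [1.0, 1.99, 2.9701, 1.0, 1.99], resets after the done at t == 2
```
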
13 changes: 11 additions & 2 deletions rlpyt/samplers/parallel/gpu/collectors.py
@@ -15,7 +15,7 @@ class GpuResetCollector(DecorrelatingStartCollector):

mid_batch_reset = True

def collect_batch(self, agent_inputs, traj_infos, itr):
"""Params agent_inputs and itr unused."""
act_ready, obs_ready = self.sync.act_ready, self.sync.obs_ready
step = self.step_buffer_np
@@ -43,6 +43,11 @@ def collect_batch(self, agent_inputs, traj_infos, itr):
agent_buf.action[t] = step.action # OPTIONAL BY SERVER
env_buf.reward[t] = step.reward
env_buf.done[t] = step.done
if t == 0:
    env_buf.discounted_return[t] = step.reward
else:
    env_buf.discounted_return[t] = (env_buf.discounted_return[t - 1]
        * (1 - env_buf.done[t - 1])) * self.discount + step.reward
if step.agent_info:
agent_buf.agent_info[t] = step.agent_info # OPTIONAL BY SERVER
obs_ready.release() # Ready for server to use/write step buffer.
@@ -67,7 +72,7 @@ def __init__(self, *args, **kwargs):
# next batch.
self.temp_observation = buffer_method(self.step_buffer_np.observation, "copy")

def collect_batch(self, agent_inputs, traj_infos, itr):
"""Params agent_inputs and itr unused."""
act_ready, obs_ready = self.sync.act_ready, self.sync.obs_ready
step = self.step_buffer_np
@@ -108,6 +113,10 @@ def collect_batch(self, agent_inputs, traj_infos, itr):
agent_buf.action[t] = step.action # OPTIONAL BY SERVER
env_buf.reward[t] = step.reward
env_buf.done[t] = step.done
if t == 0:
    env_buf.discounted_return[t] = step.reward
else:
    env_buf.discounted_return[t] = (env_buf.discounted_return[t - 1]
        * (1 - env_buf.done[t - 1])) * self.discount + step.reward
if step.agent_info:
agent_buf.agent_info[t] = step.agent_info # OPTIONAL BY SERVER
obs_ready.release() # Ready for server to use/write step buffer.
1 change: 1 addition & 0 deletions rlpyt/samplers/serial/sampler.py
@@ -64,6 +64,7 @@ def initialize(
agent=agent,
global_B=global_B,
env_ranks=env_ranks, # Might get applied redundantly to agent.
discount=self.discount,
)
if self.eval_n_envs > 0: # May do evaluation.
eval_envs = [self.EnvCls(**self.eval_env_kwargs)