diff --git a/src/garage/torch/modules/__init__.py b/src/garage/torch/modules/__init__.py
index c02b7a0254..36f4fbeae6 100644
--- a/src/garage/torch/modules/__init__.py
+++ b/src/garage/torch/modules/__init__.py
@@ -14,6 +14,8 @@
 from garage.torch.modules.discrete_cnn_module import DiscreteCNNModule
 from garage.torch.modules.discrete_dueling_cnn_module import (
     DiscreteDuelingCNNModule)
+from garage.torch.modules.gru_module import GRUModule
+from garage.torch.modules.categorical_gru_module import CategoricalGRUModule
 # yapf: enable

 __all__ = [
@@ -26,4 +28,6 @@
     'GaussianMLPModule',
     'GaussianMLPIndependentStdModule',
     'GaussianMLPTwoHeadedModule',
+    'GRUModule',
+    'CategoricalGRUModule',
 ]
diff --git a/src/garage/torch/modules/categorical_gru_module.py b/src/garage/torch/modules/categorical_gru_module.py
new file mode 100644
index 0000000000..486efa530a
--- /dev/null
+++ b/src/garage/torch/modules/categorical_gru_module.py
@@ -0,0 +1,93 @@
+"""Categorical GRU Module.
+
+A model represented by a Categorical distribution
+which is parameterized by a Gated Recurrent Unit (GRU)
+followed by a fully-connected layer.
+"""
+from torch import nn
+from torch.distributions import Categorical
+
+from garage.torch import NonLinearity
+from garage.torch.modules.gru_module import GRUModule
+
+
+class CategoricalGRUModule(nn.Module):
+    """Categorical GRU Model.
+
+    A model represented by a Categorical distribution
+    which is parameterized by a gated recurrent unit (GRU)
+    followed by a fully-connected layer.
+
+    Args:
+        input_dim (int): Dimension of the network input.
+        output_dim (int): Dimension of the network output.
+        hidden_dim (int): Hidden dimension for the GRU cell.
+        hidden_nonlinearity (callable): Activation function for intermediate
+            dense layer(s). It should return a torch.Tensor. Set it to
+            None to maintain a linear activation.
+        hidden_w_init (callable): Initializer function for the weight
+            of intermediate dense layer(s). The function should return a
+            torch.Tensor.
+        hidden_b_init (callable): Initializer function for the bias
+            of intermediate dense layer(s). The function should return a
+            torch.Tensor.
+        output_nonlinearity (callable): Activation function for output dense
+            layer. It should return a torch.Tensor. Set it to None to
+            maintain a linear activation.
+        output_w_init (callable): Initializer function for the weight
+            of output dense layer(s). The function should return a
+            torch.Tensor.
+        output_b_init (callable): Initializer function for the bias
+            of output dense layer(s). The function should return a
+            torch.Tensor.
+        layer_normalization (bool): Bool for using layer normalization or
+            not.
+    """
+
+    def __init__(
+        self,
+        input_dim,
+        output_dim,
+        hidden_dim,
+        hidden_nonlinearity=nn.Tanh,
+        hidden_w_init=nn.init.xavier_uniform_,
+        hidden_b_init=nn.init.zeros_,
+        output_nonlinearity=None,
+        output_w_init=nn.init.xavier_uniform_,
+        output_b_init=nn.init.zeros_,
+        layer_normalization=False,
+    ):
+        super().__init__()
+
+        self._gru_module = GRUModule(
+            input_dim,
+            hidden_dim,
+            hidden_nonlinearity,
+            hidden_w_init,
+            hidden_b_init,
+            layer_normalization,
+        )
+
+        # A single fully-connected output head on top of the GRU features.
+        self._linear_layer = nn.Sequential()
+        output_layer = nn.Linear(hidden_dim, output_dim)
+        output_w_init(output_layer.weight)
+        output_b_init(output_layer.bias)
+        self._linear_layer.add_module('output', output_layer)
+        if output_nonlinearity:
+            self._linear_layer.add_module(
+                'output_activation', NonLinearity(output_nonlinearity))
+
+    def forward(self, *inputs):
+        """Forward method.
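For orientation, a minimal usage sketch of the new module (illustrative only, not part of the diff; assumes this patch is applied on top of a garage installation):

```python
import torch

from garage.torch.modules import CategoricalGRUModule

# A batch of 4 sequences, 3 timesteps each, 2 features per timestep.
module = CategoricalGRUModule(input_dim=2, output_dim=5, hidden_dim=8)
obs = torch.ones(4, 3, 2)

dist = module(obs)                  # torch.distributions.Categorical
actions = dist.sample()             # shape (4,): one action index per sequence
log_probs = dist.log_prob(actions)  # shape (4,)
```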
+
+        Args:
+            *inputs: Input to the module.
+
+        Returns:
+            torch.distributions.Categorical: Policy distribution.
+
+        """
+        assert len(inputs) == 1
+        gru_output = self._gru_module(inputs[0])
+        fc_output = self._linear_layer(gru_output)
+        # One categorical distribution per batch entry; logits have shape
+        # (batch, output_dim).
+        return Categorical(logits=fc_output)
diff --git a/src/garage/torch/modules/gru_module.py b/src/garage/torch/modules/gru_module.py
new file mode 100644
index 0000000000..da0bb93d4b
--- /dev/null
+++ b/src/garage/torch/modules/gru_module.py
@@ -0,0 +1,79 @@
+"""GRU Module."""
+import torch
+from torch import nn
+
+from garage.torch import global_device, NonLinearity
+
+
+# pytorch v1.6 issue, see https://github.com/pytorch/pytorch/issues/42305
+# pylint: disable=abstract-method
+class GRUModule(nn.Module):
+    """Gated Recurrent Unit (GRU) model in pytorch.
+
+    Args:
+        input_dim (int): Dimension of the network input.
+        hidden_dim (int): Hidden dimension for the GRU cell.
+        hidden_nonlinearity (callable): Activation function applied to the
+            GRU output. It should return a torch.Tensor. Set it to None to
+            maintain a linear activation.
+        hidden_w_init (callable): Initializer function for the weights
+            of the GRU cell. The function should return a torch.Tensor.
+        hidden_b_init (callable): Initializer function for the biases
+            of the GRU cell. The function should return a torch.Tensor.
+        layer_normalization (bool): Bool for using layer normalization or
+            not.
+    """
+
+    def __init__(
+        self,
+        input_dim,
+        hidden_dim,
+        hidden_nonlinearity=nn.Tanh,
+        hidden_w_init=nn.init.xavier_uniform_,
+        hidden_b_init=nn.init.zeros_,
+        layer_normalization=False,
+    ):
+        super().__init__()
+        self.hidden_dim = hidden_dim
+        self._gru_cell = nn.GRUCell(input_dim, hidden_dim)
+        hidden_w_init(self._gru_cell.weight_ih)
+        hidden_w_init(self._gru_cell.weight_hh)
+        hidden_b_init(self._gru_cell.bias_ih)
+        hidden_b_init(self._gru_cell.bias_hh)
+
+        # Post-GRU stack: activation, then optional layer normalization.
+        self.hidden_nonlinearity = NonLinearity(hidden_nonlinearity)
+        self._layers = nn.Sequential()
+        self._layers.add_module('activation', self.hidden_nonlinearity)
+        if layer_normalization:
+            self._layers.add_module('layer_normalization',
+                                    nn.LayerNorm(hidden_dim))
+
+    # pylint: disable=arguments-differ
+    def forward(self, input_val):
+        """Forward method.
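GRUModule unrolls a single `nn.GRUCell` over the time dimension rather than calling `nn.GRU`, which keeps the per-step recurrence explicit for recurrent policies. A rough equivalence sketch (my own illustration, not part of the patch; the two modules hold independent weights, so values only match if the weights are copied across):

```python
import torch
from torch import nn

batch, steps, in_dim, hid = 4, 3, 2, 8
x = torch.ones(batch, steps, in_dim)

# What GRUModule computes internally (before its activation/LayerNorm stack):
cell = nn.GRUCell(in_dim, hid)
h = torch.zeros(batch, hid)
for t in range(steps):
    h = cell(x[:, t, :], h)  # hidden state after timestep t

# The batched nn.GRU API yields the same final hidden state in one call:
gru = nn.GRU(in_dim, hid, batch_first=True)
_, h_n = gru(x)              # h_n has shape (num_layers, batch, hid)
```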
+ + """ + if len(input_val.size()) == 2: + input_val = input_val.unsqueeze(0) + h0 = Variable( + torch.zeros(input_val.size(0), self.hidden_dim)).to(global_device()) + outs = [] + hn = h0 + for seq in range(input_val.size(1)): + hn = self._gru_cell(input_val[:, seq, :], hn) + outs.append(hn) + out = outs[-1].squeeze(dim=1) + out = self._layers(out) + outs = torch.stack(outs) + return out \ No newline at end of file diff --git a/src/garage/torch/policies/__init__.py b/src/garage/torch/policies/__init__.py index c50d46bc2f..68419a91d8 100644 --- a/src/garage/torch/policies/__init__.py +++ b/src/garage/torch/policies/__init__.py @@ -11,6 +11,7 @@ from garage.torch.policies.policy import Policy from garage.torch.policies.tanh_gaussian_mlp_policy import ( TanhGaussianMLPPolicy) +from garage.torch.policies.categorical_gru_policy import CategoricalGRUPolicy __all__ = [ 'CategoricalCNNPolicy', @@ -21,4 +22,5 @@ 'Policy', 'TanhGaussianMLPPolicy', 'ContextConditionedPolicy', + 'CategoricalGRUPolicy', ] diff --git a/src/garage/torch/policies/categorical_gru_policy.py b/src/garage/torch/policies/categorical_gru_policy.py new file mode 100644 index 0000000000..e72fd6af25 --- /dev/null +++ b/src/garage/torch/policies/categorical_gru_policy.py @@ -0,0 +1,177 @@ +"""CategoricalGRUPolicy.""" +import akro +import numpy as np +import torch +from torch import nn + +from garage.torch.modules import CategoricalGRUModule +from garage.torch.policies.stochastic_policy import StochasticPolicy + + +class CategoricalGRUPolicy(StochasticPolicy): + """CategoricalGRUPolicy. + + A policy that contains a GRU and a MLP to make prediction based on + a categorical distribution. + + It only works with akro.Discrete action space. + + Args: + env_spec (EnvSpec): Environment specification. + hidden_dim (int): Hidden dimension for GRU cell. + hidden_nonlinearity (callable): Activation function for intermediate + dense layer(s). It should return a torch.Tensor. Set it to + None to maintain a linear activation. + hidden_w_init (callable): Initializer function for the weight + of intermediate dense layer(s). The function should return a + torch.Tensor. + hidden_b_init (callable): Initializer function for the bias + of intermediate dense layer(s). The function should return a + torch.Tensor. + output_nonlinearity (callable): Activation function for output dense + layer. It should return a torch.Tensor. Set it to None to + maintain a linear activation. + output_w_init (callable): Initializer function for the weight + of output dense layer(s). The function should return a + torch.Tensor. + output_b_init (callable): Initializer function for the bias + of output dense layer(s). The function should return a + torch.Tensor. + state_include_action (bool): Whether the state includes action. + If True, input dimension will be + (observation dimension + action dimension). + layer_normalization (bool): Bool for using layer normalization or not. + name (str): Name of policy. 
+ """ + + def __init__( + self, + env_spec, + hidden_dim=32, + hidden_nonlinearity=nn.Tanh, + hidden_w_init=nn.init.xavier_uniform_, + hidden_b_init=nn.init.zeros_, + output_nonlinearity=None, + output_w_init=nn.init.xavier_uniform_, + output_b_init=nn.init.zeros_, + state_include_action=True, + layer_normalization=False, + name="CategoricalGRUPolicy", + ): + if not isinstance(env_spec.action_space, akro.Discrete): + raise ValueError('CategoricalGRUPolicy only works' + 'with akro.Discrete action space.') + + super().__init__(env_spec, name) + self._env_spec = env_spec + self._obs_dim = env_spec.observation_space.flat_dim + self._action_dim = env_spec.action_space.n + + self._hidden_dim = hidden_dim + self._hidden_nonlinearity = hidden_nonlinearity + self._hidden_w_init = hidden_w_init + self._hidden_b_init = hidden_b_init + self._output_nonlinearity = output_nonlinearity + self._output_w_init = output_w_init + self._output_b_init = output_b_init + self._layer_normalization = layer_normalization + self._state_include_action = state_include_action + + if state_include_action: + self._input_dim = self._obs_dim + self._action_dim + else: + self._input_dim = self._obs_dim + + self._prev_actions = None + + self._module = CategoricalGRUModule( + input_dim=self._input_dim, + output_dim=self._action_dim, + hidden_dim=self._hidden_dim, + hidden_nonlinearity=self._hidden_nonlinearity, + hidden_w_init=self._hidden_w_init, + hidden_b_init=self._hidden_b_init, + output_nonlinearity=self._output_nonlinearity, + output_w_init=self._output_w_init, + output_b_init=self._output_b_init, + layer_normalization=self._layer_normalization, + ) + + def forward(self, observations): + """Compute the action distributions from the observations. + + Args: + observations (torch.Tensor): Batch of observations on default + torch device. + + Returns: + torch.distributions.Distribution: Batch distribution of actions. + dict[str, torch.Tensor]: Additional agent_info, as torch Tensors. + Do not need to be detached, and can be on any device. + """ + dist = self._module(observations) + return dist, {} + + def reset(self, do_resets=None): + """Reset the policy. + + Note: + If `do_resets` is None, it will be by default np.array([True]), + which implies the policy will not be "vectorized", i.e. number of + paralle environments for training data sampling = 1. + + Args: + do_resets (numpy.ndarray): Bool that indicates terminal state(s). + + """ + if do_resets is None: + do_resets = [True] + do_resets = np.asarray(do_resets) + if self._prev_actions is None or len(do_resets) != len( + self._prev_actions): + self._prev_actions = np.zeros( + (len(do_resets), self.action_space.flat_dim)) + self._prev_hiddens = np.zeros((len(do_resets), self._hidden_dim)) + + self._prev_actions[do_resets] = 0. + + def get_actions(self, observations): + """Return multiple actions. + + Args: + observations (numpy.ndarray): Observations. + + Returns: + list[int]: Actions given input observations. + dict(numpy.ndarray): Distribution parameters. 
+ + """ + if self._state_include_action: + assert self._prev_actions is not None + all_input = np.concatenate([observations, self._prev_actions], + axis=-1) + else: + all_input = observations + prev_actions = self._prev_actions + actions, agent_info = super().get_actions(all_input) + self._prev_actions = self.action_space.flatten_n([a.item() for a in actions]) + if self._state_include_action: + agent_info['prev_action'] = np.copy(prev_actions) + return actions, agent_info + + @property + def input_dim(self): + """int: Dimension of the policy input.""" + return self._input_dim + + @property + def env_spec(self): + """Policy environment specification. + + Returns: + garage.EnvSpec: Environment specification. + + """ + return self._env_spec + + diff --git a/tests/garage/torch/modules/test_categorical_gru_module.py b/tests/garage/torch/modules/test_categorical_gru_module.py new file mode 100644 index 0000000000..dbc0d0b1e4 --- /dev/null +++ b/tests/garage/torch/modules/test_categorical_gru_module.py @@ -0,0 +1,33 @@ +"""Test CategoricalGRUModule.""" +import pickle + +import numpy as np +import pytest +import torch +from torch.distributions import Categorical +import torch.nn as nn + +from garage.torch.modules.categorical_gru_module import CategoricalGRUModule + + +class TestCategoricalGRUModule: + def setup_method(self): + self.batch_size = 1 + self.time_step = 1 + self.feature_shape = 2 + self.output_dim = 1 + self.dtype = torch.float32 + + self.input = torch.full( + (self.batch_size, self.time_step, self.feature_shape), 1.) + + def test_dist(self): + model = CategoricalGRUModule(input_dim=self.feature_shape, output_dim=self.output_dim, hidden_dim=1) + dist = model(self.input) + assert isinstance(dist, Categorical) + + @pytest.mark.parametrize('output_dim', [1, 2, 5, 10]) + def test_output_normalized(self, output_dim): + model = CategoricalGRUModule(input_dim=self.feature_shape, output_dim=output_dim, hidden_dim=1) + dist = model(self.input) + assert np.isclose(dist.probs.squeeze().sum().detach().numpy(), 1) \ No newline at end of file diff --git a/tests/garage/torch/policies/test_categorical_gru_policy.py b/tests/garage/torch/policies/test_categorical_gru_policy.py new file mode 100644 index 0000000000..ec77ebfde5 --- /dev/null +++ b/tests/garage/torch/policies/test_categorical_gru_policy.py @@ -0,0 +1,121 @@ +import cloudpickle + +import numpy as np +import pytest +import torch + +from garage.envs import GymEnv +from garage.torch.policies import CategoricalGRUPolicy + +# yapf: disable +from tests.fixtures.envs.dummy import (DummyBoxEnv, + DummyDictEnv, + DummyDiscreteEnv) + +# yapf: enable + + +class TestCategoricalGRUPolicy: + + def test_invalid_env(self): + env = GymEnv(DummyBoxEnv()) + with pytest.raises(ValueError): + CategoricalGRUPolicy(env_spec=env.spec) + + @pytest.mark.parametrize('obs_dim, action_dim, hidden_dim', [ + ((1, ), 1, 4), + ((2, ), 2, 4), + ((1, 1), 1, 4), + ((2, 2), 2, 4), + ]) + def test_get_action_state_include_action(self, obs_dim, action_dim, + hidden_dim): + env = GymEnv(DummyDiscreteEnv(obs_dim=obs_dim, action_dim=action_dim)) + policy = CategoricalGRUPolicy(env_spec=env.spec, + hidden_dim=hidden_dim, + state_include_action=True) + policy.reset() + obs = env.reset()[0] + action, _ = policy.get_action(obs.flatten()) + assert env.action_space.contains(action) + + actions, _ = policy.get_actions([obs.flatten()]) + for action in actions: + assert env.action_space.contains(action) + + @pytest.mark.parametrize('obs_dim, action_dim, hidden_dim, obs_type', [ + ((1, ), 1, 
+        ((2, ), 2, 4, 'discrete'),
+        ((1, 1), 1, 4, 'discrete'),
+        ((2, 2), 2, 4, 'discrete'),
+        ((1, ), 1, 4, 'dict'),
+    ])
+    def test_get_action(self, obs_dim, action_dim, hidden_dim, obs_type):
+        assert obs_type in ['discrete', 'dict']
+        if obs_type == 'discrete':
+            env = GymEnv(
+                DummyDiscreteEnv(obs_dim=obs_dim, action_dim=action_dim))
+        else:
+            env = GymEnv(
+                DummyDictEnv(obs_space_type='box', act_space_type='discrete'))
+        policy = CategoricalGRUPolicy(env_spec=env.spec,
+                                      hidden_dim=hidden_dim,
+                                      state_include_action=False)
+        policy.reset(do_resets=None)
+        obs = env.reset()[0]
+
+        if obs_type == 'discrete':
+            obs = obs.flatten()
+
+        action, _ = policy.get_action(obs)
+        assert env.action_space.contains(action)
+
+        actions, _ = policy.get_actions([obs])
+        for action in actions:
+            assert env.action_space.contains(action)
+
+    @pytest.mark.parametrize('obs_dim, action_dim, hidden_dim', [
+        ((1, ), 1, 4),
+        ((2, ), 2, 4),
+        ((1, 1), 1, 4),
+        ((2, 2), 2, 4),
+    ])
+    def test_get_actions(self, obs_dim, action_dim, hidden_dim):
+        env = GymEnv(DummyDiscreteEnv(obs_dim=obs_dim, action_dim=action_dim))
+        policy = CategoricalGRUPolicy(env_spec=env.spec,
+                                      hidden_dim=hidden_dim,
+                                      state_include_action=False)
+        env.reset()
+        obs = env.step(1).observation
+        actions, _ = policy.get_actions([obs, obs, obs])
+        for action in actions:
+            assert env.action_space.contains(action)
+        torch_obs = torch.Tensor(obs)
+        actions, _ = policy.get_actions([torch_obs, torch_obs, torch_obs])
+        for action in actions:
+            assert env.action_space.contains(action)
+
+    # pylint: disable=no-member
+    def test_is_pickleable(self):
+        env = GymEnv(DummyDiscreteEnv(obs_dim=(1, ), action_dim=1))
+        policy = CategoricalGRUPolicy(env_spec=env.spec,
+                                      state_include_action=False)
+
+        env.reset()
+        obs = env.step(1).observation
+
+        output_action_1, _ = policy.get_action(obs)
+
+        p = cloudpickle.dumps(policy)
+        policy_pickled = cloudpickle.loads(p)
+        output_action_2, _ = policy_pickled.get_action(obs)
+
+        assert env.action_space.contains(output_action_1)
+        assert env.action_space.contains(output_action_2)
+        assert output_action_1.shape == output_action_2.shape