File size: 5,542 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from typing import Union, Optional, Dict, Callable, List
import torch
import torch.nn as nn
from easydict import EasyDict

from ding.torch_utils import get_lstm
from ding.utils import MODEL_REGISTRY, SequenceType, squeeze
from ..common import FCEncoder, ConvEncoder, DiscreteHead, DuelingHead, \
        MultiHead, RegressionHead, ReparameterizationHead, independent_normal_dist


@MODEL_REGISTRY.register('pg')
class PG(nn.Module):
    """
    Overview:
        The neural network and computation graph of algorithms related to Policy Gradient(PG) \
        (https://proceedings.neurips.cc/paper/1999/file/464d828b85b0bed98e80ade0a5c43b0f-Paper.pdf). \
        The PG model is composed of two parts: encoder and head. Encoders are used to extract the feature \
        from various observation. Heads are used to predict corresponding action logit.
    Interface:
        ``__init__``, ``forward``.
    """

    def __init__(
            self,
            obs_shape: Union[int, SequenceType],
            action_shape: Union[int, SequenceType],
            action_space: str = 'discrete',
            encoder_hidden_size_list: SequenceType = [128, 128, 64],
            head_hidden_size: Optional[int] = None,
            head_layer_num: int = 1,
            activation: Optional[nn.Module] = nn.ReLU(),
            norm_type: Optional[str] = None
    ) -> None:
        """
        Overview:
            Initialize the PG model according to corresponding input arguments.
        Arguments:
            - obs_shape (:obj:`Union[int, SequenceType]`): Observation space shape, such as 8 or [4, 84, 84].
            - action_shape (:obj:`Union[int, SequenceType]`): Action space shape, such as 6 or [2, 3, 3].
            - action_space (:obj:`str`): The type of different action spaces, including ['discrete', 'continuous'], \
                then will instantiate corresponding head, including ``DiscreteHead`` and ``ReparameterizationHead``.
            - encoder_hidden_size_list (:obj:`SequenceType`): Collection of ``hidden_size`` to pass to ``Encoder``, \
                the last element must match ``head_hidden_size``.
            - head_hidden_size (:obj:`Optional[int]`): The ``hidden_size`` of ``head`` network, defaults \
                to None, it must match the last element of ``encoder_hidden_size_list``.
            - head_layer_num (:obj:`int`): The num of layers used in the ``head`` network to compute action.
            - activation (:obj:`Optional[nn.Module]`): The type of activation function in networks \
                if ``None`` then default set it to ``nn.ReLU()``.
            - norm_type (:obj:`Optional[str]`): The type of normalization in networks, see \
                ``ding.torch_utils.fc_block`` for more details. you can choose one of ['BN', 'IN', 'SyncBN', 'LN']
        Examples:
            >>> model = PG((4, 84, 84), 5)
            >>> inputs = torch.randn(8, 4, 84, 84)
            >>> outputs = model(inputs)
            >>> assert isinstance(outputs, dict)
            >>> assert outputs['logit'].shape == (8, 5)
            >>> assert outputs['dist'].sample().shape == (8, )
        """
        super(PG, self).__init__()
        # For compatibility: 1, (1, ), [4, 32, 32]
        obs_shape, action_shape = squeeze(obs_shape), squeeze(action_shape)
        if head_hidden_size is None:
            head_hidden_size = encoder_hidden_size_list[-1]
        # FC Encoder
        if isinstance(obs_shape, int) or len(obs_shape) == 1:
            self.encoder = FCEncoder(obs_shape, encoder_hidden_size_list, activation=activation, norm_type=norm_type)
        # Conv Encoder
        elif len(obs_shape) == 3:
            self.encoder = ConvEncoder(obs_shape, encoder_hidden_size_list, activation=activation, norm_type=norm_type)
        else:
            raise RuntimeError(
                "not support obs_shape for pre-defined encoder: {}, please customize your own BC".format(obs_shape)
            )
        self.action_space = action_space
        # Head
        if self.action_space == 'discrete':
            self.head = DiscreteHead(
                head_hidden_size, action_shape, head_layer_num, activation=activation, norm_type=norm_type
            )
        elif self.action_space == 'continuous':
            self.head = ReparameterizationHead(
                head_hidden_size,
                action_shape,
                head_layer_num,
                activation=activation,
                norm_type=norm_type,
                sigma_type='independent'
            )
        else:
            raise KeyError("not support action space: {}".format(self.action_space))

    def forward(self, x: torch.Tensor) -> Dict:
        """
        Overview:
            PG forward computation graph, input observation tensor to predict policy distribution.
        Arguments:
            - x (:obj:`torch.Tensor`): The input observation tensor data.
        Returns:
            - outputs (:obj:`torch.distributions`): The output policy distribution. If action space is \
            discrete, the output is Categorical distribution; if action space is continuous, the output is Normal \
            distribution.
        """
        x = self.encoder(x)
        x = self.head(x)
        if self.action_space == 'discrete':
            x['dist'] = torch.distributions.Categorical(logits=x['logit'])
        elif self.action_space == 'continuous':
            x = {'logit': {'mu': x['mu'], 'sigma': x['sigma']}}
            x['dist'] = independent_normal_dist(x['logit'])
        return x