from typing import Optional, Union, List
from ditk import logging
from easydict import EasyDict
from functools import partial
import os
import gym
import gymnasium
import numpy as np
import torch
from ding.framework import task, OnlineRLContext
from ding.framework.middleware import interaction_evaluator_ttorch, PPOFStepCollector, multistep_trainer, CkptSaver, \
    wandb_online_logger, offline_data_saver, termination_checker, ppof_adv_estimator
from ding.envs import BaseEnv, BaseEnvManagerV2, SubprocessEnvManagerV2
from ding.policy import PPOFPolicy, single_env_forward_wrapper_ttorch
from ding.utils import set_pkg_seed
from ding.utils import get_env_fps, render
from ding.config import save_config_py
from .model import PPOFModel
from .config import get_instance_config, get_instance_env, get_hybrid_shape
from ding.bonus.common import TrainingReturn, EvalReturn


class PPOF: |
    """
    Overview:
        Class of agent for training, evaluation and deployment of the reinforcement learning algorithm \
        Proximal Policy Optimization (PPO).
        For more information about the system design of RL agent, please refer to \
        <https://di-engine-docs.readthedocs.io/en/latest/03_system/agent.html>.
    Interface:
        ``__init__``, ``train``, ``deploy``, ``collect_data``, ``batch_evaluate``, ``best``
    """

    supported_env_list = [
        'LunarLander-v2',
        'LunarLanderContinuous-v2',
        'BipedalWalker-v3',
        'Pendulum-v1',
        'acrobot',

        'rocket_landing',
        'drone_fly',
        'hybrid_moving',

        'evogym_carrier',
        'mario',
        'di_sheep',
        'procgen_bigfish',

        'minigrid_fourroom',
        'metadrive',

        # Atari
        'BowlingNoFrameskip-v4',
        'BreakoutNoFrameskip-v4',
        'GopherNoFrameskip-v4',
        'KangarooNoFrameskip-v4',
        'PongNoFrameskip-v4',
        'QbertNoFrameskip-v4',
        'SpaceInvadersNoFrameskip-v4',

        # MuJoCo
        'Hopper-v3',
        'HalfCheetah-v3',
        'Walker2d-v3',
    ]
""" |
|
Overview: |
|
List of supported envs. |
|
Examples: |
|
>>> from ding.bonus.ppof import PPOF |
|
>>> print(PPOF.supported_env_list) |
|
""" |
|
|
|
    def __init__(
            self,
            env_id: Optional[str] = None,
            env: Optional[BaseEnv] = None,
            seed: int = 0,
            exp_name: Optional[str] = None,
            model: Optional[torch.nn.Module] = None,
            cfg: Optional[Union[EasyDict, dict]] = None,
            policy_state_dict: Optional[str] = None
    ) -> None:
""" |
|
Overview: |
|
Initialize agent for PPO algorithm. |
|
Arguments: |
|
- env_id (:obj:`str`): The environment id, which is a registered environment name in gym or gymnasium. \ |
|
If ``env_id`` is not specified, ``env_id`` in ``cfg`` must be specified. \ |
|
If ``env_id`` is specified, ``env_id`` in ``cfg`` will be ignored. \ |
|
``env_id`` should be one of the supported envs, which can be found in ``PPOF.supported_env_list``. |
|
- env (:obj:`BaseEnv`): The environment instance for training and evaluation. \ |
|
If ``env`` is not specified, ``env_id`` or ``cfg.env_id`` must be specified. \ |
|
``env_id`` or ``cfg.env_id`` will be used to create environment instance. \ |
|
If ``env`` is specified, ``env_id`` and ``cfg.env_id`` will be ignored. |
|
- seed (:obj:`int`): The random seed, which is set before running the program. \ |
|
Default to 0. |
|
- exp_name (:obj:`str`): The name of this experiment, which will be used to create the folder to save \ |
|
log data. Default to None. If not specified, the folder name will be ``env_id``-``algorithm``. |
|
- model (:obj:`torch.nn.Module`): The model of PPO algorithm, which should be an instance of class \ |
|
``ding.model.PPOFModel``. \ |
|
If not specified, a default model will be generated according to the configuration. |
|
- cfg (:obj:`Union[EasyDict, dict]`): The configuration of PPO algorithm, which is a dict. \ |
|
Default to None. If not specified, the default configuration will be used. |
|
- policy_state_dict (:obj:`str`): The path of policy state dict saved by PyTorch a in local file. \ |
|
If specified, the policy will be loaded from this file. Default to None. |
|
|
|
.. note:: |
|
An RL Agent Instance can be initialized in two basic ways. \ |
|
For example, we have an environment with id ``LunarLander-v2`` registered in gym, \ |
|
and we want to train an agent with PPO algorithm with default configuration. \ |
|
Then we can initialize the agent in the following ways: |
|
>>> agent = PPOF(env_id='LunarLander-v2') |
|
or, if we want can specify the env_id in the configuration: |
|
>>> cfg = {'env': {'env_id': 'LunarLander-v2'}, 'policy': ...... } |
|
>>> agent = PPOF(cfg=cfg) |
|
There are also other arguments to specify the agent when initializing. |
|
For example, if we want to specify the environment instance: |
|
>>> env = CustomizedEnv('LunarLander-v2') |
|
>>> agent = PPOF(cfg=cfg, env=env) |
|
or, if we want to specify the model: |
|
>>> model = VAC(**cfg.policy.model) |
|
>>> agent = PPOF(cfg=cfg, model=model) |
|
or, if we want to reload the policy from a saved policy state dict: |
|
>>> agent = PPOF(cfg=cfg, policy_state_dict='LunarLander-v2.pth.tar') |
|
Make sure that the configuration is consistent with the saved policy state dict. |
|
""" |
|
|
|
        assert env_id is not None or cfg is not None, "Please specify env_id or cfg."

        if cfg is not None and not isinstance(cfg, EasyDict):
            cfg = EasyDict(cfg)

        if env_id is not None:
            assert env_id in PPOF.supported_env_list, "Please use supported envs: {}".format(PPOF.supported_env_list)
            if cfg is None:
                cfg = get_instance_config(env_id, algorithm="PPOF")

            if not hasattr(cfg, "env_id"):
                cfg.env_id = env_id
            assert cfg.env_id == env_id, "env_id in cfg should be the same as env_id in args."
        else:
            assert hasattr(cfg, "env_id"), "Please specify env_id in cfg."
            assert cfg.env_id in PPOF.supported_env_list, "Please use supported envs: {}".format(
                PPOF.supported_env_list
            )

        if exp_name is not None:
            cfg.exp_name = exp_name
        elif not hasattr(cfg, "exp_name"):
            cfg.exp_name = "{}-{}".format(cfg.env_id, "PPO")
        self.cfg = cfg
        self.exp_name = self.cfg.exp_name

        if env is None:
            self.env = get_instance_env(self.cfg.env_id)
        else:
            self.env = env

        logging.getLogger().setLevel(logging.INFO)
        self.seed = seed
        set_pkg_seed(self.seed, use_cuda=self.cfg.cuda)

        if not os.path.exists(self.exp_name):
            os.makedirs(self.exp_name)
        save_config_py(self.cfg, os.path.join(self.exp_name, 'policy_config.py'))

        # infer the action shape from a discrete, hybrid (Tuple) or continuous action space
        action_space = self.env.action_space
        if isinstance(action_space, (gym.spaces.Discrete, gymnasium.spaces.Discrete)):
            action_shape = int(action_space.n)
        elif isinstance(action_space, (gym.spaces.Tuple, gymnasium.spaces.Tuple)):
            action_shape = get_hybrid_shape(action_space)
        else:
            action_shape = action_space.shape

        assert self.cfg.value_norm in ['popart', 'value_rescale', 'symlog', 'baseline']
        if model is None:
            if self.cfg.value_norm != 'popart':
                model = PPOFModel(
                    self.env.observation_space.shape,
                    action_shape,
                    action_space=self.cfg.action_space,
                    **self.cfg.model
                )
            else:
                model = PPOFModel(
                    self.env.observation_space.shape,
                    action_shape,
                    action_space=self.cfg.action_space,
                    popart_head=True,
                    **self.cfg.model
                )
        self.policy = PPOFPolicy(self.cfg, model=model)
        if policy_state_dict is not None:
            self.policy.load_state_dict(policy_state_dict)
        self.checkpoint_save_dir = os.path.join(self.exp_name, "ckpt")

    def train(
            self,
            step: int = int(1e7),
            collector_env_num: int = 4,
            evaluator_env_num: int = 4,
            n_iter_log_show: int = 500,
            n_iter_save_ckpt: int = 1000,
            context: Optional[str] = None,
            reward_model: Optional[str] = None,
            debug: bool = False,
            wandb_sweep: bool = False,
    ) -> TrainingReturn:
""" |
|
Overview: |
|
Train the agent with PPO algorithm for ``step`` iterations with ``collector_env_num`` collector \ |
|
environments and ``evaluator_env_num`` evaluator environments. Information during training will be \ |
|
recorded and saved by wandb. |
|
Arguments: |
|
- step (:obj:`int`): The total training environment steps of all collector environments. Default to 1e7. |
|
- collector_env_num (:obj:`int`): The number of collector environments. Default to 4. |
|
- evaluator_env_num (:obj:`int`): The number of evaluator environments. Default to 4. |
|
- n_iter_log_show (:obj:`int`): The frequency of logging every training iteration. Default to 500. |
|
- n_iter_save_ckpt (:obj:`int`): The frequency of saving checkpoint every training iteration. \ |
|
Default to 1000. |
|
- context (:obj:`str`): The multi-process context of the environment manager. Default to None. \ |
|
It can be specified as ``spawn``, ``fork`` or ``forkserver``. |
|
- reward_model (:obj:`str`): The reward model name. Default to None. This argument is not supported yet. |
|
- debug (:obj:`bool`): Whether to use debug mode in the environment manager. Default to False. \ |
|
If set True, base environment manager will be used for easy debugging. Otherwise, \ |
|
subprocess environment manager will be used. |
|
- wandb_sweep (:obj:`bool`): Whether to use wandb sweep, \ |
|
which is a hyper-parameter optimization process for seeking the best configurations. \ |
|
Default to False. If True, the wandb sweep id will be used as the experiment name. |
|
Returns: |
|
- (:obj:`TrainingReturn`): The training result, of which the attributions are: |
|
- wandb_url (:obj:`str`): The weight & biases (wandb) project url of the trainning experiment. |
|
""" |
        if debug:
            logging.getLogger().setLevel(logging.DEBUG)
            logging.debug(self.policy._model)

        collector_env = self._setup_env_manager(collector_env_num, context, debug, 'collector')
        evaluator_env = self._setup_env_manager(evaluator_env_num, context, debug, 'evaluator')

        if reward_model is not None:
            # reward model is not supported yet (see the ``reward_model`` argument in the docstring)
            pass

        with task.start(ctx=OnlineRLContext()):
            task.use(interaction_evaluator_ttorch(self.seed, self.policy, evaluator_env))
            task.use(CkptSaver(self.policy, save_dir=self.checkpoint_save_dir, train_freq=n_iter_save_ckpt))
            task.use(PPOFStepCollector(self.seed, self.policy, collector_env, self.cfg.n_sample))
            task.use(ppof_adv_estimator(self.policy))
            task.use(multistep_trainer(self.policy, log_freq=n_iter_log_show))
            task.use(
                wandb_online_logger(
                    metric_list=self.policy.monitor_vars(),
                    model=self.policy._model,
                    anonymous=True,
                    project_name=self.exp_name,
                    wandb_sweep=wandb_sweep,
                )
            )
            task.use(termination_checker(max_env_step=step))
            task.run()

        return TrainingReturn(wandb_url=task.ctx.wandb_url)

    def deploy(
            self,
            enable_save_replay: bool = False,
            concatenate_all_replay: bool = False,
            replay_save_path: Optional[str] = None,
            seed: Optional[Union[int, List]] = None,
            debug: bool = False
    ) -> EvalReturn:
""" |
|
Overview: |
|
Deploy the agent with PPO algorithm by interacting with the environment, during which the replay video \ |
|
can be saved if ``enable_save_replay`` is True. The evaluation result will be returned. |
|
Arguments: |
|
- enable_save_replay (:obj:`bool`): Whether to save the replay video. Default to False. |
|
- concatenate_all_replay (:obj:`bool`): Whether to concatenate all replay videos into one video. \ |
|
Default to False. If ``enable_save_replay`` is False, this argument will be ignored. \ |
|
If ``enable_save_replay`` is True and ``concatenate_all_replay`` is False, \ |
|
the replay video of each episode will be saved separately. |
|
- replay_save_path (:obj:`str`): The path to save the replay video. Default to None. \ |
|
If not specified, the video will be saved in ``exp_name/videos``. |
|
- seed (:obj:`Union[int, List]`): The random seed, which is set before running the program. \ |
|
Default to None. If not specified, ``self.seed`` will be used. \ |
|
If ``seed`` is an integer, the agent will be deployed once. \ |
|
If ``seed`` is a list of integers, the agent will be deployed once for each seed in the list. |
|
- debug (:obj:`bool`): Whether to use debug mode in the environment manager. Default to False. \ |
|
If set True, base environment manager will be used for easy debugging. Otherwise, \ |
|
subprocess environment manager will be used. |
|
Returns: |
|
- (:obj:`EvalReturn`): The evaluation result, of which the attributions are: |
|
- eval_value (:obj:`np.float32`): The mean of evaluation return. |
|
- eval_value_std (:obj:`np.float32`): The standard deviation of evaluation return. |
|
""" |
        if debug:
            logging.getLogger().setLevel(logging.DEBUG)

        env = self.env.clone(caller='evaluator')

        if seed is not None and isinstance(seed, int):
            seeds = [seed]
        elif seed is not None and isinstance(seed, list):
            seeds = seed
        else:
            seeds = [self.seed]

        returns = []
        images = []
        if enable_save_replay:
            replay_save_path = os.path.join(self.exp_name, 'videos') if replay_save_path is None else replay_save_path
            env.enable_save_replay(replay_path=replay_save_path)
        else:
            logging.warning('No video would be generated during the deploy.')
            if concatenate_all_replay:
                logging.warning('concatenate_all_replay is set to False because enable_save_replay is False.')
                concatenate_all_replay = False

        forward_fn = single_env_forward_wrapper_ttorch(self.policy.eval, self.cfg.cuda)

        # initial reset; each seed below re-seeds and resets the environment again
        env.reset()

        for seed in seeds:
            env.seed(seed, dynamic_seed=False)
            return_ = 0.
            step = 0
            obs = env.reset()
            if concatenate_all_replay:
                images.append(render(env)[None])
            while True:
                action = forward_fn(obs)
                obs, rew, done, info = env.step(action)
                if concatenate_all_replay:
                    images.append(render(env)[None])
                return_ += rew
                step += 1
                if done:
                    break
            logging.info(f'PPOF deploy is finished, final episode return with {step} steps is: {return_}')
            returns.append(return_)

        env.close()

        if concatenate_all_replay:
            images = np.concatenate(images, axis=0)
            import imageio
            imageio.mimwrite(os.path.join(replay_save_path, 'deploy.mp4'), images, fps=get_env_fps(env))

        return EvalReturn(eval_value=np.mean(returns), eval_value_std=np.std(returns))

    def collect_data(
            self,
            env_num: int = 8,
            save_data_path: Optional[str] = None,
            n_sample: Optional[int] = None,
            n_episode: Optional[int] = None,
            context: Optional[str] = None,
            debug: bool = False
    ) -> None:
""" |
|
Overview: |
|
Collect data with PPO algorithm for ``n_episode`` episodes with ``env_num`` collector environments. \ |
|
The collected data will be saved in ``save_data_path`` if specified, otherwise it will be saved in \ |
|
``exp_name/demo_data``. |
|
Arguments: |
|
- env_num (:obj:`int`): The number of collector environments. Default to 8. |
|
- save_data_path (:obj:`str`): The path to save the collected data. Default to None. \ |
|
If not specified, the data will be saved in ``exp_name/demo_data``. |
|
- n_sample (:obj:`int`): The number of samples to collect. Default to None. \ |
|
If not specified, ``n_episode`` must be specified. |
|
- n_episode (:obj:`int`): The number of episodes to collect. Default to None. \ |
|
If not specified, ``n_sample`` must be specified. |
|
- context (:obj:`str`): The multi-process context of the environment manager. Default to None. \ |
|
It can be specified as ``spawn``, ``fork`` or ``forkserver``. |
|
- debug (:obj:`bool`): Whether to use debug mode in the environment manager. Default to False. \ |
|
If set True, base environment manager will be used for easy debugging. Otherwise, \ |
|
subprocess environment manager will be used. |
|
""" |
        if debug:
            logging.getLogger().setLevel(logging.DEBUG)
        if n_episode is not None:
            raise NotImplementedError("Episode-based collection is not supported yet, please use n_sample.")
        assert n_sample is not None, "Please specify n_sample."

        env = self._setup_env_manager(env_num, context, debug, 'collector')
        if save_data_path is None:
            save_data_path = os.path.join(self.exp_name, 'demo_data')

        with task.start(ctx=OnlineRLContext()):
            task.use(PPOFStepCollector(self.seed, self.policy, env, n_sample))
            task.use(offline_data_saver(save_data_path, data_type='hdf5'))
            task.run(max_step=1)
        logging.info(
            f'PPOF collecting is finished, more than {n_sample} samples are collected and saved in `{save_data_path}`'
        )

    def batch_evaluate(
            self,
            env_num: int = 4,
            n_evaluator_episode: int = 4,
            context: Optional[str] = None,
            debug: bool = False,
    ) -> EvalReturn:
""" |
|
Overview: |
|
Evaluate the agent with PPO algorithm for ``n_evaluator_episode`` episodes with ``env_num`` evaluator \ |
|
environments. The evaluation result will be returned. |
|
The difference between methods ``batch_evaluate`` and ``deploy`` is that ``batch_evaluate`` will create \ |
|
multiple evaluator environments to evaluate the agent to get an average performance, while ``deploy`` \ |
|
will only create one evaluator environment to evaluate the agent and save the replay video. |
|
Arguments: |
|
- env_num (:obj:`int`): The number of evaluator environments. Default to 4. |
|
- n_evaluator_episode (:obj:`int`): The number of episodes to evaluate. Default to 4. |
|
- context (:obj:`str`): The multi-process context of the environment manager. Default to None. \ |
|
It can be specified as ``spawn``, ``fork`` or ``forkserver``. |
|
- debug (:obj:`bool`): Whether to use debug mode in the environment manager. Default to False. \ |
|
If set True, base environment manager will be used for easy debugging. Otherwise, \ |
|
subprocess environment manager will be used. |
|
Returns: |
|
- (:obj:`EvalReturn`): The evaluation result, of which the attributions are: |
|
- eval_value (:obj:`np.float32`): The mean of evaluation return. |
|
- eval_value_std (:obj:`np.float32`): The standard deviation of evaluation return. |
|
""" |
        if debug:
            logging.getLogger().setLevel(logging.DEBUG)

        env = self._setup_env_manager(env_num, context, debug, 'evaluator')
        # launch and reset the evaluator environments before starting the evaluation task
        env.launch()
        env.reset()

        with task.start(ctx=OnlineRLContext()):
            task.use(interaction_evaluator_ttorch(
                self.seed,
                self.policy,
                env,
                n_evaluator_episode,
            ))
            task.run(max_step=1)

        return EvalReturn(eval_value=task.ctx.eval_value, eval_value_std=task.ctx.eval_value_std)

    def _setup_env_manager(
            self,
            env_num: int,
            context: Optional[str] = None,
            debug: bool = False,
            caller: str = 'collector'
    ) -> BaseEnvManagerV2:
""" |
|
Overview: |
|
Setup the environment manager. The environment manager is used to manage multiple environments. |
|
Arguments: |
|
- env_num (:obj:`int`): The number of environments. |
|
- context (:obj:`str`): The multi-process context of the environment manager. Default to None. \ |
|
It can be specified as ``spawn``, ``fork`` or ``forkserver``. |
|
- debug (:obj:`bool`): Whether to use debug mode in the environment manager. Default to False. \ |
|
If set True, base environment manager will be used for easy debugging. Otherwise, \ |
|
subprocess environment manager will be used. |
|
- caller (:obj:`str`): The caller of the environment manager. Default to 'collector'. |
|
Returns: |
|
- (:obj:`BaseEnvManagerV2`): The environment manager. |
|
""" |
        assert caller in ['evaluator', 'collector']
        if debug:
            env_cls = BaseEnvManagerV2
            manager_cfg = env_cls.default_config()
        else:
            env_cls = SubprocessEnvManagerV2
            manager_cfg = env_cls.default_config()
        if context is not None:
            manager_cfg.context = context
        return env_cls([partial(self.env.clone, caller) for _ in range(env_num)], manager_cfg)

    @property
    def best(self) -> 'PPOF':
""" |
|
Overview: |
|
Load the best model from the checkpoint directory, \ |
|
which is by default in folder ``exp_name/ckpt/eval.pth.tar``. \ |
|
The return value is the agent with the best model. |
|
Returns: |
|
- (:obj:`PPOF`): The agent with the best model. |
|
Examples: |
|
>>> agent = PPOF(env_id='LunarLander-v2') |
|
>>> agent.train() |
|
>>> agent = agent.best() |
|
|
|
.. note:: |
|
The best model is the model with the highest evaluation return. If this method is called, the current \ |
|
model will be replaced by the best model. |
|
""" |
        best_model_file_path = os.path.join(self.checkpoint_save_dir, "eval.pth.tar")
        # if no checkpoint has been saved yet, the current policy is kept unchanged
        if os.path.exists(best_model_file_path):
            policy_state_dict = torch.load(best_model_file_path, map_location=torch.device("cpu"))
            self.policy.learn_mode.load_state_dict(policy_state_dict)
        return self
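

if __name__ == "__main__":
    # Minimal end-to-end usage sketch, illustrative only (not part of the original module).
    # It assumes the gym Box2D extra is installed so that ``LunarLander-v2`` is available.
    agent = PPOF(env_id='LunarLander-v2', exp_name='lunarlander-ppo-demo')
    agent.train(step=int(1e5), collector_env_num=4, evaluator_env_num=4)
    print(agent.batch_evaluate(env_num=4, n_evaluator_episode=4))
    agent.best.deploy(enable_save_replay=True)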