File size: 5,947 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
import copy
import os
from typing import Dict, Optional
import gym
import gym_hybrid
import matplotlib.pyplot as plt
import numpy as np
from easydict import EasyDict
from matplotlib import animation
from ding.envs import BaseEnv, BaseEnvTimestep
from ding.envs.common import affine_transform
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY
@ENV_REGISTRY.register('gym_hybrid')
class GymHybridEnv(BaseEnv):
default_env_id = ['Sliding-v0', 'Moving-v0', 'HardMove-v0']
@classmethod
def default_config(cls: type) -> EasyDict:
cfg = EasyDict(copy.deepcopy(cls.config))
cfg.cfg_type = cls.__name__ + 'Dict'
return cfg
config = dict(
env_id='Moving-v0',
act_scale=True,
)
def __init__(self, cfg: EasyDict) -> None:
self._cfg = cfg
self._env_id = cfg.env_id
assert self._env_id in self.default_env_id
self._act_scale = cfg.act_scale
self._replay_path = None
self._save_replay = False
self._save_replay_count = 0
self._init_flag = False
def reset(self) -> np.ndarray:
if not self._init_flag:
if self._env_id == 'HardMove-v0':
self._env = gym.make(self._env_id, num_actuators=self._cfg.num_actuators)
else:
self._env = gym.make(self._env_id)
self._observation_space = self._env.observation_space
self._action_space = self._env.action_space
self._reward_space = gym.spaces.Box(
low=self._env.reward_range[0], high=self._env.reward_range[1], shape=(1, ), dtype=np.float32
)
self._init_flag = True
if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
np_seed = 100 * np.random.randint(1, 1000)
self._env.seed(self._seed + np_seed)
elif hasattr(self, '_seed'):
self._env.seed(self._seed)
self._eval_episode_return = 0
obs = self._env.reset()
obs = to_ndarray(obs).astype(np.float32)
return obs
def close(self) -> None:
if self._init_flag:
self._env.close()
self._init_flag = False
def seed(self, seed: int, dynamic_seed: bool = True) -> None:
self._seed = seed
self._dynamic_seed = dynamic_seed
np.random.seed(self._seed)
def step(self, action: Dict) -> BaseEnvTimestep:
if self._act_scale:
if self._env_id == 'HardMove-v0':
action = [
action['action_type'], [affine_transform(i, min_val=-1, max_val=1) for i in action['action_args']]
]
else:
# acceleration_value.
action['action_args'][0] = affine_transform(action['action_args'][0], min_val=0, max_val=1)
# rotation_value. Following line can be omitted, because in the affine_transform function,
# we have already done the clip(-1,1) operation
action['action_args'][1] = affine_transform(action['action_args'][1], min_val=-1, max_val=1)
action = [action['action_type'], action['action_args']]
if self._save_replay:
self._frames.append(self._env.render(mode='rgb_array'))
obs, rew, done, info = self._env.step(action)
obs = to_ndarray(obs)
if isinstance(obs, list): # corner case
for i in range(len(obs)):
if len(obs[i].shape) == 0:
obs[i] = np.array([obs[i]])
obs = np.concatenate(obs)
assert isinstance(obs, np.ndarray) and obs.shape == (10, )
obs = obs.astype(np.float32)
rew = to_ndarray([rew]) # wrapped to be transferred to a numpy array with shape (1,)
if isinstance(rew, list):
rew = rew[0]
assert isinstance(rew, np.ndarray) and rew.shape == (1, )
self._eval_episode_return += rew.item()
if done:
info['eval_episode_return'] = self._eval_episode_return
if self._save_replay:
if self._env_id == 'HardMove-v0':
self._env_id = f'hardmove_n{self._cfg.num_actuators}'
path = os.path.join(
self._replay_path, '{}_episode_{}.gif'.format(self._env_id, self._save_replay_count)
)
self.display_frames_as_gif(self._frames, path)
self._frames = []
self._save_replay_count += 1
info['action_args_mask'] = np.array([[1, 0], [0, 1], [0, 0]])
return BaseEnvTimestep(obs, rew, done, info)
def random_action(self) -> Dict:
# action_type: 0, 1, 2
# action_args:
# - acceleration_value: [0, 1]
# - rotation_value: [-1, 1]
raw_action = self._action_space.sample()
return {'action_type': raw_action[0], 'action_args': raw_action[1]}
def __repr__(self) -> str:
return "DI-engine gym hybrid Env"
@property
def observation_space(self) -> gym.spaces.Space:
return self._observation_space
@property
def action_space(self) -> gym.spaces.Space:
return self._action_space
@property
def reward_space(self) -> gym.spaces.Space:
return self._reward_space
def enable_save_replay(self, replay_path: Optional[str] = None) -> None:
if replay_path is None:
replay_path = './video'
self._replay_path = replay_path
self._save_replay = True
self._save_replay_count = 0
self._frames = []
@staticmethod
def display_frames_as_gif(frames: list, path: str) -> None:
patch = plt.imshow(frames[0])
plt.axis('off')
def animate(i):
patch.set_data(frames[i])
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=5)
anim.save(path, writer='imagemagick', fps=20)
|