from typing import Union, List, Optional
import os
import time
import copy
import numpy as np
import gym
from easydict import EasyDict
from ding.envs import BaseEnv, BaseEnvTimestep, EvalEpisodeReturnWrapper
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY
import evogym.envs  # noqa: imported for the side effect of registering EvoGym tasks with gym
from evogym import WorldObject, sample_robot


@ENV_REGISTRY.register('evogym')
class EvoGymEnv(BaseEnv):

    @classmethod
    def default_config(cls: type) -> EasyDict:
        cfg = EasyDict(copy.deepcopy(cls.config))
        cfg.cfg_type = cls.__name__ + 'Dict'
        return cfg

    config = dict(
        env_id='Walker-v0',
        robot='speed_bot',  # refer to 'world_data' for more robot configurations
        robot_h=5,  # only used for random robots
        robot_w=5,  # only used for random robots
        robot_pd=None,  # only used for random robots: probability distribution over randomly generated components
        robot_dir=""  # only used for predefined robots: path to the robot config, e.g. env/world_data/my_bot.json
    )
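
    # Hypothetical override for a randomly sampled robot (a sketch, not part of
    # the configuration above): `robot_pd`, when given, must be an np.ndarray
    # probability distribution accepted by `evogym.sample_robot`, e.g.
    #   cfg = EvoGymEnv.default_config()
    #   cfg.robot, cfg.robot_h, cfg.robot_w = 'random', 5, 5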

    def __init__(self, cfg: dict) -> None:
        self._cfg = cfg
        self._init_flag = False
        self._replay_path = None
        if 'robot_dir' not in self._cfg.keys():
            self._cfg.robot_dir = '../'  # fall back to the default robot config directory

    def reset(self) -> np.ndarray:
        if not self._init_flag:
            self._env = self._make_env()
            self._env.observation_space.dtype = np.float32  # to unify the format of envs in DI-engine
            self._observation_space = self._env.observation_space
            self.num_actuators = self._env.get_actuator_indices('robot').size
            # by default the action space is double (float64); create a new space with dtype float32
            self._action_space = gym.spaces.Box(low=0.6, high=1.6, shape=(self.num_actuators, ), dtype=np.float32)
            self._reward_space = gym.spaces.Box(
                low=self._env.reward_range[0], high=self._env.reward_range[1], shape=(1, ), dtype=np.float32
            )
            self._init_flag = True
        if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
            np_seed = 100 * np.random.randint(1, 1000)
            self._env.seed(self._seed + np_seed)
        elif hasattr(self, '_seed'):
            self._env.seed(self._seed)
        if self._replay_path is not None:
            gym.logger.set_level(gym.logger.DEBUG)
            # make the render mode key compatible with the installed gym version
            if gym.version.VERSION > '0.22.0':
                self._env.metadata.update({'render_modes': ["rgb_array"]})
            else:
                self._env.metadata.update({'render.modes': ["rgb_array"]})
            self._env = gym.wrappers.RecordVideo(
                self._env,
                video_folder=self._replay_path,
                episode_trigger=lambda episode_id: True,
                name_prefix='rl-video-{}-{}'.format(id(self), time.time())
            )
        obs = self._env.reset()
        obs = to_ndarray(obs).astype('float32')
        return obs
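
    # Usage note (not in the original file): call `enable_save_replay(path)`
    # before the first `reset()` so that the RecordVideo wrapper above is
    # attached and every episode gets saved under `path`.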

    def close(self) -> None:
        if self._init_flag:
            self._env.close()
        self._init_flag = False

    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
        self._seed = seed
        self._dynamic_seed = dynamic_seed
        np.random.seed(self._seed)

    def step(self, action: Union[np.ndarray, list]) -> BaseEnvTimestep:
        action = to_ndarray(action).astype(np.float32)
        obs, rew, done, info = self._env.step(action)
        obs = to_ndarray(obs).astype(np.float32)
        rew = to_ndarray([rew]).astype(np.float32)  # wrap the scalar reward into a 1-dim array, as DI-engine expects
        return BaseEnvTimestep(obs, rew, done, info)

    def _make_env(self):
        # the robot configuration can be read from a file or created randomly
        if self._cfg.robot in [None, 'random']:
            h, w = 5, 5
            pd = None
            if 'robot_h' in self._cfg.keys():
                assert self._cfg.robot_h > 0
                h = self._cfg.robot_h
            if 'robot_w' in self._cfg.keys():
                assert self._cfg.robot_w > 0
                w = self._cfg.robot_w
            if 'robot_pd' in self._cfg.keys() and self._cfg.robot_pd is not None:
                assert isinstance(self._cfg.robot_pd, np.ndarray)
                pd = self._cfg.robot_pd
            structure = sample_robot((h, w), pd)
        else:
            structure = self.read_robot_from_file(self._cfg.robot, self._cfg.robot_dir)
        env = gym.make(self._cfg.env_id, body=structure[0])
        env = EvalEpisodeReturnWrapper(env)
        return env
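
    # Note (added): both `sample_robot` and `read_robot_from_file` return a
    # (structure, connections) tuple; only the voxel-type grid `structure[0]`
    # is passed to `gym.make` above.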

    def enable_save_replay(self, replay_path: Optional[str] = None) -> None:
        if replay_path is None:
            replay_path = './video'
        self._replay_path = replay_path

    def random_action(self) -> np.ndarray:
        return self.action_space.sample()

    def __repr__(self) -> str:
        return "DI-engine EvoGym Env({})".format(self._cfg.env_id)

    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        collector_cfg = copy.deepcopy(cfg)
        collector_env_num = collector_cfg.pop('collector_env_num', 1)
        return [collector_cfg for _ in range(collector_env_num)]

    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        evaluator_cfg = copy.deepcopy(cfg)
        evaluator_env_num = evaluator_cfg.pop('evaluator_env_num', 1)
        return [evaluator_cfg for _ in range(evaluator_env_num)]

    @property
    def observation_space(self) -> gym.spaces.Space:
        return self._observation_space

    @property
    def action_space(self) -> gym.spaces.Space:
        return self._action_space

    @property
    def reward_space(self) -> gym.spaces.Space:
        return self._reward_space

    @staticmethod
    def read_robot_from_file(file_name, root_dir='../'):
        possible_paths = [
            os.path.join(file_name),
            os.path.join(f'{file_name}.npz'),
            os.path.join(f'{file_name}.json'),
            os.path.join(root_dir, 'world_data', file_name),
            os.path.join(root_dir, 'world_data', f'{file_name}.npz'),
            os.path.join(root_dir, 'world_data', f'{file_name}.json'),
        ]
        best_path = None
        for path in possible_paths:
            if os.path.exists(path):
                best_path = path
                break
        if best_path is None:
            raise FileNotFoundError('cannot find robot config "{}" under "{}"'.format(file_name, root_dir))
        if best_path.endswith('json'):
            robot_object = WorldObject.from_json(best_path)
            return (robot_object.get_structure(), robot_object.get_connections())
        if best_path.endswith('npz'):
            structure_data = np.load(best_path)
            structure = []
            for key, value in structure_data.items():
                structure.append(value)
            return tuple(structure)
        return None
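

# A minimal usage sketch (not part of the original module), assuming the default
# 'Walker-v0' task and the bundled 'speed_bot' robot config are reachable from
# the working directory:
if __name__ == '__main__':
    env = EvoGymEnv(EvoGymEnv.default_config())
    env.seed(0)
    obs = env.reset()
    done = False
    while not done:
        timestep = env.step(env.random_action())
        done = timestep.done
    print('episode return:', timestep.info['eval_episode_return'])
    env.close()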