from typing import Any, Union, List
from collections import namedtuple
from easydict import EasyDict
import gym
import copy
import numpy as np
from overcooked_ai_py.mdp.actions import Action, Direction
from overcooked_ai_py.mdp.overcooked_mdp import PlayerState, OvercookedGridworld, OvercookedState, ObjectState, \
SoupState, Recipe
from overcooked_ai_py.mdp.overcooked_env import OvercookedEnv, DEFAULT_ENV_PARAMS
from ding.envs import BaseEnv
from ding.utils import ENV_REGISTRY, deep_merge_dicts
OvercookEnvTimestep = namedtuple('OvercookEnvTimestep', ['obs', 'reward', 'done', 'info'])
# n, s = Direction.NORTH, Direction.SOUTH
# e, w = Direction.EAST, Direction.WEST
# stay, interact = Action.STAY, Action.INTERACT
# Action.ALL_ACTIONS: [n, s, e, w, stay, interact]
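# Per the mapping above, the discrete action space therefore has 6 entries;
# Action.INDEX_TO_ACTION is used in step() below to turn the integer indices
# sampled by the policy back into Overcooked actions.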
@ENV_REGISTRY.register('overcooked')
class OvercookEnv(BaseEnv):
config = EasyDict(
dict(
env_name="cramped_room",
horizon=400,
concat_obs=False,
action_mask=True,
shape_reward=True,
)
)
def __init__(self, cfg) -> None:
self._cfg = deep_merge_dicts(self.config, cfg)
self._env_name = self._cfg.env_name
self._horizon = self._cfg.horizon
self._concat_obs = self._cfg.concat_obs
self._action_mask = self._cfg.action_mask
self._shape_reward = self._cfg.shape_reward
self.mdp = OvercookedGridworld.from_layout_name(self._env_name)
self.base_env = OvercookedEnv.from_mdp(self.mdp, horizon=self._horizon, info_level=0)
# currently the Overcooked state encoding only supports the 2-agent game
self.agent_num = 2
self.action_dim = len(Action.ALL_ACTIONS)
self.action_space = gym.spaces.Discrete(len(Action.ALL_ACTIONS))
# set up obs shape
featurize_fn = lambda mdp, state: mdp.lossless_state_encoding(state)
self.featurize_fn = featurize_fn
dummy_mdp = self.base_env.mdp
dummy_state = dummy_mdp.get_standard_start_state()
obs_shape = self.featurize_fn(dummy_mdp, dummy_state)[0].shape # (5, 4, 26)
obs_shape = (obs_shape[-1], *obs_shape[:-1]) # permute channel first
if self._concat_obs:
obs_shape = (obs_shape[0] * 2, *obs_shape[1:])
else:
obs_shape = (2, ) + obs_shape
self.observation_space = gym.spaces.Box(low=0, high=1, shape=obs_shape, dtype=np.int64)
if self._action_mask:
self.observation_space = gym.spaces.Dict(
{
'agent_state': self.observation_space,
'action_mask': gym.spaces.Box(
low=0, high=1, shape=(self.agent_num, self.action_dim), dtype=np.int64
)
}
)
self.reward_space = gym.spaces.Box(low=0, high=100, shape=(1, ), dtype=np.float32)
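# For reference (inferred from the shapes above, for the default cramped_room
# layout): 'agent_state' is (2, 26, 5, 4) when concat_obs is False, or
# (52, 5, 4) when concat_obs is True; 'action_mask' is (agent_num, action_dim).
# Without action_mask, the observation is just the stacked per-agent encoding.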
def seed(self, seed: int, dynamic_seed: bool = True) -> None:
self._seed = seed
self._dynamic_seed = dynamic_seed
np.random.seed(self._seed)
def close(self) -> None:
# Note: the underlying env instance only has an empty close method, so we simply pass
pass
def random_action(self):
return [self.action_space.sample() for _ in range(self.agent_num)]
def step(self, action):
assert all(self.action_space.contains(a) for a in action), "%r (%s) invalid" % (action, type(action))
agent_action, other_agent_action = [Action.INDEX_TO_ACTION[a] for a in action]
if self.agent_idx == 0:
joint_action = (agent_action, other_agent_action)
else:
joint_action = (other_agent_action, agent_action)
next_state, reward, done, env_info = self.base_env.step(joint_action)
reward = np.array([float(reward)])
self._eval_episode_return += reward
if self._shape_reward:
self._eval_episode_return += sum(env_info['shaped_r_by_agent'])
reward += sum(env_info['shaped_r_by_agent'])
ob_p0, ob_p1 = self.featurize_fn(self.mdp, next_state)
ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
if self.agent_idx == 0:
both_agents_ob = [ob_p0, ob_p1]
else:
both_agents_ob = [ob_p1, ob_p0]
if self._concat_obs:
both_agents_ob = np.concatenate(both_agents_ob)
else:
both_agents_ob = np.stack(both_agents_ob)
env_info["policy_agent_idx"] = self.agent_idx
env_info["eval_episode_return"] = self._eval_episode_return
env_info["other_agent_env_idx"] = 1 - self.agent_idx
action_mask = self.get_action_mask()
if self._action_mask:
obs = {
"agent_state": both_agents_ob,
# "overcooked_state": self.base_env.state,
"action_mask": action_mask
}
else:
obs = both_agents_ob
return OvercookEnvTimestep(obs, reward, done, env_info)
def obs_preprocess(self, obs):
obs = obs.transpose(2, 0, 1)
return obs
def reset(self):
self.base_env.reset()
self._eval_episode_return = 0
self.mdp = self.base_env.mdp
# randomly initialize the agent index
self.agent_idx = np.random.choice([0, 1])
ob_p0, ob_p1 = self.featurize_fn(self.mdp, self.base_env.state)
ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
if self.agent_idx == 0:
both_agents_ob = [ob_p0, ob_p1]
else:
both_agents_ob = [ob_p1, ob_p0]
if self._concat_obs:
both_agents_ob = np.concatenate(both_agents_ob)
else:
both_agents_ob = np.stack(both_agents_ob)
action_mask = self.get_action_mask()
if self._action_mask:
obs = {"agent_state": both_agents_ob, "action_mask": action_mask}
else:
obs = both_agents_ob
return obs
def get_available_actions(self):
return self.mdp.get_actions(self.base_env.state)
def get_action_mask(self):
available_actions = self.get_available_actions()
action_masks = np.zeros((self.agent_num, self.action_dim)).astype(np.int64)
for i in range(self.action_dim):
if Action.INDEX_TO_ACTION[i] in available_actions[0]:
action_masks[0][i] = 1
if Action.INDEX_TO_ACTION[i] in available_actions[1]:
action_masks[1][i] = 1
return action_masks
def __repr__(self):
return "DI-engine Overcooked Env"
@ENV_REGISTRY.register('overcooked_game')
class OvercookGameEnv(BaseEnv):
config = EasyDict(
dict(
env_name="cramped_room",
horizon=400,
concat_obs=False,
action_mask=False,
shape_reward=True,
)
)
def __init__(self, cfg) -> None:
self._cfg = deep_merge_dicts(self.config, cfg)
self._env_name = self._cfg.env_name
self._horizon = self._cfg.horizon
self._concat_obs = self._cfg.concat_obs
self._action_mask = self._cfg.action_mask
self._shape_reward = self._cfg.shape_reward
self.mdp = OvercookedGridworld.from_layout_name(self._env_name)
self.base_env = OvercookedEnv.from_mdp(self.mdp, horizon=self._horizon, info_level=0)
# currently the Overcooked state encoding only supports the 2-agent game
self.agent_num = 2
self.action_dim = len(Action.ALL_ACTIONS)
self.action_space = gym.spaces.Discrete(len(Action.ALL_ACTIONS))
# set up obs shape
featurize_fn = lambda mdp, state: mdp.lossless_state_encoding(state)
self.featurize_fn = featurize_fn
dummy_mdp = self.base_env.mdp
dummy_state = dummy_mdp.get_standard_start_state()
obs_shape = self.featurize_fn(dummy_mdp, dummy_state)[0].shape # (5, 4, 26)
obs_shape = (obs_shape[-1], *obs_shape[:-1]) # permute channel first
if self._concat_obs:
obs_shape = (obs_shape[0] * 2, *obs_shape[1:])
else:
obs_shape = (2, ) + obs_shape
self.observation_space = gym.spaces.Box(low=0, high=1, shape=obs_shape, dtype=np.int64)
if self._action_mask:
self.observation_space = gym.spaces.Dict(
{
'agent_state': self.observation_space,
'action_mask': gym.spaces.Box(
low=0, high=1, shape=(self.agent_num, self.action_dim), dtype=np.int64
)
}
)
self.reward_space = gym.spaces.Box(low=0, high=100, shape=(1, ), dtype=np.float32)
def seed(self, seed: int, dynamic_seed: bool = True) -> None:
self._seed = seed
self._dynamic_seed = dynamic_seed
np.random.seed(self._seed)
def close(self) -> None:
# Note: the underlying env instance only has an empty close method, so we simply pass
pass
def random_action(self):
return [self.action_space.sample() for _ in range(self.agent_num)]
def step(self, action):
assert all(self.action_space.contains(a) for a in action), "%r (%s) invalid" % (action, type(action))
agent_action, other_agent_action = [Action.INDEX_TO_ACTION[a] for a in action]
if self.agent_idx == 0:
joint_action = (agent_action, other_agent_action)
else:
joint_action = (other_agent_action, agent_action)
next_state, reward, done, env_info = self.base_env.step(joint_action)
reward = np.array([float(reward)])
self._eval_episode_return += reward
if self._shape_reward:
self._eval_episode_return += sum(env_info['shaped_r_by_agent'])
reward += sum(env_info['shaped_r_by_agent'])
ob_p0, ob_p1 = self.featurize_fn(self.mdp, next_state)
ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
if self.agent_idx == 0:
both_agents_ob = [ob_p0, ob_p1]
else:
both_agents_ob = [ob_p1, ob_p0]
if self._concat_obs:
both_agents_ob = np.concatenate(both_agents_ob)
else:
both_agents_ob = np.stack(both_agents_ob)
env_info["policy_agent_idx"] = self.agent_idx
env_info["eval_episode_return"] = self._eval_episode_return
env_info["other_agent_env_idx"] = 1 - self.agent_idx
action_mask = self.get_action_mask()
if self._action_mask:
obs = {"agent_state": both_agents_ob, "action_mask": action_mask}
else:
obs = both_agents_ob
return OvercookEnvTimestep(obs, reward, done, env_info)
def obs_preprocess(self, obs):
obs = obs.transpose(2, 0, 1)
return obs
def reset(self):
self.base_env.reset()
self._eval_episode_return = 0
self.mdp = self.base_env.mdp
# randomly initialize the agent index (overridden by the fixed index below)
self.agent_idx = np.random.choice([0, 1])
# fix the agent index to 0
self.agent_idx = 0
ob_p0, ob_p1 = self.featurize_fn(self.mdp, self.base_env.state)
ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
if self.agent_idx == 0:
both_agents_ob = [ob_p0, ob_p1]
else:
both_agents_ob = [ob_p1, ob_p0]
if self._concat_obs:
both_agents_ob = np.concatenate(both_agents_ob)
else:
both_agents_ob = np.stack(both_agents_ob)
action_mask = self.get_action_mask()
if self._action_mask:
obs = {"agent_state": both_agents_ob, "action_mask": action_mask}
else:
obs = both_agents_ob
return obs
def get_available_actions(self):
return self.mdp.get_actions(self.base_env.state)
def get_action_mask(self):
available_actions = self.get_available_actions()
action_masks = np.zeros((self.agent_num, self.action_dim)).astype(np.int64)
for i in range(self.action_dim):
if Action.INDEX_TO_ACTION[i] in available_actions[0]:
action_masks[0][i] = 1
if Action.INDEX_TO_ACTION[i] in available_actions[1]:
action_masks[1][i] = 1
return action_masks
def __repr__(self):
return "DI-engine Overcooked GameEnv"