import copy
from collections import namedtuple
from typing import List

import gfootball
import gfootball.env as football_env
import numpy as np
from ding.envs import BaseEnv, BaseEnvTimestep, BaseEnvInfo
from ding.envs.common.env_element import EnvElementInfo
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY
from dizoo.gfootball.envs.obs.encoder import FeatureEncoder
from dizoo.gfootball.envs.obs.gfootball_obs import FullObs
from dizoo.gfootball.envs.action.gfootball_action import GfootballSpAction


@ENV_REGISTRY.register('gfootball_sp')
class GfootballEnv(BaseEnv):
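    """
    DI-engine wrapper of Google Research Football for self-play training.
    In collector (self-play) mode it controls one player on each team in the
    ``11_vs_11_kaggle`` scenario; in evaluator mode it controls only the left-side
    player against the built-in ``11_vs_11_hard_stochastic`` AI.
    """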
    timestep = namedtuple('GfootballTimestep', ['obs', 'reward', 'done', 'info'])
    info_template = namedtuple('GFootballEnvInfo', ['obs_space', 'act_space', 'rew_space'])

    def __init__(self, cfg: dict) -> None:
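        """
        The config is expected to provide ``save_replay`` and ``render`` and may provide
        ``is_evaluator``; the scenario name and the number of controlled right-side players
        are derived from the evaluator flag.
        """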
        self._cfg = cfg
        self.save_replay = self._cfg.save_replay
        self.gui = self._cfg.render
        self._obs_helper = FullObs(cfg)
        self._action_helper = GfootballSpAction(cfg)
        self._launch_env_flag = False
        self._encoder = FeatureEncoder()
        self.is_evaluator = self._cfg.get("is_evaluator", False)
        if self.is_evaluator:
            # Evaluator: play the built-in hard AI, no right-side player is agent-controlled.
            self.env_name = "11_vs_11_hard_stochastic"
            self.right_role_num = 0
        else:
            # Collector (self-play): one agent-controlled player on each side.
            self.env_name = "11_vs_11_kaggle"
            self.right_role_num = 1

    def _make_env(self):
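        # Build the raw-representation gfootball env; replay dumps, video writing and
        # on-screen rendering are toggled by the user config.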
        self._env = football_env.create_environment(
            env_name=self.env_name,
            representation='raw',
            stacked=False,
            logdir='/tmp/football',
            write_goal_dumps=False,
            write_full_episode_dumps=self.save_replay,
            write_video=self.save_replay,
            render=self.gui,
            number_of_right_players_agent_controls=self.right_role_num
        )
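        # Optional seeding: with a dynamic seed, a random offset is added at every launch so
        # that parallel env instances do not replay identical episodes.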
        if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
            np_seed = 100 * np.random.randint(1, 1000)
            self._env.seed(self._seed + np_seed)
        elif hasattr(self, '_seed'):
            self._env.seed(self._seed)
        self._launch_env_flag = True
        self._eval_episode_return = [0, 0]

    def reset(self) -> np.ndarray:
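        """
        Lazily create the underlying gfootball env on first use, reset it and return the
        encoded observations of both controlled players (the single observation is duplicated
        in evaluator mode, where only the left-side player is agent-controlled).
        """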
        if not self._launch_env_flag:
            self._make_env()
        self._env.reset()
        obs = self._env.observation()
        if self.is_evaluator:
            self._prev_obs = obs[0]
            obs = self._encoder.encode(obs[0])
            return [obs, obs]
        else:
            self._prev_obs, self._prev_obs_opponent = obs
            obs_ = self._encoder.encode(obs[0])
            obs_opponent = self._encoder.encode(obs[1])
            return [obs_, obs_opponent]

    def close(self) -> None:
        if self._launch_env_flag:
            self._env.close()
        self._launch_env_flag = False

    def seed(self, seed: int, dynamic_seed: bool = None) -> None:
        self._seed = seed
        if dynamic_seed:
            self._dynamic_seed = dynamic_seed

    def step(self, action) -> BaseEnvTimestep:
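        """
        Step the underlying env with the (joint) action, shape the raw scoring reward via
        ``calc_reward`` and return a ``BaseEnvTimestep`` whose obs/reward fields are
        two-element lists, one entry per controlled player (duplicated in evaluator mode).
        """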
        action = to_ndarray(action)

        raw_obs, raw_rew, done, info = self._env.step(action)
        if self.is_evaluator:
            raw_obs = raw_obs[0]
            rew = GfootballEnv.calc_reward(raw_rew, self._prev_obs, raw_obs)
            obs = to_ndarray(self._encoder.encode(raw_obs))
            # Keep the previous observation so that per-step deltas (e.g. yellow cards)
            # in calc_reward stay meaningful.
            self._prev_obs = raw_obs
            rew = [rew, rew]
            obs = [obs, obs]
            self._eval_episode_return[0] += raw_rew
            self._eval_episode_return[1] += raw_rew
        else:
            rew = GfootballEnv.calc_reward(raw_rew[0], self._prev_obs, raw_obs[0])
            rew_oppo = GfootballEnv.calc_reward(raw_rew[1], self._prev_obs_opponent, raw_obs[1])
            rew = [rew, rew_oppo]
            obs = [to_ndarray(self._encoder.encode(raw_obs[0])), to_ndarray(self._encoder.encode(raw_obs[1]))]
            self._prev_obs, self._prev_obs_opponent = raw_obs
            self._eval_episode_return[0] += raw_rew[0]
            self._eval_episode_return[1] += raw_rew[1]

        if done:
            if self.is_evaluator:
                info['eval_episode_return'] = self._eval_episode_return
            else:
                info[0]['eval_episode_return'] = self._eval_episode_return[0]
                info[1]['eval_episode_return'] = self._eval_episode_return[1]

        return BaseEnvTimestep(obs, rew, done, info)

    def info(self) -> BaseEnvInfo:
        info_data = {
            'obs_space': self._obs_helper.info,
            'act_space': self._action_helper.info,
            'rew_space': EnvElementInfo(
                shape=1,
                value={
                    'min': np.float64("-inf"),
                    'max': np.float64("inf"),
                    'dtype': np.float32
                },
            ),
        }
        return GfootballEnv.info_template(**info_data)

    def __repr__(self) -> str:
        return "DI-engine Gfootball Env({})".format(self.env_name)

    @staticmethod
    def calc_reward(rew, prev_obs, obs):
        """
        Reward design following the basic rewarder in
        [football-paris](https://github.com/seungeunrho/football-paris/blob/main/rewarders/rewarder_basic.py).
        """
        ball_x, ball_y, ball_z = obs['ball']
        MIDDLE_X, PENALTY_X, END_X = 0.2, 0.64, 1.0
        PENALTY_Y, END_Y = 0.27, 0.42
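
        # Dense positional shaping: bonus when the ball is in the opponent's half or
        # penalty area, symmetric penalty when it is near our own goal.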
        ball_position_r = 0.0
        if (-END_X <= ball_x < -PENALTY_X) and (-PENALTY_Y < ball_y < PENALTY_Y):
            ball_position_r = -2.0
        elif (-END_X <= ball_x < -MIDDLE_X) and (-END_Y < ball_y < END_Y):
            ball_position_r = -1.0
        elif (-MIDDLE_X <= ball_x <= MIDDLE_X) and (-END_Y < ball_y < END_Y):
            ball_position_r = 0.0
        elif (PENALTY_X < ball_x <= END_X) and (-PENALTY_Y < ball_y < PENALTY_Y):
            ball_position_r = 2.0
        elif (MIDDLE_X < ball_x <= END_X) and (-END_Y < ball_y < END_Y):
            ball_position_r = 1.0
        else:
            ball_position_r = 0.0

        # Yellow-card shaping: reward cards newly given to the opponent, penalize our own.
        left_yellow = np.sum(obs["left_team_yellow_card"]) - np.sum(prev_obs["left_team_yellow_card"])
        right_yellow = np.sum(obs["right_team_yellow_card"]) - np.sum(prev_obs["right_team_yellow_card"])
        yellow_r = right_yellow - left_yellow

        # Win bonus, granted only at the final step of the match.
        win_reward = 0.0
        if obs['steps_left'] == 0:
            my_score, opponent_score = obs['score']
            if my_score > opponent_score:
                win_reward = 1.0

        reward = 5.0 * win_reward + 5.0 * rew + 0.003 * ball_position_r + yellow_r

        return reward

    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        collector_cfg = copy.deepcopy(cfg)
        collector_env_num = collector_cfg.pop('collector_env_num', 1)
        collector_cfg.is_evaluator = False
        return [collector_cfg for _ in range(collector_env_num)]

    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        evaluator_cfg = copy.deepcopy(cfg)
        evaluator_env_num = evaluator_cfg.pop('evaluator_env_num', 1)
        evaluator_cfg.is_evaluator = True
        return [evaluator_cfg for _ in range(evaluator_env_num)]
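

# A minimal usage sketch (an addition, not part of the original pipeline). It assumes the
# config is an EasyDict providing the fields read in __init__ (save_replay, render,
# is_evaluator) and that the obs/action helpers accept such a config; the gfootball package
# must be installed. Action 0 is the "idle" action of the default 19-action discrete set;
# a real run would sample actions from a policy instead.
if __name__ == '__main__':
    from easydict import EasyDict

    cfg = EasyDict(dict(save_replay=False, render=False, is_evaluator=False))
    env = GfootballEnv(cfg)
    obs = env.reset()  # a list with one encoded observation per controlled player
    # In self-play (collector) mode one player per team is controlled, so two actions are passed.
    timestep = env.step([0, 0])
    print(timestep.reward, timestep.done)
    env.close()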