import copy
from collections import namedtuple
from typing import List

import gfootball.env as football_env
import numpy as np
from ding.envs import BaseEnv, BaseEnvTimestep, BaseEnvInfo
from ding.envs.common.env_element import EnvElementInfo
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY
from dizoo.gfootball.envs.obs.encoder import FeatureEncoder
from dizoo.gfootball.envs.obs.gfootball_obs import FullObs
from dizoo.gfootball.envs.action.gfootball_action import GfootballSpAction


@ENV_REGISTRY.register('gfootball_sp')
class GfootballEnv(BaseEnv):
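    """
    Google Research Football environment wrapped for DI-engine self-play training. Collector
    envs control one player on each side of 11_vs_11_kaggle; evaluator envs control a single
    left-side player against the built-in AI in 11_vs_11_hard_stochastic.
    """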

    timestep = namedtuple('GfootballTimestep', ['obs', 'reward', 'done', 'info'])
    info_template = namedtuple('GFootballEnvInfo', ['obs_space', 'act_space', 'rew_space'])

    def __init__(self, cfg: dict) -> None:
        self._cfg = cfg
        self.save_replay = self._cfg.save_replay
        # self.env_name = cfg.get("env_name", "11_vs_11_kaggle")
        self.gui = self._cfg.render
        self._obs_helper = FullObs(cfg)
        self._action_helper = GfootballSpAction(cfg)
        self._launch_env_flag = False
        self._encoder = FeatureEncoder()
        self.is_evaluator = self._cfg.get("is_evaluator", False)
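        # Evaluator envs play the built-in hard AI (no right-side agent is controlled);
        # collector envs run self-play with one controlled player on each side.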
        if self.is_evaluator:
            self.env_name = "11_vs_11_hard_stochastic"
            self.right_role_num = 0
        else:
            self.env_name = "11_vs_11_kaggle"
            self.right_role_num = 1

    def _make_env(self):
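        # The 'raw' representation yields full observation dicts, which FeatureEncoder later
        # converts into network-ready features.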
        self._env = football_env.create_environment(
            env_name=self.env_name,
            representation='raw',
            stacked=False,
            logdir='/tmp/football',
            write_goal_dumps=False,
            write_full_episode_dumps=self.save_replay,
            write_video=self.save_replay,
            render=self.gui,
            number_of_right_players_agent_controls=self.right_role_num
        )
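        # With dynamic seeding, each launch offsets the base seed so parallel envs decorrelate.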
        if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
            np_seed = 100 * np.random.randint(1, 1000)
            self._env.seed(self._seed + np_seed)
        elif hasattr(self, '_seed'):
            self._env.seed(self._seed)
        self._launch_env_flag = True
        # Episode returns for the two policy slots (duplicated in evaluator mode).
        self._eval_episode_return = [0, 0]

    def reset(self) -> List:
        if not self._launch_env_flag:
            self._make_env()
        self._env.reset()
        obs = self._env.observation()
        if self.is_evaluator:
            # Only one player is controlled, so both policy slots share the same observation.
            self._prev_obs = obs[0]
            obs = self._encoder.encode(obs[0])
            return [obs, obs]
        else:
            self._prev_obs, self._prev_obs_opponent = obs
            obs_ = self._encoder.encode(obs[0])
            obs_opponent = self._encoder.encode(obs[1])
            return [obs_, obs_opponent]

    def close(self) -> None:
        if self._launch_env_flag:
            self._env.close()
        self._launch_env_flag = False

    def seed(self, seed: int, dynamic_seed: bool = None) -> None:
        self._seed = seed
        if dynamic_seed:
            self._dynamic_seed = dynamic_seed

    def step(self, action) -> BaseEnvTimestep:
        action = to_ndarray(action)
        # action = self.process_action(action)  # process
        raw_obs, raw_rew, done, info = self._env.step(action)
        if self.is_evaluator:
            # Single controlled player: duplicate obs/reward so both policy slots receive data.
            raw_obs = raw_obs[0]
            rew = GfootballEnv.calc_reward(raw_rew, self._prev_obs, raw_obs)
            self._prev_obs = raw_obs
            obs = to_ndarray(self._encoder.encode(raw_obs))
            rew = [rew, rew]
            obs = [obs, obs]
            self._eval_episode_return[0] += raw_rew
            self._eval_episode_return[1] += raw_rew
        else:
            # Self-play: shape each side's reward against its own previous observation.
            rew = GfootballEnv.calc_reward(raw_rew[0], self._prev_obs, raw_obs[0])
            rew_oppo = GfootballEnv.calc_reward(raw_rew[1], self._prev_obs_opponent, raw_obs[1])
            self._prev_obs, self._prev_obs_opponent = raw_obs
            rew = [rew, rew_oppo]
            obs = [to_ndarray(self._encoder.encode(raw_obs[0])), to_ndarray(self._encoder.encode(raw_obs[1]))]
            self._eval_episode_return[0] += raw_rew[0]
            self._eval_episode_return[1] += raw_rew[1]

        if done:
            if self.is_evaluator:
                info['eval_episode_return'] = self._eval_episode_return
            else:
                info[0]['eval_episode_return'] = self._eval_episode_return[0]
                info[1]['eval_episode_return'] = self._eval_episode_return[1]

        return BaseEnvTimestep(obs, rew, done, info)

    def info(self) -> BaseEnvInfo:
        info_data = {
            'obs_space': self._obs_helper.info,
            'act_space': self._action_helper.info,
            # The shaped reward is an unbounded float32 scalar.
            'rew_space': EnvElementInfo(
                shape=1,
                value={
                    'min': float("-inf"),
                    'max': float("inf"),
                    'dtype': np.float32
                },
            ),
        }
        return GfootballEnv.info_template(**info_data)

    def __repr__(self) -> str:
        return "DI-engine Gfootball Env({})".format(self.env_name)

    @staticmethod
    def calc_reward(rew, prev_obs, obs):
        """
        Reward design referred to [football-paris](https://github.com/seungeunrho/football-paris/blob/main/rewarders/rewarder_basic.py).
        Example: with the ball just inside the opponent's penalty area (ball_position_r = 2.0),
        no goal, no new cards, and the match still running, reward = 0.003 * 2.0 = 0.006.
        """
        ball_x, ball_y, ball_z = obs['ball']
        MIDDLE_X, PENALTY_X, END_X = 0.2, 0.64, 1.0
        PENALTY_Y, END_Y = 0.27, 0.42

        # Position-based shaping: the closer the ball is to the opponent's penalty area,
        # the higher the reward, with symmetric penalties near our own goal.
        ball_position_r = 0.0
        if -END_X <= ball_x < -PENALTY_X and -PENALTY_Y < ball_y < PENALTY_Y:
            ball_position_r = -2.0
        elif -END_X <= ball_x < -MIDDLE_X and -END_Y < ball_y < END_Y:
            ball_position_r = -1.0
        elif -MIDDLE_X <= ball_x <= MIDDLE_X and -END_Y < ball_y < END_Y:
            ball_position_r = 0.0
        elif PENALTY_X < ball_x <= END_X and -PENALTY_Y < ball_y < PENALTY_Y:
            ball_position_r = 2.0
        elif MIDDLE_X < ball_x <= END_X and -END_Y < ball_y < END_Y:
            ball_position_r = 1.0
        else:
            ball_position_r = 0.0

        # Penalize newly received yellow cards for our side; reward the opponent's new cards.
        left_yellow = np.sum(obs["left_team_yellow_card"]) - np.sum(prev_obs["left_team_yellow_card"])
        right_yellow = np.sum(obs["right_team_yellow_card"]) - np.sum(prev_obs["right_team_yellow_card"])
        yellow_r = right_yellow - left_yellow

        # Terminal bonus for winning the match.
        win_reward = 0.0
        if obs['steps_left'] == 0:
            my_score, opponent_score = obs['score']
            if my_score > opponent_score:
                win_reward = 1.0

        reward = 5.0 * win_reward + 5.0 * rew + 0.003 * ball_position_r + yellow_r

        return reward

    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        collector_cfg = copy.deepcopy(cfg)
        collector_env_num = collector_cfg.pop('collector_env_num', 1)
        collector_cfg.is_evaluator = False
        # Deep-copy per env so later per-env mutation cannot leak across instances.
        return [copy.deepcopy(collector_cfg) for _ in range(collector_env_num)]

    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        evaluator_cfg = copy.deepcopy(cfg)
        evaluator_env_num = evaluator_cfg.pop('evaluator_env_num', 1)
        evaluator_cfg.is_evaluator = True
        return [copy.deepcopy(evaluator_cfg) for _ in range(evaluator_env_num)]
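
# A minimal, hypothetical usage sketch (assumptions: an EasyDict-style config carrying only the
# fields read in __init__ above; FullObs and GfootballSpAction may require additional keys):
#
#   from easydict import EasyDict
#   cfg = EasyDict(dict(save_replay=False, render=False, is_evaluator=True))
#   env = GfootballEnv(cfg)
#   env.seed(0)
#   obs = env.reset()
#   timestep = env.step(np.array([0]))  # action index 0 is gfootball's idle action
#   env.close()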