|
import enum |
|
import math |
|
|
|
import numpy as np |
|
from collections import namedtuple |
|
from s2clientprotocol import common_pb2 as sc_common, sc2api_pb2 as sc_pb, raw_pb2 as r_pb |
|
|
|
# Keys of the joint-action dict that SMACAction.get_action expects in
# two-player mode.
ORIGINAL_AGENT, OPPONENT_AGENT = "me", "opponent"

# Discrete action indices of the two horizontal moves; these are the two
# entries that get exchanged when the opponent's view is mirrored.
MOVE_EAST, MOVE_WEST = 4, 5

# Ability name -> ability id used as ``ability_id`` of the raw unit commands.
actions = dict(
    move=16,
    attack=23,
    stop=4,
    heal=386,
    parasitic_bomb=2542,
    fungal_growth=74,
)
|
|
|
|
|
class Direction(enum.IntEnum):
    """The four axis-aligned movement directions, numbered 0 through 3."""

    # Order matters: NORTH=0, SOUTH=1, EAST=2, WEST=3.
    NORTH, SOUTH, EAST, WEST = range(4)
|
|
|
|
|
def distance(x1, y1, x2, y2):
    """Return the Euclidean distance between (x1, y1) and (x2, y2)."""
    delta_x = x2 - x1
    delta_y = y2 - y1
    return math.hypot(delta_x, delta_y)
|
|
|
|
|
class SMACAction:
    """Translate discrete per-unit action indices into StarCraft II raw commands.

    Action index layout used by every method of this class:

    * ``0``        -- no-op, only available to dead units;
    * ``1``        -- stop;
    * ``2``-``5``  -- move north, south, east, west by ``_move_amount``;
    * ``6`` and up -- act on target ``action - n_actions_no_attack`` (attack by
      default; heal for the medivac on the ``MMM`` map; a spell cast on the
      ``infestor_viper`` map).

    The instance also remembers the last joint action of each side as a
    one-hot matrix (``last_action`` / ``last_action_opponent``) and caches the
    pathing grid and terrain height passed to :meth:`update`.

    NOTE(review): ``n_actions`` is derived from ``n_enemies`` for both sides, so
    target indices of the opponent (and of healers) only fit when
    ``n_agents <= n_enemies`` -- confirm against the maps in use.
    """

    info_template = namedtuple('EnvElementInfo', ['shape', 'value', 'to_agent_processor', 'from_agent_processor'])

    # Unit type that casts "parasitic_bomb" on the infestor_viper map
    # (presumably the Viper -- TODO confirm against the game data).
    _VIPER_TYPE_ID = 499
    # Unit type with the longest range in unit_shoot_range (presumably the
    # Infestor -- TODO confirm against the game data).
    _INFESTOR_TYPE_ID = 111

    def __init__(self, n_agents, n_enemies, two_player=False, mirror_opponent=True):
        """Set up action-space sizes and the last-action buffers.

        Args:
            n_agents: Number of units on the "me" side.
            n_enemies: Number of units on the opposing side.
            two_player: When true, :meth:`get_action` expects a dict with one
                action list per side.
            mirror_opponent: When true, east/west are exchanged in everything
                the opponent sends or receives.
        """
        self.obs_pathing_grid = False
        self.obs_terrain_height = False
        self.state_last_action = True
        self.state_timestep_number = False
        self.n_obs_pathing = 8
        self.n_obs_height = 9
        self._move_amount = 2
        self.n_actions_no_attack = 6
        self.n_actions_move = 4
        self.n_actions = self.n_actions_no_attack + n_enemies
        self.map_x = 0
        self.map_y = 0

        self.last_action = np.zeros((n_agents, self.n_actions))
        self.last_action_opponent = np.zeros((n_enemies, self.n_actions))
        self.n_agents = n_agents
        self.n_enemies = n_enemies

        self.two_player = two_player
        self.mirror_opponent = mirror_opponent

    def reset(self):
        """Zero both last-action buffers in place."""
        self.last_action.fill(0)
        self.last_action_opponent.fill(0)

    def update(self, map_info, map_x, map_y):
        """Cache the pathing grid, terrain height and map size.

        Args:
            map_info: Object exposing ``pathing_grid`` (with ``bits_per_pixel``
                and ``data``) and ``terrain_height`` (with ``data``).
            map_x: Map width used for reshaping and bounds checks.
            map_y: Map height used for reshaping and bounds checks.
        """
        raw_grid = map_info.pathing_grid
        if raw_grid.bits_per_pixel == 1:
            # One bit per cell: unpack every byte, most significant bit first.
            vals = np.array(list(raw_grid.data)).reshape(map_x, int(map_y / 8))
            unpacked = [[(b >> i) & 1 for b in row for i in range(7, -1, -1)] for row in vals]
            # The builtin ``bool`` replaces the removed ``np.bool`` alias.
            self.pathing_grid = np.transpose(np.array(unpacked, dtype=bool))
        else:
            cells = np.array(list(raw_grid.data), dtype=bool).reshape(map_x, map_y)
            self.pathing_grid = np.invert(np.flip(np.transpose(cells), axis=1))

        heights = np.array(list(map_info.terrain_height.data)).reshape(map_x, map_y)
        self.terrain_height = np.flip(np.transpose(heights), 1) / 255
        self.map_x = map_x
        self.map_y = map_y

    def _parse_single(self, actions, engine, is_opponent=False):
        """Record one side's actions and convert them to engine commands.

        Args:
            actions: One action index per unit of the chosen side.
            engine: Game-state object forwarded to :meth:`get_agent_action`.
            is_opponent: Selects which side the actions belong to.

        Returns:
            The truthy results of :meth:`get_agent_action`, in unit order.
        """
        # The builtin ``int`` replaces the removed ``np.int`` alias.
        actions = np.asarray(actions, dtype=int)
        assert len(actions) == (self.n_enemies if is_opponent else self.n_agents)

        # Index with the integer array itself so an empty action list yields
        # an empty (0, n_actions) matrix instead of failing on a float index.
        one_hot = np.eye(self.n_actions)[actions]
        if is_opponent:
            self.last_action_opponent = one_hot
        else:
            self.last_action = one_hot

        sc_actions = []
        for a_id, action in enumerate(actions.tolist()):
            sc_action = self.get_agent_action(a_id, action, engine, is_opponent)
            if sc_action:
                sc_actions.append(sc_action)
        return sc_actions

    def get_action(self, actions, engine):
        """Convert a joint action into engine commands.

        Args:
            actions: In two-player mode a dict holding one action list under
                each of ``ORIGINAL_AGENT`` and ``OPPONENT_AGENT``; otherwise a
                plain sequence of action indices. The argument is not modified.
            engine: Game-state object forwarded to :meth:`get_agent_action`.

        Returns:
            A dict of command lists keyed like the input in two-player mode,
            otherwise a single command list.
        """
        if not self.two_player:
            assert not isinstance(actions, dict)
            return self._parse_single(actions, engine, is_opponent=False)

        assert isinstance(actions, dict)
        assert ORIGINAL_AGENT in actions
        assert OPPONENT_AGENT in actions

        # Un-mirror into a local list; writing back into the caller's dict
        # would mirror the same actions twice if the dict were reused.
        opponent_actions = actions[OPPONENT_AGENT]
        if self.mirror_opponent:
            opponent_actions = [self._transform_action(a) for a in opponent_actions]

        sc_actions_me = self._parse_single(actions[ORIGINAL_AGENT], engine, is_opponent=False)
        sc_actions_opponent = self._parse_single(opponent_actions, engine, is_opponent=True)
        return {ORIGINAL_AGENT: sc_actions_me, OPPONENT_AGENT: sc_actions_opponent}

    def get_unit_by_id(self, a_id, engine, is_opponent=False):
        """Return unit ``a_id`` from ``engine.enemies`` or ``engine.agents``."""
        units = engine.enemies if is_opponent else engine.agents
        return units[a_id]

    def get_agent_action(self, a_id, action, engine, is_opponent=False):
        """Construct the engine command for unit ``a_id``.

        The input action is *absolute* (not mirrored), so availability is
        queried with ``skip_mirror=True``.

        An action that is out of range or currently unavailable is replaced by
        the unit's always-legal action: the no-op for a dead unit, stop for a
        living one.

        Returns:
            An ``sc_pb.Action`` wrapping the raw unit command, or ``None`` for
            the no-op.
        """
        avail_actions = self.get_avail_agent_actions(a_id, engine, is_opponent=is_opponent, skip_mirror=True)
        in_range = 0 <= action < len(avail_actions)
        if not in_range or avail_actions[action] != 1:
            action = 0 if avail_actions[0] == 1 else 1

        if action == 0:
            # The no-op is only ever available to dead units; nothing to send.
            return None

        unit = self.get_unit_by_id(a_id, engine, is_opponent=is_opponent)
        tag = unit.tag
        x = unit.pos.x
        y = unit.pos.y
        step = self._move_amount

        # Destination of each move action: north, south, east, west.
        move_targets = {
            2: (x, y + step),
            3: (x, y - step),
            4: (x + step, y),
            5: (x - step, y),
        }

        if action == 1:
            cmd = r_pb.ActionRawUnitCommand(ability_id=actions["stop"], unit_tags=[tag], queue_command=False)
        elif action in move_targets:
            dest_x, dest_y = move_targets[action]
            cmd = r_pb.ActionRawUnitCommand(
                ability_id=actions["move"],
                target_world_space_pos=sc_common.Point2D(x=dest_x, y=dest_y),
                unit_tags=[tag],
                queue_command=False
            )
        else:
            target_id = action - self.n_actions_no_attack
            cmd = self._build_target_command(unit, target_id, engine, is_opponent)

        return sc_pb.Action(action_raw=r_pb.ActionRaw(unit_command=cmd))

    def _build_target_command(self, unit, target_id, engine, is_opponent):
        """Build the raw command for a targeted action (attack, heal or spell)."""
        tag = unit.tag
        if is_opponent:
            allies, foes = engine.enemies, engine.agents
        else:
            allies, foes = engine.agents, engine.enemies

        if engine.map_type == "MMM" and unit.unit_type == (engine.medivac_id_opponent
                                                           if is_opponent else engine.medivac_id):
            # Medivacs target a unit of their own side.
            target_unit = allies[target_id]
            action_name = "heal"
        elif engine.map_type == 'infestor_viper':
            # NOTE(review): the target is always taken from engine.enemies,
            # regardless of is_opponent -- confirm this map is never played
            # from the opponent's side.
            target_unit = engine.enemies[target_id]
            if unit.unit_type != self._VIPER_TYPE_ID:
                # Fungal growth is aimed at the target's position, not its tag.
                return r_pb.ActionRawUnitCommand(
                    ability_id=actions["fungal_growth"],
                    target_world_space_pos=sc_common.Point2D(x=target_unit.pos.x, y=target_unit.pos.y),
                    unit_tags=[tag],
                    queue_command=False
                )
            action_name = "parasitic_bomb"
        else:
            target_unit = foes[target_id]
            action_name = "attack"

        return r_pb.ActionRawUnitCommand(
            ability_id=actions[action_name],
            target_unit_tag=target_unit.tag,
            unit_tags=[tag],
            queue_command=False
        )

    def get_avail_agent_actions(self, agent_id, engine, is_opponent=False, skip_mirror=False):
        """Return the 0/1 availability list (length ``n_actions``) for one unit.

        Args:
            agent_id: Index of the unit within its side.
            engine: Game-state object providing ``agents``, ``enemies``,
                ``map_type`` and the medivac type ids.
            is_opponent: Selects which side the unit belongs to.
            skip_mirror: When true, the east/west entries are never exchanged,
                i.e. the result is in absolute directions.
        """
        medivac_id = engine.medivac_id_opponent if is_opponent else engine.medivac_id
        unit = self.get_unit_by_id(agent_id, engine, is_opponent)

        if unit.health > 0:
            avail_actions = [0] * self.n_actions
            # A living unit can always stop.
            avail_actions[1] = 1

            move_slots = (
                (2, Direction.NORTH),
                (3, Direction.SOUTH),
                (4, Direction.EAST),
                (5, Direction.WEST),
            )
            for slot, direction in move_slots:
                if self.can_move(unit, direction):
                    avail_actions[slot] = 1

            shoot_range = self.unit_shoot_range(unit)

            target_items = engine.enemies.items() if not is_opponent else engine.agents.items()
            self_items = engine.agents.items() if not is_opponent else engine.enemies.items()
            if engine.map_type == "MMM" and unit.unit_type == medivac_id:
                # Medivacs target the non-medivac units of their own side.
                target_items = [(t_id, t_unit) for (t_id, t_unit) in self_items if t_unit.unit_type != medivac_id]

            for t_id, t_unit in target_items:
                if t_unit.health <= 0:
                    continue
                dist = distance(unit.pos.x, unit.pos.y, t_unit.pos.x, t_unit.pos.y)
                if dist > shoot_range:
                    continue
                if engine.map_type == "infestor_viper":
                    # Casting additionally requires enough energy: 125 for
                    # the parasitic-bomb caster, 50 for every other unit.
                    needed = 125 if unit.unit_type == self._VIPER_TYPE_ID else 50
                    value = 1 if unit.energy >= needed else 0
                    avail_actions[t_id + self.n_actions_no_attack] = value
                else:
                    avail_actions[t_id + self.n_actions_no_attack] = 1
        else:
            # Dead units may only take the no-op.
            avail_actions = [1] + [0] * (self.n_actions - 1)

        if (not skip_mirror) and self.mirror_opponent and is_opponent:
            avail_actions[MOVE_EAST], avail_actions[MOVE_WEST] = \
                avail_actions[MOVE_WEST], avail_actions[MOVE_EAST]

        return avail_actions

    def can_move(self, unit, direction):
        """Whether a unit can move in a given direction.

        The probed cell lies half of ``_move_amount`` away from the unit; it
        must be inside the map and truthy in ``pathing_grid``.
        """
        m = self._move_amount / 2

        if direction == Direction.NORTH:
            x, y = int(unit.pos.x), int(unit.pos.y + m)
        elif direction == Direction.SOUTH:
            x, y = int(unit.pos.x), int(unit.pos.y - m)
        elif direction == Direction.EAST:
            x, y = int(unit.pos.x + m), int(unit.pos.y)
        else:
            x, y = int(unit.pos.x - m), int(unit.pos.y)

        return bool(self.check_bounds(x, y) and self.pathing_grid[x, y])

    def check_bounds(self, x, y):
        """Whether a point is within the map bounds."""
        return 0 <= x < self.map_x and 0 <= y < self.map_y

    def get_surrounding_pathing(self, unit):
        """Return pathing values of the 8 points around the unit (1 when off-map)."""
        points = self.get_surrounding_points(unit, include_self=False)
        return [self.pathing_grid[x, y] if self.check_bounds(x, y) else 1 for x, y in points]

    def get_surrounding_height(self, unit):
        """Return height values of the unit's cell and the 8 points around it (1 when off-map)."""
        points = self.get_surrounding_points(unit, include_self=True)
        return [self.terrain_height[x, y] if self.check_bounds(x, y) else 1 for x, y in points]

    def unit_shoot_range(self, unit):
        """Return the targeting range for a unit: 8, 10 or the default 6."""
        type_id = unit.unit_type
        if type_id == self._VIPER_TYPE_ID:
            return 8
        if type_id == self._INFESTOR_TYPE_ID:
            return 10
        return 6

    def get_surrounding_points(self, unit, include_self=False):
        """Return the 8 integer points around the unit, optionally followed by its own cell.

        Axis-aligned points lie ``2 * _move_amount`` away, diagonal points
        ``_move_amount`` away along each axis.
        """
        x = int(unit.pos.x)
        y = int(unit.pos.y)
        ma = self._move_amount

        points = [
            (x, y + 2 * ma),
            (x, y - 2 * ma),
            (x + 2 * ma, y),
            (x - 2 * ma, y),
            (x + ma, y + ma),
            (x - ma, y - ma),
            (x + ma, y - ma),
            (x - ma, y + ma),
        ]
        if include_self:
            points.append((x, y))
        return points

    def get_movement_features(self, agent_id, engine, is_opponent=False):
        """Return the movement feature vector of one unit (all zeros when dead).

        The vector starts with the four move-availability flags, optionally
        followed by surrounding pathing values and surrounding heights,
        depending on ``obs_pathing_grid`` and ``obs_terrain_height``.
        """
        unit = self.get_unit_by_id(agent_id, engine, is_opponent=is_opponent)
        move_feats = np.zeros(self.get_obs_move_feats_size(), dtype=np.float32)

        if unit.health > 0:
            avail_actions = self.get_avail_agent_actions(agent_id, engine, is_opponent=is_opponent)
            # Move actions occupy indices 2..5 of the availability list.
            for m in range(self.n_actions_move):
                move_feats[m] = avail_actions[m + 2]

            ind = self.n_actions_move
            if self.obs_pathing_grid:
                move_feats[ind:ind + self.n_obs_pathing] = self.get_surrounding_pathing(unit)
                ind += self.n_obs_pathing

            if self.obs_terrain_height:
                move_feats[ind:] = self.get_surrounding_height(unit)
        return move_feats

    def get_obs_move_feats_size(self):
        """Return the length of the vector built by :meth:`get_movement_features`."""
        move_feats = self.n_actions_move
        if self.obs_pathing_grid:
            move_feats += self.n_obs_pathing
        if self.obs_terrain_height:
            move_feats += self.n_obs_height
        return move_feats

    def get_last_action(self, is_opponent=False):
        """Return the one-hot last-action matrix of one side.

        For a mirrored opponent the result is a copy with the east and west
        columns exchanged; the stored matrix is left untouched so repeated
        calls return the same values.
        """
        if not is_opponent:
            return self.last_action

        ret = self.last_action_opponent
        if self.mirror_opponent:
            ret = ret.copy()
            ret[:, [MOVE_EAST, MOVE_WEST]] = ret[:, [MOVE_WEST, MOVE_EAST]]
        return ret

    def get_avail_actions(self, engine, is_opponent=False):
        """Return the availability lists of every unit on one side."""
        n_units = self.n_enemies if is_opponent else self.n_agents
        return [
            self.get_avail_agent_actions(agent_id, engine, is_opponent=is_opponent)
            for agent_id in range(n_units)
        ]

    @staticmethod
    def _transform_action(a):
        """Exchange the east and west move actions; return anything else unchanged."""
        if a == MOVE_EAST:
            return MOVE_WEST
        if a == MOVE_WEST:
            return MOVE_EAST
        return a

    def info(self):
        """Return an ``info_template`` with shape ``(n_actions,)`` and value range 0..1."""
        shape = (self.n_actions, )
        value = {'min': 0, 'max': 1}
        return SMACAction.info_template(shape, value, None, None)
|
|