import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import deque
import random
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import heapq
from huggingface_hub import HfApi, HfFolder

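# Grid cell encoding used throughout: 0 = free, 1 = obstacle, 2 = target
# (bottom-right corner); the robot's cell is marked 3 by Environment.get_cnn_state().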
def generate_floorplan(size=10, obstacle_density=0.2):
    floorplan = [[0 for _ in range(size)] for _ in range(size)]
    target_x, target_y = size - 1, size - 1
    floorplan[target_x][target_y] = 2
    num_obstacles = int(size * size * obstacle_density)
    for _ in range(num_obstacles):
        x = random.randint(0, size - 1)
        y = random.randint(0, size - 1)
        # Never place an obstacle on the start cell (0, 0) or on the target
        # (the target cell is already 2, so the == 0 check skips it).
        if floorplan[x][y] == 0 and (x, y) != (0, 0):
            floorplan[x][y] = 1
    return floorplan, target_x, target_y

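# A* over the 4-connected grid with a Manhattan-distance heuristic; the
# resulting path is used later to guide exploration during training.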
def a_star(floorplan, start, goal):
    size = len(floorplan)
    open_set = []
    heapq.heappush(open_set, (0, start))
    came_from = {}
    g_score = {start: 0}
    f_score = {start: heuristic(start, goal)}

    while open_set:
        _, current = heapq.heappop(open_set)

        if current == goal:
            return reconstruct_path(came_from, current)

        neighbors = get_neighbors(current, size)
        for neighbor in neighbors:
            if floorplan[neighbor[0]][neighbor[1]] == 1:
                continue

            tentative_g_score = g_score[current] + 1

            if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g_score
                f_score[neighbor] = g_score[neighbor] + heuristic(neighbor, goal)
                heapq.heappush(open_set, (f_score[neighbor], neighbor))

    return []

def heuristic(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])


def get_neighbors(pos, size):
    neighbors = []
    x, y = pos
    if x > 0:
        neighbors.append((x - 1, y))
    if x < size - 1:
        neighbors.append((x + 1, y))
    if y > 0:
        neighbors.append((x, y - 1))
    if y < size - 1:
        neighbors.append((x, y + 1))
    return neighbors


def reconstruct_path(came_from, current):
    path = [current]
    while current in came_from:
        current = came_from[current]
        path.append(current)
    return path[::-1]

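# Gridworld environment with a Gym-style reset/step/render interface.
# Actions: 0 = x - 1, 1 = x + 1, 2 = y - 1, 3 = y + 1; moves that would
# enter an obstacle leave the robot in place.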
class Environment:
    def __init__(self, size=10, obstacle_density=0.2):
        self.size = size
        self.floorplan, self.target_x, self.target_y = generate_floorplan(size, obstacle_density)
        self.robot_x = 0
        self.robot_y = 0

    def reset(self):
        # Resample the robot's position until it lands on a free cell.
        while True:
            self.robot_x = random.randint(0, self.size - 1)
            self.robot_y = random.randint(0, self.size - 1)
            if self.floorplan[self.robot_x][self.robot_y] == 0:
                break
        return self.get_cnn_state()

    def step(self, action):
        new_x, new_y = self.robot_x, self.robot_y

        if action == 0:
            new_x = max(self.robot_x - 1, 0)
        elif action == 1:
            new_x = min(self.robot_x + 1, self.size - 1)
        elif action == 2:
            new_y = max(self.robot_y - 1, 0)
        elif action == 3:
            new_y = min(self.robot_y + 1, self.size - 1)

        # Moves into obstacle cells are rejected; the robot stays in place.
        if self.floorplan[new_x][new_y] != 1:
            self.robot_x, self.robot_y = new_x, new_y

        done = (self.robot_x == self.target_x and self.robot_y == self.target_y)
        reward = self.get_reward(self.robot_x, self.robot_y)
        next_state = self.get_cnn_state()
        info = {}
        return next_state, reward, done, info

    def get_reward(self, robot_x, robot_y):
        # Note: step() never moves the robot onto an obstacle, so the -5 branch
        # only matters if the environment is changed to allow collisions.
        if self.floorplan[robot_x][robot_y] == 1:
            return -5
        elif robot_x == self.target_x and robot_y == self.target_y:
            return 10
        else:
            return -0.1

    def get_cnn_state(self):
        # Copy the floorplan, mark the robot's cell with 3, and flatten it
        # into a 1-D vector for the fully connected Q-network.
        grid = [row[:] for row in self.floorplan]
        grid[self.robot_x][self.robot_y] = 3
        return np.array(grid).flatten()

    def render(self, path=None):
        grid = np.array(self.floorplan)
        fig, ax = plt.subplots()
        ax.set_xticks(np.arange(-0.5, self.size, 1))
        ax.set_yticks(np.arange(-0.5, self.size, 1))
        ax.grid(which='major', color='k', linestyle='-', linewidth=1)
        ax.tick_params(which='both', bottom=False, left=False, labelbottom=False, labelleft=False)

        def update(i):
            ax.clear()
            ax.imshow(grid, cmap='Greys', interpolation='nearest')
            if path:
                x, y = path[i]
                ax.plot(y, x, 'bo')

        # Animate one frame per step of the path (a single frame if no path is given).
        frames = len(path) if path else 1
        ani = animation.FuncAnimation(fig, update, frames=frames, repeat=False)
        plt.show()

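# Fully connected Q-network: flattened grid in, one Q-value per action out.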
class DQN(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(DQN, self).__init__()
        self.input_size = input_size
        self.hidden_sizes = hidden_sizes
        self.output_size = output_size

        # Stack of fully connected hidden layers followed by a linear output layer.
        self.fc_layers = nn.ModuleList()
        prev_size = input_size
        for size in hidden_sizes:
            self.fc_layers.append(nn.Linear(prev_size, size))
            prev_size = size
        self.output_layer = nn.Linear(prev_size, output_size)

    def forward(self, x):
        # Flatten any batched grid input to (batch, input_size).
        if len(x.shape) > 2:
            x = x.view(x.size(0), -1)
        for layer in self.fc_layers:
            x = F.relu(layer(x))
        x = self.output_layer(x)
        return x

    def choose_action(self, state):
        # Greedy action with respect to the current Q-value estimates.
        with torch.no_grad():
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            q_values = self(state_tensor)
            action = q_values.argmax().item()
        return action

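# Fixed-capacity experience replay buffer backed by a deque; the oldest
# transitions are evicted automatically once capacity is reached.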
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

def save_checkpoint(state, filename="checkpoint.pth.tar"):
    torch.save(state, filename)


def load_checkpoint(filename):
    checkpoint = torch.load(filename)
    return checkpoint

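# Training setup: environment, online and target Q-networks, optimizer,
# LR scheduler, replay buffer, and hyperparameters.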
env = Environment()
input_size = env.size * env.size
hidden_sizes = [64, 64]
output_size = 4

dqn = DQN(input_size, hidden_sizes, output_size)
dqn_target = DQN(input_size, hidden_sizes, output_size)
dqn_target.load_state_dict(dqn.state_dict())

optimizer = optim.Adam(dqn.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
replay_buffer = ReplayBuffer(10000)
num_episodes = 50
batch_size = 64
gamma = 0.99
# Sync the target network every 10 episodes (the run is only 50 episodes long).
target_update_freq = 10
checkpoint_freq = 10

losses = []

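# Training loop: epsilon-greedy DQN updates; the exploratory branch follows an
# A* path toward the target when one is available, otherwise it acts randomly.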
for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    done = False

    # Plan an A* route from the episode's start cell to the target.
    initial_path = a_star(env.floorplan, (env.robot_x, env.robot_y), (env.target_x, env.target_y))
    # Start at index 1: reconstruct_path includes the start cell itself.
    path_index = 1

    while not done:
        # Epsilon decays linearly from 0.2 down to a floor of 0.01.
        epsilon = max(0.01, 0.2 - 0.01 * (episode / 2))
        if np.random.rand() < epsilon:
            if initial_path and path_index < len(initial_path):
                # Follow the precomputed A* path: derive the action from the
                # next waypoint relative to the robot's current cell.
                next_pos = initial_path[path_index]
                if next_pos[0] < env.robot_x:
                    action = 0
                elif next_pos[0] > env.robot_x:
                    action = 1
                elif next_pos[1] < env.robot_y:
                    action = 2
                else:
                    action = 3
                path_index += 1
            else:
                action = np.random.randint(output_size)
        else:
            # Greedy action from the online network.
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = dqn(state_tensor)
            action = q_values.argmax().item()

        next_state, reward, done, _ = env.step(action)
        replay_buffer.push(state, action, reward, next_state, done)

        if len(replay_buffer) > batch_size:
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
            # Stack the numpy states before converting; building a tensor from a
            # tuple of arrays is slow and triggers a warning in recent PyTorch.
            states = torch.tensor(np.array(states), dtype=torch.float32)
            actions = torch.tensor(actions, dtype=torch.int64)
            rewards = torch.tensor(rewards, dtype=torch.float32)
            next_states = torch.tensor(np.array(next_states), dtype=torch.float32)
            dones = torch.tensor(dones, dtype=torch.float32)

            # Q(s, a) for the actions actually taken in the batch.
            q_values = dqn(states)
            q_values = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)

            # TD target bootstrapped from the target network.
            with torch.no_grad():
                next_q_values = dqn_target(next_states)
                next_q_values = next_q_values.max(1)[0]
                target_q_values = rewards + (1 - dones) * gamma * next_q_values

            loss = F.smooth_l1_loss(q_values, target_q_values)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            losses.append(loss.item())

        total_reward += reward
        state = next_state

    # Periodically copy the online network's weights into the target network.
    if episode % target_update_freq == 0:
        dqn_target.load_state_dict(dqn.state_dict())
    scheduler.step()

    if episode % checkpoint_freq == 0 or episode == num_episodes - 1:
        checkpoint = {
            'episode': episode + 1,
            'state_dict': dqn.state_dict(),
            'optimizer': optimizer.state_dict(),
            'losses': losses
        }
        save_checkpoint(checkpoint, f'checkpoint_{episode + 1}.pth.tar')

    print(f"Episode {episode + 1}: Total Reward = {total_reward}, Loss = {np.mean(losses[-batch_size:]) if losses else None}")

# Save the final trained weights.
torch.save(dqn.state_dict(), 'dqn_model.pth')

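# Reload the saved weights into a fresh network and roll out the greedy policy
# once, recording the visited cells for rendering.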
dqn = DQN(input_size, hidden_sizes, output_size)
dqn.load_state_dict(torch.load('dqn_model.pth'))
dqn.eval()

state = env.reset()
done = False
path = [(env.robot_x, env.robot_y)]

# Cap the rollout so a policy that oscillates between cells cannot loop forever.
max_steps = env.size * env.size
steps = 0
while not done and steps < max_steps:
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        q_values = dqn(state_tensor)
    action = q_values.argmax().item()
    next_state, reward, done, _ = env.step(action)
    path.append((env.robot_x, env.robot_y))
    state = next_state
    steps += 1

env.render(path)

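# Evaluate the greedy policy over a handful of fresh episodes and report the
# average reward and the fraction of episodes that finish with a positive return.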
def evaluate_agent(env, agent, num_episodes=5):
    total_rewards = 0
    successful_episodes = 0

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        done = False
        steps = 0

        # Same step cap as the rollout above, so a stuck policy cannot hang evaluation.
        while not done and steps < env.size * env.size:
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            episode_reward += reward
            state = next_state
            steps += 1

        total_rewards += episode_reward
        if episode_reward > 0:
            successful_episodes += 1

    avg_reward = total_rewards / num_episodes
    success_rate = successful_episodes / num_episodes

    print("Evaluation Results:")
    print(f"Average Reward: {avg_reward}")
    print(f"Success Rate: {success_rate}")

    return avg_reward, success_rate

avg_reward, success_rate = evaluate_agent(env, dqn, num_episodes=5)

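# Push the trained weights to the Hugging Face Hub. This assumes a token has
# already been saved locally (e.g. via `huggingface-cli login`); HfApi uses the
# stored token automatically when none is passed explicitly.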
api = HfApi()
api_token = HfFolder.get_token()

model_repo = 'cajcodes/dqn-floorplan-finder'
api.create_repo(repo_id=model_repo, exist_ok=True)

api.upload_file(
    path_or_fileobj='dqn_model.pth',
    path_in_repo='dqn_model.pth',
    repo_id=model_repo
)