|
from __future__ import division |
|
|
|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
from torch.autograd import Variable |
|
import numpy as np |
|
|
|
from PIL import Image |
|
|
|
from utils.parse_config import * |
|
from utils.utils import build_targets |
|
from collections import defaultdict |
|
|
|
|
|
|
|
|
|
|
|
def create_modules(module_defs):
    """
    Constructs module list of layer blocks from module configuration in module_defs

    Args:
        module_defs: list of dicts, one per config section. The first entry
            holds the network hyper-parameters (``"channels"`` and
            ``"height"`` are read here); every following entry describes one
            layer and carries a ``"type"`` key. The first entry is removed
            from the list in place.

    Returns:
        Tuple ``(hyperparams, module_list)``: the popped hyper-parameter dict
        and an ``nn.ModuleList`` holding one ``nn.Sequential`` per layer
        definition (in the same order as the remaining ``module_defs``).
    """
    # NOTE: pop() mutates the caller's list on purpose. Darknet keeps the same
    # list and zips it with module_list, so both must have one entry per layer.
    hyperparams = module_defs.pop(0)
    # output_filters[0] is the channel count of the network input;
    # output_filters[k + 1] is the channel count produced by layer k.
    output_filters = [int(hyperparams["channels"])]
    module_list = nn.ModuleList()
    for i, module_def in enumerate(module_defs):
        modules = nn.Sequential()
        # Layers that do not change the channel count (maxpool, upsample,
        # yolo) inherit it from the previous layer. Setting it here also keeps
        # a config whose first layer is one of those from hitting an unbound
        # local variable.
        filters = output_filters[-1]
        layer_type = module_def["type"]

        if layer_type == "convolutional":
            bn = int(module_def["batch_normalize"])
            filters = int(module_def["filters"])
            kernel_size = int(module_def["size"])
            pad = (kernel_size - 1) // 2 if int(module_def["pad"]) else 0
            modules.add_module(
                "conv_%d" % i,
                nn.Conv2d(
                    in_channels=output_filters[-1],
                    out_channels=filters,
                    kernel_size=kernel_size,
                    stride=int(module_def["stride"]),
                    padding=pad,
                    # Batch norm has its own shift term, so the conv bias is dropped.
                    bias=not bn,
                ),
            )
            if bn:
                modules.add_module("batch_norm_%d" % i, nn.BatchNorm2d(filters))
            if module_def["activation"] == "leaky":
                modules.add_module("leaky_%d" % i, nn.LeakyReLU(0.1))

        elif layer_type == "maxpool":
            kernel_size = int(module_def["size"])
            stride = int(module_def["stride"])
            if kernel_size == 2 and stride == 1:
                # Pad right/bottom by one so the 2x2, stride-1 pooling below
                # does not shrink the feature map.
                padding = nn.ZeroPad2d((0, 1, 0, 1))
                modules.add_module("_debug_padding_%d" % i, padding)
            maxpool = nn.MaxPool2d(
                kernel_size=kernel_size,
                stride=stride,
                padding=int((kernel_size - 1) // 2),
            )
            modules.add_module("maxpool_%d" % i, maxpool)

        elif layer_type == "upsample":
            upsample = nn.Upsample(scale_factor=int(module_def["stride"]), mode="nearest")
            modules.add_module("upsample_%d" % i, upsample)

        elif layer_type == "route":
            layers = [int(x) for x in module_def["layers"].split(",")]
            # Skip the input entry so that non-negative (absolute) indices
            # address layers the same way layer_outputs is indexed in
            # Darknet.forward. Negative indices are unaffected.
            layer_filters = output_filters[1:]
            filters = sum(layer_filters[layer_i] for layer_i in layers)
            modules.add_module("route_%d" % i, EmptyLayer())

        elif layer_type == "shortcut":
            # Same indexing convention as for "route" above.
            filters = output_filters[1:][int(module_def["from"])]
            modules.add_module("shortcut_%d" % i, EmptyLayer())

        elif layer_type == "yolo":
            anchor_idxs = [int(x) for x in module_def["mask"].split(",")]
            # The flat "w0,h0,w1,h1,..." list is paired up, then only the
            # anchors selected by "mask" are kept for this detection layer.
            flat_anchors = [int(x) for x in module_def["anchors"].split(",")]
            anchor_pairs = [
                (flat_anchors[k], flat_anchors[k + 1]) for k in range(0, len(flat_anchors), 2)
            ]
            anchors = [anchor_pairs[k] for k in anchor_idxs]
            num_classes = int(module_def["classes"])
            img_height = int(hyperparams["height"])

            yolo_layer = YOLOLayer(anchors, num_classes, img_height)
            modules.add_module("yolo_%d" % i, yolo_layer)

        module_list.append(modules)
        output_filters.append(filters)

    return hyperparams, module_list
|
|
|
|
|
class EmptyLayer(nn.Module):
    """Parameter-free stand-in module used for 'route' and 'shortcut' layers.

    It defines no forward pass; it only occupies a slot in the module list so
    that every layer definition has a matching module.
    """

    def __init__(self):
        super().__init__()
|
|
|
|
|
class YOLOLayer(nn.Module):
    """Detection layer

    Decodes a raw feature map into bounding boxes, objectness confidence and
    per-class scores. When targets are supplied it also computes the loss
    terms for this layer.
    """

    def __init__(self, anchors, num_classes, img_dim):
        """
        Args:
            anchors: sequence of ``(width, height)`` anchor pairs in input-image
                pixels (they are divided by the layer stride in ``forward``).
            num_classes: number of object classes.
            img_dim: side length of the network input, used to derive the stride.
        """
        super(YOLOLayer, self).__init__()
        self.anchors = anchors
        self.num_anchors = len(anchors)
        self.num_classes = num_classes
        # x, y, w, h, confidence + one score per class
        self.bbox_attrs = 5 + num_classes
        self.image_dim = img_dim
        self.ignore_thres = 0.5
        self.lambda_coord = 1  # not referenced anywhere else in this class

        # The default reduction of these losses is the mean, which is what the
        # deprecated ``size_average=True`` argument used to request.
        self.mse_loss = nn.MSELoss()
        self.bce_loss = nn.BCELoss()
        self.ce_loss = nn.CrossEntropyLoss()

    def forward(self, x, targets=None):
        """Decode predictions and, if ``targets`` is given, compute the losses.

        Args:
            x: feature map that is viewed as
                ``(batch, num_anchors, bbox_attrs, grid, grid)``; the grid is
                taken to be square (its size is read from dimension 2 only).
            targets: ground-truth tensor forwarded to ``build_targets``; its
                layout is defined by that helper and is not visible here.

        Returns:
            Without targets: tensor of shape
            ``(batch, num_anchors * grid * grid, 5 + num_classes)`` holding
            boxes (scaled to input-image units), confidence and class scores.
            With targets: tuple ``(loss, loss_x, loss_y, loss_w, loss_h,
            loss_conf, loss_cls, recall, precision)`` where ``loss`` is a
            tensor and the remaining entries are Python numbers.
        """
        nA = self.num_anchors
        nB = x.size(0)
        nG = x.size(2)
        stride = self.image_dim / nG

        # Tensor types matching the device of the input
        FloatTensor = torch.cuda.FloatTensor if x.is_cuda else torch.FloatTensor
        LongTensor = torch.cuda.LongTensor if x.is_cuda else torch.LongTensor
        ByteTensor = torch.cuda.ByteTensor if x.is_cuda else torch.ByteTensor

        # Move the box attributes to the last dimension:
        # (nB, nA, nG, nG, bbox_attrs)
        prediction = x.view(nB, nA, self.bbox_attrs, nG, nG).permute(0, 1, 3, 4, 2).contiguous()

        # Split the raw outputs. NOTE: ``x`` is rebound here from the input
        # feature map to the predicted centre-x offsets.
        x = torch.sigmoid(prediction[..., 0])  # centre x offset within the cell
        y = torch.sigmoid(prediction[..., 1])  # centre y offset within the cell
        w = prediction[..., 2]  # log-space width
        h = prediction[..., 3]  # log-space height
        pred_conf = torch.sigmoid(prediction[..., 4])
        pred_cls = torch.sigmoid(prediction[..., 5:])

        # Per-cell offsets and anchors expressed in grid units
        grid_x = torch.arange(nG).repeat(nG, 1).view([1, 1, nG, nG]).type(FloatTensor)
        grid_y = torch.arange(nG).repeat(nG, 1).t().view([1, 1, nG, nG]).type(FloatTensor)
        scaled_anchors = FloatTensor([(a_w / stride, a_h / stride) for a_w, a_h in self.anchors])
        anchor_w = scaled_anchors[:, 0:1].view((1, nA, 1, 1))
        anchor_h = scaled_anchors[:, 1:2].view((1, nA, 1, 1))

        # Decoded boxes in grid units. ``.data`` is used, so no gradient flows
        # through pred_boxes; every slot of the buffer is overwritten below.
        pred_boxes = FloatTensor(prediction[..., :4].shape)
        pred_boxes[..., 0] = x.data + grid_x
        pred_boxes[..., 1] = y.data + grid_y
        pred_boxes[..., 2] = torch.exp(w.data) * anchor_w
        pred_boxes[..., 3] = torch.exp(h.data) * anchor_h

        if targets is None:
            # Inference: boxes are scaled back to input-image units and
            # concatenated with confidence and class scores.
            output = torch.cat(
                (
                    pred_boxes.view(nB, -1, 4) * stride,
                    pred_conf.view(nB, -1, 1),
                    pred_cls.view(nB, -1, self.num_classes),
                ),
                -1,
            )
            return output

        # Training
        if x.is_cuda:
            self.mse_loss = self.mse_loss.cuda()
            self.bce_loss = self.bce_loss.cuda()
            self.ce_loss = self.ce_loss.cuda()

        # build_targets comes from utils.utils and is given CPU copies of all
        # tensors. The meaning of its outputs is inferred from how they are
        # used below (masks, regression targets, confidence/class targets).
        nGT, nCorrect, mask, conf_mask, tx, ty, tw, th, tconf, tcls = build_targets(
            pred_boxes=pred_boxes.cpu().data,
            pred_conf=pred_conf.cpu().data,
            pred_cls=pred_cls.cpu().data,
            target=targets.cpu().data,
            anchors=scaled_anchors.cpu().data,
            num_anchors=nA,
            num_classes=self.num_classes,
            grid_size=nG,
            ignore_thres=self.ignore_thres,
            img_dim=self.image_dim,
        )

        nProposals = int((pred_conf > 0.5).sum().item())
        recall = float(nCorrect / nGT) if nGT else 1
        # Guard against the case where no prediction exceeds the 0.5
        # confidence threshold, which would otherwise divide by zero.
        precision = float(nCorrect / nProposals) if nProposals else 0

        # Bring masks and targets onto the device of the predictions
        mask = mask.type(ByteTensor)
        conf_mask = conf_mask.type(ByteTensor)

        tx = tx.type(FloatTensor)
        ty = ty.type(FloatTensor)
        tw = tw.type(FloatTensor)
        th = th.type(FloatTensor)
        tconf = tconf.type(FloatTensor)
        tcls = tcls.type(LongTensor)

        # Confidence loss is computed separately over the entries selected by
        # ``mask`` and over the remaining entries of ``conf_mask``.
        conf_mask_true = mask
        conf_mask_false = conf_mask - mask

        loss_x = self.mse_loss(x[mask], tx[mask])
        loss_y = self.mse_loss(y[mask], ty[mask])
        loss_w = self.mse_loss(w[mask], tw[mask])
        loss_h = self.mse_loss(h[mask], th[mask])
        loss_conf = self.bce_loss(pred_conf[conf_mask_false], tconf[conf_mask_false]) + self.bce_loss(
            pred_conf[conf_mask_true], tconf[conf_mask_true]
        )
        # tcls[mask] is reduced with argmax to the class indices that
        # CrossEntropyLoss expects.
        loss_cls = (1 / nB) * self.ce_loss(pred_cls[mask], torch.argmax(tcls[mask], 1))
        loss = loss_x + loss_y + loss_w + loss_h + loss_conf + loss_cls

        return (
            loss,
            loss_x.item(),
            loss_y.item(),
            loss_w.item(),
            loss_h.item(),
            loss_conf.item(),
            loss_cls.item(),
            recall,
            precision,
        )
|
|
|
|
|
class Darknet(nn.Module):
    """YOLOv3 object detection model"""

    def __init__(self, config_path, img_size=416):
        """
        Args:
            config_path: path of the model configuration, parsed with
                ``parse_model_config``.
            img_size: stored on the instance as ``img_size``.
        """
        super(Darknet, self).__init__()
        self.module_defs = parse_model_config(config_path)
        # create_modules pops the hyper-parameter entry off self.module_defs,
        # leaving one definition per entry of self.module_list.
        self.hyperparams, self.module_list = create_modules(self.module_defs)
        self.img_size = img_size
        self.seen = 0
        # int32 so that a header written by save_weights has the same size as
        # the 5 x int32 header that load_weights reads back.
        self.header_info = np.array([0, 0, 0, self.seen, 0], dtype=np.int32)
        self.loss_names = ["x", "y", "w", "h", "conf", "cls", "recall", "precision"]

    def forward(self, x, targets=None):
        """Run the network.

        Args:
            x: input batch fed to the first layer.
            targets: optional ground truth; when given, every "yolo" layer is
                run in training mode and its loss components are accumulated
                in ``self.losses``.

        Returns:
            With targets: the sum of the losses of all "yolo" layers.
            Without targets: the detections of all "yolo" layers concatenated
            along dimension 1.
        """
        is_training = targets is not None
        output = []
        self.losses = defaultdict(float)
        layer_outputs = []
        for module_def, module in zip(self.module_defs, self.module_list):
            layer_type = module_def["type"]
            if layer_type in ("convolutional", "upsample", "maxpool"):
                x = module(x)
            elif layer_type == "route":
                # Concatenate earlier layer outputs along the channel dimension
                layer_i = [int(idx) for idx in module_def["layers"].split(",")]
                x = torch.cat([layer_outputs[idx] for idx in layer_i], 1)
            elif layer_type == "shortcut":
                # Residual connection: previous output + referenced output
                layer_i = int(module_def["from"])
                x = layer_outputs[-1] + layer_outputs[layer_i]
            elif layer_type == "yolo":
                if is_training:
                    # module[0] is the YOLOLayer itself; nn.Sequential would
                    # not forward the extra ``targets`` argument.
                    x, *losses = module[0](x, targets)
                    for name, loss in zip(self.loss_names, losses):
                        self.losses[name] += loss
                else:
                    x = module(x)
                output.append(x)
            layer_outputs.append(x)

        # Average recall/precision over the detection layers that actually ran
        # (previously hard-coded to 3). max(..., 1) avoids a division by zero
        # and still creates both keys when there is no detection layer.
        num_detection_layers = max(len(output), 1)
        self.losses["recall"] /= num_detection_layers
        self.losses["precision"] /= num_detection_layers
        return sum(output) if is_training else torch.cat(output, 1)

    @staticmethod
    def _copy_from_buffer(weights, ptr, target):
        """Copy ``target.numel()`` values from ``weights[ptr:]`` into ``target``.

        Returns the offset just past the values that were consumed.
        """
        count = target.numel()
        values = torch.from_numpy(weights[ptr : ptr + count]).view_as(target)
        target.data.copy_(values)
        return ptr + count

    def load_weights(self, weights_path):
        """Parses and loads the weights stored in 'weights_path'

        The file is read as a header of five int32 values followed by float32
        weights. The header is kept in ``self.header_info`` and its fourth
        value is stored as ``self.seen``.
        """
        with open(weights_path, "rb") as fp:
            header = np.fromfile(fp, dtype=np.int32, count=5)
            weights = np.fromfile(fp, dtype=np.float32)

        # Needed to write the header back when saving weights
        self.header_info = header
        self.seen = header[3]

        ptr = 0
        for module_def, module in zip(self.module_defs, self.module_list):
            if module_def["type"] != "convolutional":
                continue
            conv_layer = module[0]
            # int() mirrors create_modules: the flag may be stored as the
            # string "0", which would be truthy if tested directly.
            if int(module_def["batch_normalize"]):
                # Order in the file: bias, weight, running mean, running var
                bn_layer = module[1]
                ptr = self._copy_from_buffer(weights, ptr, bn_layer.bias)
                ptr = self._copy_from_buffer(weights, ptr, bn_layer.weight)
                ptr = self._copy_from_buffer(weights, ptr, bn_layer.running_mean)
                ptr = self._copy_from_buffer(weights, ptr, bn_layer.running_var)
            else:
                # Without batch norm the convolution carries its own bias
                ptr = self._copy_from_buffer(weights, ptr, conv_layer.bias)
            # Convolution weights always follow
            ptr = self._copy_from_buffer(weights, ptr, conv_layer.weight)

    def save_weights(self, path, cutoff=-1):
        """Write the header and all convolutional weights to ``path``.

        The layout mirrors what ``load_weights`` reads.

        @:param path    - path of the new weights file
        @:param cutoff  - save layers between 0 and cutoff (cutoff = -1 -> all are saved)
        """
        # A plain [:-1] slice would silently drop the last layer, so the
        # documented "-1 means everything" is mapped to an open-ended slice.
        end = None if cutoff == -1 else cutoff

        self.header_info[3] = self.seen
        with open(path, "wb") as fp:
            # Always write the header as int32, the dtype load_weights expects
            np.asarray(self.header_info, dtype=np.int32).tofile(fp)

            for module_def, module in zip(self.module_defs[:end], self.module_list[:end]):
                if module_def["type"] != "convolutional":
                    continue
                conv_layer = module[0]
                # Same int() handling of the flag as in load_weights
                if int(module_def["batch_normalize"]):
                    bn_layer = module[1]
                    bn_layer.bias.data.cpu().numpy().tofile(fp)
                    bn_layer.weight.data.cpu().numpy().tofile(fp)
                    bn_layer.running_mean.data.cpu().numpy().tofile(fp)
                    bn_layer.running_var.data.cpu().numpy().tofile(fp)
                else:
                    conv_layer.bias.data.cpu().numpy().tofile(fp)
                conv_layer.weight.data.cpu().numpy().tofile(fp)
|
|