import os
import random

import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
from PIL import Image, ImageFile
from torch.utils.data import DataLoader, Dataset

from . import config, transforms
from .utils import cells_to_bboxes
from .utils import iou_width_height as iou
from .utils import non_max_suppression as nms
from .utils import plot_image, xyxy2xywhn, xywhn2xyxy

ImageFile.LOAD_TRUNCATED_IMAGES = True


class YOLODataset(Dataset):
    def __init__(
        self,
        csv_file,
        img_dir,
        label_dir,
        anchors,
        image_size=416,
        S=[13, 26, 52],
        C=20,
        transform=None,
        mosaic_percentage=0.67,
    ):
        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.image_size = image_size
        self.mosaic_border = [image_size // 2, image_size // 2]
        self.transform = transform
        self.S = S
        self.anchors = torch.tensor(
            anchors[0] + anchors[1] + anchors[2]
        )  # for all 3 scales
        self.num_anchors = self.anchors.shape[0]
        self.num_anchors_per_scale = self.num_anchors // 3
        self.C = C
        self.ignore_iou_thresh = 0.5
        self.mosaic_percentage = mosaic_percentage

    def __len__(self):
        return len(self.annotations)

    def load_mosaic(self, index):
        # YOLOv5 4-mosaic loader: combines the image at `index` with 3 random
        # images into a single 2s x 2s mosaic
        labels4 = []
        s = self.image_size
        yc, xc = (
            int(random.uniform(x, 2 * s - x)) for x in self.mosaic_border
        )  # mosaic center x, y
        indices = [index] + random.choices(
            range(len(self)), k=3
        )  # 3 additional image indices
        random.shuffle(indices)
        for i, index in enumerate(indices):
            # Load image and its labels (class id rolled to the last column)
            label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
            bboxes = np.roll(
                np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1
            ).tolist()
            img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
            img = np.array(Image.open(img_path).convert("RGB"))
            h, w = img.shape[0], img.shape[1]
            labels = np.array(bboxes)

            # Place img in img4
            if i == 0:  # top left
                img4 = np.full(
                    (s * 2, s * 2, img.shape[2]), 114, dtype=np.uint8
                )  # base image with 4 tiles
                x1a, y1a, x2a, y2a = (
                    max(xc - w, 0),
                    max(yc - h, 0),
                    xc,
                    yc,
                )  # xmin, ymin, xmax, ymax (large image)
                x1b, y1b, x2b, y2b = (
                    w - (x2a - x1a),
                    h - (y2a - y1a),
                    w,
                    h,
                )  # xmin, ymin, xmax, ymax (small image)
            elif i == 1:  # top right
                x1a, y1a, x2a, y2a = xc, max(yc - h, 0), min(xc + w, s * 2), yc
                x1b, y1b, x2b, y2b = 0, h - (y2a - y1a), min(w, x2a - x1a), h
            elif i == 2:  # bottom left
                x1a, y1a, x2a, y2a = max(xc - w, 0), yc, xc, min(s * 2, yc + h)
                x1b, y1b, x2b, y2b = w - (x2a - x1a), 0, w, min(y2a - y1a, h)
            elif i == 3:  # bottom right
                x1a, y1a, x2a, y2a = xc, yc, min(xc + w, s * 2), min(s * 2, yc + h)
                x1b, y1b, x2b, y2b = 0, 0, min(w, x2a - x1a), min(y2a - y1a, h)

            img4[y1a:y2a, x1a:x2a] = img[y1b:y2b, x1b:x2b]  # img4[ymin:ymax, xmin:xmax]
            padw = x1a - x1b
            padh = y1a - y1b

            # Labels
            if labels.size:
                labels[:, :-1] = xywhn2xyxy(
                    labels[:, :-1], w, h, padw, padh
                )  # normalized xywh to pixel xyxy format
            labels4.append(labels)

        # Concat/clip labels to the mosaic canvas
        labels4 = np.concatenate(labels4, 0)
        np.clip(
            labels4[:, :-1], 0, 2 * s, out=labels4[:, :-1]
        )  # clip when using random_perspective()
        # img4, labels4 = replicate(img4, labels4)  # replicate
        labels4[:, :-1] = xyxy2xywhn(labels4[:, :-1], 2 * s, 2 * s)
        labels4[:, :-1] = np.clip(labels4[:, :-1], 0, 1)
        # Drop boxes whose width or height collapsed to zero after clipping
        labels4 = labels4[labels4[:, 2] > 0]
        labels4 = labels4[labels4[:, 3] > 0]
        return img4, labels4
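    # A rough sketch of the label geometry used above (illustration only, not
    # part of the dataset API). `xywhn2xyxy` maps normalized (x, y, w, h) boxes
    # to pixel corners and shifts them by the tile offset; assuming the
    # YOLOv5-style signature xywhn2xyxy(labels, w, h, padw, padh), a box
    # (0.5, 0.5, 0.2, 0.2) on a 100x100 tile pasted at (padw, padh) = (50, 80)
    # becomes:
    #   x1 = 100 * (0.5 - 0.2 / 2) + 50 = 90
    #   y1 = 100 * (0.5 - 0.2 / 2) + 80 = 120
    #   x2 = 100 * (0.5 + 0.2 / 2) + 50 = 110
    #   y2 = 100 * (0.5 + 0.2 / 2) + 80 = 140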
    def load_single_img(self, index):
        label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
        bboxes = np.roll(
            np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1
        ).tolist()
        img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
        image = np.array(Image.open(img_path).convert("RGB"))
        return image, bboxes

    def __getitem__(self, index):
        if random.random() < self.mosaic_percentage:
            image, bboxes = self.load_mosaic(index)
        else:
            image, bboxes = self.load_single_img(index)

        if self.transform:
            augmentations = self.transform(image=image, bboxes=bboxes)
            image = augmentations["image"]
            bboxes = augmentations["bboxes"]

        # Targets per scale, e.g. (3, 13, 13, 6), (3, 26, 26, 6), (3, 52, 52, 6),
        # where 6 = [objectness, x, y, w, h, class] for each anchor box
        targets = [torch.zeros((self.num_anchors // 3, S, S, 6)) for S in self.S]
        for box in bboxes:
            iou_anchors = iou(torch.tensor(box[2:4]), self.anchors)
            anchor_indices = iou_anchors.argsort(descending=True, dim=0)
            x, y, width, height, class_label = box
            has_anchor = [False] * 3  # each scale should have one anchor
            for anchor_idx in anchor_indices:
                scale_idx = anchor_idx // self.num_anchors_per_scale
                anchor_on_scale = anchor_idx % self.num_anchors_per_scale
                S = self.S[scale_idx]
                i, j = int(S * y), int(S * x)  # which cell
                anchor_taken = targets[scale_idx][anchor_on_scale, i, j, 0]
                if not anchor_taken and not has_anchor[scale_idx]:
                    targets[scale_idx][anchor_on_scale, i, j, 0] = 1
                    x_cell, y_cell = S * x - j, S * y - i  # both in [0, 1]
                    width_cell, height_cell = (
                        width * S,
                        height * S,
                    )  # can be greater than 1 since it's relative to cell
                    box_coordinates = torch.tensor(
                        [x_cell, y_cell, width_cell, height_cell]
                    )
                    targets[scale_idx][anchor_on_scale, i, j, 1:5] = box_coordinates
                    targets[scale_idx][anchor_on_scale, i, j, 5] = int(class_label)
                    has_anchor[scale_idx] = True
                elif (
                    not anchor_taken
                    and iou_anchors[anchor_idx] > self.ignore_iou_thresh
                ):
                    targets[scale_idx][
                        anchor_on_scale, i, j, 0
                    ] = -1  # ignore prediction

        return image, tuple(targets)
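# Worked example of the cell assignment performed in YOLODataset.__getitem__
# (illustration only): for a box centred at (x, y) = (0.55, 0.30) on the
# S = 13 grid,
#   i, j = int(13 * 0.30), int(13 * 0.55) = 3, 7      # row 3, column 7
#   x_cell = 13 * 0.55 - 7 = 0.15
#   y_cell = 13 * 0.30 - 3 = 0.90
# and a box of normalized width 0.4 stores width_cell = 13 * 0.4 = 5.2;
# values above 1 are allowed because width is measured in cell units.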
class YOLODataModule(pl.LightningDataModule):
    def __init__(self, train_csv_path, test_csv_path):
        super().__init__()
        self.train_csv_path = train_csv_path
        self.test_csv_path = test_csv_path
        self.train_dataset = None
        self.eval_dataset = None
        self.test_dataset = None

    def setup(self, stage=None):
        scales = [
            config.IMAGE_SIZE // 32,
            config.IMAGE_SIZE // 16,
            config.IMAGE_SIZE // 8,
        ]
        self.train_dataset = YOLODataset(
            self.train_csv_path,
            transform=transforms.train_transforms,
            S=scales,
            img_dir=config.IMG_DIR,
            label_dir=config.LABEL_DIR,
            anchors=config.ANCHORS,
            mosaic_percentage=config.TRAIN_MOSAIC_PERCENTAGE,
        )
        self.eval_dataset = YOLODataset(
            self.train_csv_path,
            transform=transforms.test_transforms,
            S=scales,
            img_dir=config.IMG_DIR,
            label_dir=config.LABEL_DIR,
            anchors=config.ANCHORS,
            mosaic_percentage=0.0,  # no mosaic augmentation during evaluation
        )
        self.test_dataset = YOLODataset(
            self.test_csv_path,
            transform=transforms.test_transforms,
            S=scales,
            img_dir=config.IMG_DIR,
            label_dir=config.LABEL_DIR,
            anchors=config.ANCHORS,
            mosaic_percentage=config.TEST_MOSAIC_PERCENTAGE,
        )

    def train_dataloader(self):
        return DataLoader(
            dataset=self.train_dataset,
            batch_size=config.BATCH_SIZE,
            shuffle=True,
            num_workers=config.NUM_WORKERS,
            pin_memory=config.PIN_MEMORY,
            drop_last=False,
        )

    def val_dataloader(self):
        return DataLoader(
            dataset=self.eval_dataset,
            batch_size=config.BATCH_SIZE,
            shuffle=False,
            num_workers=config.NUM_WORKERS,
            pin_memory=config.PIN_MEMORY,
            drop_last=False,
        )

    def test_dataloader(self):
        return DataLoader(
            dataset=self.test_dataset,
            batch_size=config.BATCH_SIZE,
            shuffle=False,
            num_workers=config.NUM_WORKERS,
            pin_memory=config.PIN_MEMORY,
            drop_last=False,
        )


def test():
    anchors = config.ANCHORS
    transform = transforms.test_transforms
    dataset = YOLODataset(
        "../data/PASCAL_VOC/2examples.csv",
        "../data/PASCAL_VOC/images",
        "../data/PASCAL_VOC/labels",
        S=[13, 26, 52],
        anchors=anchors,
        transform=transform,
    )
    S = [13, 26, 52]
    # Scale anchors from fractions of the image to cell units per scale
    scaled_anchors = torch.tensor(anchors) * torch.tensor(S).unsqueeze(1).unsqueeze(
        1
    ).repeat(1, 3, 2)
    loader = DataLoader(dataset=dataset, batch_size=1, shuffle=True)
    for x, y in loader:
        boxes = []
        for i in range(y[0].shape[1]):
            anchor = scaled_anchors[i]
            print(anchor.shape)
            print(y[i].shape)
            boxes += cells_to_bboxes(
                y[i], is_preds=False, S=y[i].shape[2], anchors=anchor
            )[0]
        boxes = nms(boxes, iou_threshold=1, threshold=0.7, box_format="midpoint")
        print(boxes)
        plot_image(x[0].permute(1, 2, 0).to("cpu"), boxes)


if __name__ == "__main__":
    test()
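# Example usage of the DataModule with a Lightning Trainer (a sketch; the CSV
# paths and `model` are placeholders, not defined in this module):
#
#   datamodule = YOLODataModule(
#       train_csv_path="../data/PASCAL_VOC/train.csv",
#       test_csv_path="../data/PASCAL_VOC/test.csv",
#   )
#   trainer = pl.Trainer(max_epochs=100, accelerator="auto")
#   trainer.fit(model, datamodule=datamodule)  # model: a pl.LightningModule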