import random from torch.utils.data.sampler import Sampler, SubsetRandomSampler class SubsetSampler(Sampler): """ Samples elements sequentially from a given list of indices. Args: indices (list): a sequence of indices """ def __init__(self, indices): super().__init__(indices) self.indices = indices def __iter__(self): return (self.indices[i] for i in range(len(self.indices))) def __len__(self): return len(self.indices) class PerfectBatchSampler(Sampler): """ Samples a mini-batch of indices for a balanced class batching Args: dataset_items(list): dataset items to sample from. classes (list): list of classes of dataset_items to sample from. batch_size (int): total number of samples to be sampled in a mini-batch. num_gpus (int): number of GPU in the data parallel mode. shuffle (bool): if True, samples randomly, otherwise samples sequentially. drop_last (bool): if True, drops last incomplete batch. """ def __init__( self, dataset_items, classes, batch_size, num_classes_in_batch, num_gpus=1, shuffle=True, drop_last=False, label_key="class_name", ): super().__init__(dataset_items) assert ( batch_size % (num_classes_in_batch * num_gpus) == 0 ), "Batch size must be divisible by number of classes times the number of data parallel devices (if enabled)." label_indices = {} for idx, item in enumerate(dataset_items): label = item[label_key] if label not in label_indices.keys(): label_indices[label] = [idx] else: label_indices[label].append(idx) if shuffle: self._samplers = [SubsetRandomSampler(label_indices[key]) for key in classes] else: self._samplers = [SubsetSampler(label_indices[key]) for key in classes] self._batch_size = batch_size self._drop_last = drop_last self._dp_devices = num_gpus self._num_classes_in_batch = num_classes_in_batch def __iter__(self): batch = [] if self._num_classes_in_batch != len(self._samplers): valid_samplers_idx = random.sample(range(len(self._samplers)), self._num_classes_in_batch) else: valid_samplers_idx = None iters = [iter(s) for s in self._samplers] done = False while True: b = [] for i, it in enumerate(iters): if valid_samplers_idx is not None and i not in valid_samplers_idx: continue idx = next(it, None) if idx is None: done = True break b.append(idx) if done: break batch += b if len(batch) == self._batch_size: yield batch batch = [] if valid_samplers_idx is not None: valid_samplers_idx = random.sample(range(len(self._samplers)), self._num_classes_in_batch) if not self._drop_last: if len(batch) > 0: groups = len(batch) // self._num_classes_in_batch if groups % self._dp_devices == 0: yield batch else: batch = batch[: (groups // self._dp_devices) * self._dp_devices * self._num_classes_in_batch] if len(batch) > 0: yield batch def __len__(self): class_batch_size = self._batch_size // self._num_classes_in_batch return min(((len(s) + class_batch_size - 1) // class_batch_size) for s in self._samplers)