Spaces:
Runtime error
Runtime error
""" Conv2D w/ SAME padding, CondConv, MixedConv | |
A collection of conv layers and padding helpers needed by EfficientNet, MixNet, and | |
MobileNetV3 models that maintain weight compatibility with original Tensorflow models. | |
Copyright 2020 Ross Wightman | |
""" | |
import collections.abc | |
import math | |
from functools import partial | |
from itertools import repeat | |
from typing import Tuple, Optional | |
import numpy as np | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from .config import * | |
# From PyTorch internals | |
def _ntuple(n): | |
def parse(x): | |
if isinstance(x, collections.abc.Iterable): | |
return x | |
return tuple(repeat(x, n)) | |
return parse | |
_single = _ntuple(1) | |
_pair = _ntuple(2) | |
_triple = _ntuple(3) | |
_quadruple = _ntuple(4) | |
def _is_static_pad(kernel_size, stride=1, dilation=1, **_): | |
return stride == 1 and (dilation * (kernel_size - 1)) % 2 == 0 | |
def _get_padding(kernel_size, stride=1, dilation=1, **_): | |
padding = ((stride - 1) + dilation * (kernel_size - 1)) // 2 | |
return padding | |
def _calc_same_pad(i: int, k: int, s: int, d: int): | |
return max((-(i // -s) - 1) * s + (k - 1) * d + 1 - i, 0) | |
def _same_pad_arg(input_size, kernel_size, stride, dilation): | |
ih, iw = input_size | |
kh, kw = kernel_size | |
pad_h = _calc_same_pad(ih, kh, stride[0], dilation[0]) | |
pad_w = _calc_same_pad(iw, kw, stride[1], dilation[1]) | |
return [pad_w // 2, pad_w - pad_w // 2, pad_h // 2, pad_h - pad_h // 2] | |
def _split_channels(num_chan, num_groups): | |
split = [num_chan // num_groups for _ in range(num_groups)] | |
split[0] += num_chan - sum(split) | |
return split | |
def conv2d_same( | |
x, weight: torch.Tensor, bias: Optional[torch.Tensor] = None, stride: Tuple[int, int] = (1, 1), | |
padding: Tuple[int, int] = (0, 0), dilation: Tuple[int, int] = (1, 1), groups: int = 1): | |
ih, iw = x.size()[-2:] | |
kh, kw = weight.size()[-2:] | |
pad_h = _calc_same_pad(ih, kh, stride[0], dilation[0]) | |
pad_w = _calc_same_pad(iw, kw, stride[1], dilation[1]) | |
x = F.pad(x, [pad_w // 2, pad_w - pad_w // 2, pad_h // 2, pad_h - pad_h // 2]) | |
return F.conv2d(x, weight, bias, stride, (0, 0), dilation, groups) | |
class Conv2dSame(nn.Conv2d): | |
""" Tensorflow like 'SAME' convolution wrapper for 2D convolutions | |
""" | |
# pylint: disable=unused-argument | |
def __init__(self, in_channels, out_channels, kernel_size, stride=1, | |
padding=0, dilation=1, groups=1, bias=True): | |
super(Conv2dSame, self).__init__( | |
in_channels, out_channels, kernel_size, stride, 0, dilation, groups, bias) | |
def forward(self, x): | |
return conv2d_same(x, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups) | |
class Conv2dSameExport(nn.Conv2d): | |
""" ONNX export friendly Tensorflow like 'SAME' convolution wrapper for 2D convolutions | |
NOTE: This does not currently work with torch.jit.script | |
""" | |
# pylint: disable=unused-argument | |
def __init__(self, in_channels, out_channels, kernel_size, stride=1, | |
padding=0, dilation=1, groups=1, bias=True): | |
super(Conv2dSameExport, self).__init__( | |
in_channels, out_channels, kernel_size, stride, 0, dilation, groups, bias) | |
self.pad = None | |
self.pad_input_size = (0, 0) | |
def forward(self, x): | |
input_size = x.size()[-2:] | |
if self.pad is None: | |
pad_arg = _same_pad_arg(input_size, self.weight.size()[-2:], self.stride, self.dilation) | |
self.pad = nn.ZeroPad2d(pad_arg) | |
self.pad_input_size = input_size | |
if self.pad is not None: | |
x = self.pad(x) | |
return F.conv2d( | |
x, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups) | |
def get_padding_value(padding, kernel_size, **kwargs): | |
dynamic = False | |
if isinstance(padding, str): | |
# for any string padding, the padding will be calculated for you, one of three ways | |
padding = padding.lower() | |
if padding == 'same': | |
# TF compatible 'SAME' padding, has a performance and GPU memory allocation impact | |
if _is_static_pad(kernel_size, **kwargs): | |
# static case, no extra overhead | |
padding = _get_padding(kernel_size, **kwargs) | |
else: | |
# dynamic padding | |
padding = 0 | |
dynamic = True | |
elif padding == 'valid': | |
# 'VALID' padding, same as padding=0 | |
padding = 0 | |
else: | |
# Default to PyTorch style 'same'-ish symmetric padding | |
padding = _get_padding(kernel_size, **kwargs) | |
return padding, dynamic | |
def create_conv2d_pad(in_chs, out_chs, kernel_size, **kwargs): | |
padding = kwargs.pop('padding', '') | |
kwargs.setdefault('bias', False) | |
padding, is_dynamic = get_padding_value(padding, kernel_size, **kwargs) | |
if is_dynamic: | |
if is_exportable(): | |
assert not is_scriptable() | |
return Conv2dSameExport(in_chs, out_chs, kernel_size, **kwargs) | |
else: | |
return Conv2dSame(in_chs, out_chs, kernel_size, **kwargs) | |
else: | |
return nn.Conv2d(in_chs, out_chs, kernel_size, padding=padding, **kwargs) | |
class MixedConv2d(nn.ModuleDict): | |
""" Mixed Grouped Convolution | |
Based on MDConv and GroupedConv in MixNet impl: | |
https://github.com/tensorflow/tpu/blob/master/models/official/mnasnet/mixnet/custom_layers.py | |
""" | |
def __init__(self, in_channels, out_channels, kernel_size=3, | |
stride=1, padding='', dilation=1, depthwise=False, **kwargs): | |
super(MixedConv2d, self).__init__() | |
kernel_size = kernel_size if isinstance(kernel_size, list) else [kernel_size] | |
num_groups = len(kernel_size) | |
in_splits = _split_channels(in_channels, num_groups) | |
out_splits = _split_channels(out_channels, num_groups) | |
self.in_channels = sum(in_splits) | |
self.out_channels = sum(out_splits) | |
for idx, (k, in_ch, out_ch) in enumerate(zip(kernel_size, in_splits, out_splits)): | |
conv_groups = out_ch if depthwise else 1 | |
self.add_module( | |
str(idx), | |
create_conv2d_pad( | |
in_ch, out_ch, k, stride=stride, | |
padding=padding, dilation=dilation, groups=conv_groups, **kwargs) | |
) | |
self.splits = in_splits | |
def forward(self, x): | |
x_split = torch.split(x, self.splits, 1) | |
x_out = [conv(x_split[i]) for i, conv in enumerate(self.values())] | |
x = torch.cat(x_out, 1) | |
return x | |
def get_condconv_initializer(initializer, num_experts, expert_shape): | |
def condconv_initializer(weight): | |
"""CondConv initializer function.""" | |
num_params = np.prod(expert_shape) | |
if (len(weight.shape) != 2 or weight.shape[0] != num_experts or | |
weight.shape[1] != num_params): | |
raise (ValueError( | |
'CondConv variables must have shape [num_experts, num_params]')) | |
for i in range(num_experts): | |
initializer(weight[i].view(expert_shape)) | |
return condconv_initializer | |
class CondConv2d(nn.Module): | |
""" Conditional Convolution | |
Inspired by: https://github.com/tensorflow/tpu/blob/master/models/official/efficientnet/condconv/condconv_layers.py | |
Grouped convolution hackery for parallel execution of the per-sample kernel filters inspired by this discussion: | |
https://github.com/pytorch/pytorch/issues/17983 | |
""" | |
__constants__ = ['bias', 'in_channels', 'out_channels', 'dynamic_padding'] | |
def __init__(self, in_channels, out_channels, kernel_size=3, | |
stride=1, padding='', dilation=1, groups=1, bias=False, num_experts=4): | |
super(CondConv2d, self).__init__() | |
self.in_channels = in_channels | |
self.out_channels = out_channels | |
self.kernel_size = _pair(kernel_size) | |
self.stride = _pair(stride) | |
padding_val, is_padding_dynamic = get_padding_value( | |
padding, kernel_size, stride=stride, dilation=dilation) | |
self.dynamic_padding = is_padding_dynamic # if in forward to work with torchscript | |
self.padding = _pair(padding_val) | |
self.dilation = _pair(dilation) | |
self.groups = groups | |
self.num_experts = num_experts | |
self.weight_shape = (self.out_channels, self.in_channels // self.groups) + self.kernel_size | |
weight_num_param = 1 | |
for wd in self.weight_shape: | |
weight_num_param *= wd | |
self.weight = torch.nn.Parameter(torch.Tensor(self.num_experts, weight_num_param)) | |
if bias: | |
self.bias_shape = (self.out_channels,) | |
self.bias = torch.nn.Parameter(torch.Tensor(self.num_experts, self.out_channels)) | |
else: | |
self.register_parameter('bias', None) | |
self.reset_parameters() | |
def reset_parameters(self): | |
init_weight = get_condconv_initializer( | |
partial(nn.init.kaiming_uniform_, a=math.sqrt(5)), self.num_experts, self.weight_shape) | |
init_weight(self.weight) | |
if self.bias is not None: | |
fan_in = np.prod(self.weight_shape[1:]) | |
bound = 1 / math.sqrt(fan_in) | |
init_bias = get_condconv_initializer( | |
partial(nn.init.uniform_, a=-bound, b=bound), self.num_experts, self.bias_shape) | |
init_bias(self.bias) | |
def forward(self, x, routing_weights): | |
B, C, H, W = x.shape | |
weight = torch.matmul(routing_weights, self.weight) | |
new_weight_shape = (B * self.out_channels, self.in_channels // self.groups) + self.kernel_size | |
weight = weight.view(new_weight_shape) | |
bias = None | |
if self.bias is not None: | |
bias = torch.matmul(routing_weights, self.bias) | |
bias = bias.view(B * self.out_channels) | |
# move batch elements with channels so each batch element can be efficiently convolved with separate kernel | |
x = x.view(1, B * C, H, W) | |
if self.dynamic_padding: | |
out = conv2d_same( | |
x, weight, bias, stride=self.stride, padding=self.padding, | |
dilation=self.dilation, groups=self.groups * B) | |
else: | |
out = F.conv2d( | |
x, weight, bias, stride=self.stride, padding=self.padding, | |
dilation=self.dilation, groups=self.groups * B) | |
out = out.permute([1, 0, 2, 3]).view(B, self.out_channels, out.shape[-2], out.shape[-1]) | |
# Literal port (from TF definition) | |
# x = torch.split(x, 1, 0) | |
# weight = torch.split(weight, 1, 0) | |
# if self.bias is not None: | |
# bias = torch.matmul(routing_weights, self.bias) | |
# bias = torch.split(bias, 1, 0) | |
# else: | |
# bias = [None] * B | |
# out = [] | |
# for xi, wi, bi in zip(x, weight, bias): | |
# wi = wi.view(*self.weight_shape) | |
# if bi is not None: | |
# bi = bi.view(*self.bias_shape) | |
# out.append(self.conv_fn( | |
# xi, wi, bi, stride=self.stride, padding=self.padding, | |
# dilation=self.dilation, groups=self.groups)) | |
# out = torch.cat(out, 0) | |
return out | |
def select_conv2d(in_chs, out_chs, kernel_size, **kwargs): | |
assert 'groups' not in kwargs # only use 'depthwise' bool arg | |
if isinstance(kernel_size, list): | |
assert 'num_experts' not in kwargs # MixNet + CondConv combo not supported currently | |
# We're going to use only lists for defining the MixedConv2d kernel groups, | |
# ints, tuples, other iterables will continue to pass to normal conv and specify h, w. | |
m = MixedConv2d(in_chs, out_chs, kernel_size, **kwargs) | |
else: | |
depthwise = kwargs.pop('depthwise', False) | |
groups = out_chs if depthwise else 1 | |
if 'num_experts' in kwargs and kwargs['num_experts'] > 0: | |
m = CondConv2d(in_chs, out_chs, kernel_size, groups=groups, **kwargs) | |
else: | |
m = create_conv2d_pad(in_chs, out_chs, kernel_size, groups=groups, **kwargs) | |
return m | |