# Copyright (c) 2023, Tri Dao.

import torch
import torch.nn.functional as F

import causal_conv1d_cuda


class CausalConv1dFn(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x, weight, bias=None, activation=None):
        if activation not in [None, "silu", "swish"]:
            raise NotImplementedError("activation must be None, silu, or swish")
        if x.stride(2) != 1 and x.stride(1) != 1:
            x = x.contiguous()
        bias = bias.contiguous() if bias is not None else None
        ctx.save_for_backward(x, weight, bias)
        ctx.activation = activation in ["silu", "swish"]
        out = causal_conv1d_cuda.causal_conv1d_fwd(x, weight, bias, ctx.activation)
        return out

    @staticmethod
    def backward(ctx, dout):
        x, weight, bias = ctx.saved_tensors
        if dout.stride(2) != 1 and dout.stride(1) != 1:
            dout = dout.contiguous()
        # The kernel supports passing in a pre-allocated dx (e.g., in case we want to fuse the
        # backward of conv1d with the backward of chunk).
        # Here we just pass in None and dx will be allocated in the C++ code.
        dx, dweight, dbias = causal_conv1d_cuda.causal_conv1d_bwd(
            x, weight, bias, dout, None, ctx.activation
        )
        return dx, dweight, dbias if bias is not None else None, None


def causal_conv1d_fn(x, weight, bias=None, activation=None):
    """
    x: (batch, dim, seqlen)
    weight: (dim, width)
    bias: (dim,)
    activation: either None or "silu" or "swish"

    out: (batch, dim, seqlen)
    """
    return CausalConv1dFn.apply(x, weight, bias, activation)
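
# A hypothetical usage sketch (not part of the original file); it assumes a CUDA device
# and that the causal_conv1d_cuda extension is built, and only illustrates the shapes
# documented in the docstring above:
#
#     batch, dim, seqlen, width = 2, 64, 128, 4
#     x = torch.randn(batch, dim, seqlen, device="cuda", requires_grad=True)
#     weight = torch.randn(dim, width, device="cuda", requires_grad=True)
#     bias = torch.randn(dim, device="cuda", requires_grad=True)
#     out = causal_conv1d_fn(x, weight, bias, activation="silu")  # (batch, dim, seqlen)
#     out.sum().backward()  # gradients reach x, weight, and bias via CausalConv1dFn.backward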


def causal_conv1d_ref(x, weight, bias=None, activation=None):
    """
    x: (batch, dim, seqlen)
    weight: (dim, width)
    bias: (dim,)

    out: (batch, dim, seqlen)
    """
    if activation not in [None, "silu", "swish"]:
        raise NotImplementedError("activation must be None, silu, or swish")
    dtype_in = x.dtype
    x = x.to(weight.dtype)
    seqlen = x.shape[-1]
    dim, width = weight.shape
    out = F.conv1d(x, weight.unsqueeze(1), bias, padding=width - 1, groups=dim)
    out = out[..., :seqlen]
    return (out if activation is None else F.silu(out)).to(dtype=dtype_in)
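
# Illustration (added here, not in the original file): with width = 3 the reference computes
# out[:, :, t] = bias + sum_{k=0}^{2} weight[:, k] * x[:, :, t - 2 + k], with x implicitly
# zero-padded on the left, so each output position depends only on the current and the
# previous (width - 1) inputs. For example, out[:, :, 5] uses only x[:, :, 3:6]; changing
# x[:, :, 6:] leaves it unchanged.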


def causal_conv1d_update(x, conv_state, weight, bias=None, activation=None):
    """
    x: (batch, dim)
    conv_state: (batch, dim, width)
    weight: (dim, width)
    bias: (dim,)

    out: (batch, dim)
    """
    if activation not in [None, "silu", "swish"]:
        raise NotImplementedError("activation must be None, silu, or swish")
    activation = activation in ["silu", "swish"]
    return causal_conv1d_cuda.causal_conv1d_update(x, conv_state, weight, bias, activation)
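
# Usage sketch for incremental decoding (an assumption, not from the original file):
# conv_state holds the most recent `width` inputs per channel, as in the reference
# implementation below, and should be carried across autoregressive steps, e.g.
#
#     conv_state = torch.zeros(batch, dim, width, device="cuda", dtype=x.dtype)
#     for t in range(seqlen):
#         y_t = causal_conv1d_update(x[:, :, t], conv_state, weight, bias, activation="silu")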


def causal_conv1d_update_ref(x, conv_state, weight, bias=None, activation=None):
    """
    x: (batch, dim)
    conv_state: (batch, dim, width)
    weight: (dim, width)
    bias: (dim,)

    out: (batch, dim)
    """
    if activation not in [None, "silu", "swish"]:
        raise NotImplementedError("activation must be None, silu, or swish")
    dtype_in = x.dtype
    batch, dim = x.shape
    width = weight.shape[1]
    assert conv_state.shape == (batch, dim, width)
    assert weight.shape == (dim, width)
    conv_state.copy_(torch.roll(conv_state, shifts=-1, dims=-1))  # Update state (B D W)
    conv_state[:, :, -1] = x
    out = torch.sum(conv_state * weight, dim=-1)  # (B D)
    if bias is not None:
        out += bias
    return (out if activation is None else F.silu(out)).to(dtype=dtype_in)
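

if __name__ == "__main__":
    # Minimal self-check (added as a sketch, not part of the original file): stepping
    # causal_conv1d_update_ref token by token from a zero-initialized state should
    # reproduce the full-sequence reference convolution.
    torch.manual_seed(0)
    batch, dim, seqlen, width = 2, 8, 16, 4
    x = torch.randn(batch, dim, seqlen)
    weight = torch.randn(dim, width)
    bias = torch.randn(dim)
    out_full = causal_conv1d_ref(x, weight, bias, activation="silu")
    conv_state = torch.zeros(batch, dim, width)
    out_steps = torch.stack(
        [
            causal_conv1d_update_ref(x[:, :, i], conv_state, weight, bias, activation="silu")
            for i in range(seqlen)
        ],
        dim=-1,
    )
    assert torch.allclose(out_full, out_steps, atol=1e-5)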