vampnet-opera / scripts /utils /visualize_embeddings.py
hugo flores garcia
recovering from a gittastrophe
41b9d24
"""
TODO: train a linear probe
usage:
python gtzan_embeddings.py --args.load conf/interface.yml --Interface.device cuda --path_to_audio /path/to/audio/labels --output_dir /path/to/output
"""
from pathlib import Path
from typing import List
import audiotools as at
from audiotools import AudioSignal
import argbind
import torch
import numpy as np
import zipfile
import json
from vampnet.interface import Interface
import tqdm
# bind the Interface to argbind
Interface = argbind.bind(Interface)
DEBUG = False
def smart_plotly_export(fig, save_path: Path):
img_format = save_path.suffix[1:]
if img_format == "html":
fig.write_html(save_path)
elif img_format == 'bytes':
return fig.to_image(format='png')
#TODO: come back and make this prettier
elif img_format == 'numpy':
import io
from PIL import Image
def plotly_fig2array(fig):
#convert Plotly fig to an array
fig_bytes = fig.to_image(format="png", width=1200, height=700)
buf = io.BytesIO(fig_bytes)
img = Image.open(buf)
return np.asarray(img)
return plotly_fig2array(fig)
elif img_format == 'jpeg' or 'png' or 'webp':
fig.write_image(save_path)
else:
raise ValueError("invalid image format")
def dim_reduce(annotated_embeddings, layer, output_dir, n_components=3, method="tsne"):
"""
dimensionality reduction for visualization!
saves an html plotly figure to save_path
parameters:
annotated_embeddings (list): the annotated enmbeddings to be reduced; embeddings have shape (samples, features)
labels (list): list of labels for embedding
save_path (str): path where u wanna save ur figure
method (str): umap, tsne, or pca
title (str): title for ur figure
returns:
proj (np.ndarray): projection vector with shape (samples, dimensions)
"""
import pandas as pd
import plotly.express as px
fig_name = f"vampnet-embeddings-layer={layer}"
fig_title = f"{fig_name}_{method}"
save_path = (output_dir / fig_name).with_suffix(".html")
if method == "umap":
from umap import UMAP
reducer = umap.UMAP(n_components=n_components)
elif method == "tsne":
from sklearn.manifold import TSNE
reducer = TSNE(n_components=n_components)
elif method == "pca":
from sklearn.decomposition import PCA
reducer = PCA(n_components=n_components)
else:
raise ValueError(f"invalid method: {method}")
labels = [emb.label for emb in annotated_embeddings]
names = [emb.filename for emb in annotated_embeddings]
embs = [emb.embedding for emb in annotated_embeddings]
embs_at_layer = np.stack(embs)[:, layer, :]
projs = reducer.fit_transform(embs_at_layer)
df = pd.DataFrame(
{
"label": labels,
"name": names,
"x": projs[:, 0],
"y": projs[:, 1],
}
)
if n_components == 2:
fig = px.scatter(
df, x="x", y="y", color="label", hover_name="name", title=fig_title,
)
elif n_components == 3:
df['z'] = projs[:, 2]
fig = px.scatter_3d(
df, x="x", y="y", z="z", color="label", hover_name="name", title=fig_title
)
else:
raise ValueError(f"can't plot {n_components} components")
fig.update_traces(
marker=dict(size=6, line=dict(width=1, color="DarkSlateGrey")),
selector=dict(mode="markers"),
)
return smart_plotly_export(fig, save_path)
# per JukeMIR, we want the emebddings from the middle layer?
def vampnet_embed(sig: AudioSignal, interface: Interface, layer=10):
with torch.inference_mode():
# preprocess the signal
sig = interface.preprocess(sig)
# get the coarse vampnet model
vampnet = interface.coarse
# get the tokens
z = interface.encode(sig)[:, :vampnet.n_codebooks, :]
z_latents = vampnet.embedding.from_codes(z, interface.codec)
# do a forward pass through the model, get the embeddings
_z, embeddings = vampnet(z_latents, return_activations=True)
# print(f"got embeddings with shape {embeddings.shape}")
# [layer, batch, time, n_dims]
# [20, 1, 600ish, 768]
# squeeze batch dim (1 bc layer should be dim 0)
assert embeddings.shape[1] == 1, f"expected batch dim to be 1, got {embeddings.shape[0]}"
embeddings = embeddings.squeeze(1)
num_layers = embeddings.shape[0]
assert layer < num_layers, f"layer {layer} is out of bounds for model with {num_layers} layers"
# do meanpooling over the time dimension
embeddings = embeddings.mean(dim=-2)
# [20, 768]
# return the embeddings
return embeddings
from dataclasses import dataclass, fields
@dataclass
class AnnotatedEmbedding:
label: str
filename: str
embedding: np.ndarray
def save(self, path):
"""Save the Embedding object to a given path as a zip file."""
with zipfile.ZipFile(path, 'w') as archive:
# Save numpy array
with archive.open('embedding.npy', 'w') as f:
np.save(f, self.embedding)
# Save non-numpy data as json
non_numpy_data = {f.name: getattr(self, f.name) for f in fields(self) if f.name != 'embedding'}
with archive.open('data.json', 'w') as f:
f.write(json.dumps(non_numpy_data).encode('utf-8'))
@classmethod
def load(cls, path):
"""Load the Embedding object from a given zip path."""
with zipfile.ZipFile(path, 'r') as archive:
# Load numpy array
with archive.open('embedding.npy') as f:
embedding = np.load(f)
# Load non-numpy data from json
with archive.open('data.json') as f:
data = json.loads(f.read().decode('utf-8'))
return cls(embedding=embedding, **data)
@argbind.bind(without_prefix=True)
def main(
path_to_audio: str = None,
cache_dir: str = "./.emb_cache",
output_dir: str = "./vampnet_embeddings",
layers: List[int] = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19],
method: str = "tsne",
n_components: int = 2,
):
path_to_audio = Path(path_to_audio)
assert path_to_audio.exists(), f"{path_to_audio} does not exist"
cache_dir = Path(cache_dir)
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True, parents=True)
# load our interface
# argbind will automatically load the default config,
interface = Interface()
# we expect path_to_audio to consist of a folder for each label, so let's get the list of labels
labels = [Path(x).name for x in path_to_audio.iterdir() if x.is_dir()]
print(f"Found {len(labels)} labels")
print(f"labels: {labels}")
# collect audio files, labels, and embeddings
annotated_embeddings = []
for label in labels:
audio_files = list(at.util.find_audio(path_to_audio / label))
print(f"Found {len(audio_files)} audio files for label {label}")
for audio_file in tqdm.tqdm(audio_files, desc=f"embedding label {label}"):
# check if we have a cached embedding for this file
cached_path = cache_dir / f"{label}_{audio_file.stem}.emb"
if cached_path.exists():
# if so, load it
if DEBUG:
print(f"loading cached embedding for {cached_path.stem}")
embedding = AnnotatedEmbedding.load(cached_path)
else:
try:
sig = AudioSignal(audio_file)
except Exception as e:
print(f"failed to load {audio_file.name} with error {e}")
print(f"skipping {audio_file.name}")
continue
# gets the embedding
emb = vampnet_embed(sig, interface).cpu().numpy()
# create an embedding we can save/load
embedding = AnnotatedEmbedding(
label=label, filename=audio_file.name, embedding=emb
)
# cache the embeddings
cached_path.parent.mkdir(exist_ok=True, parents=True)
embedding.save(cached_path)
annotated_embeddings.append(embedding)
# now, let's do a dim reduction on the embeddings and visualize them.
for layer in tqdm.tqdm(layers, desc="dim reduction"):
dim_reduce(
annotated_embeddings,
layer,
output_dir=output_dir,
n_components=n_components,
method=method,
)
if __name__ == "__main__":
args = argbind.parse_args()
with argbind.scope(args):
main()