Spaces:
Running
on
L40S
Running
on
L40S
# Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a | |
# copy of this software and associated documentation files (the "Software"), | |
# to deal in the Software without restriction, including without limitation | |
# the rights to use, copy, modify, merge, publish, distribute, sublicense, | |
# and/or sell copies of the Software, and to permit persons to whom the | |
# Software is furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL | |
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | |
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER | |
# DEALINGS IN THE SOFTWARE. | |
# | |
# SPDX-FileCopyrightText: Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES | |
# SPDX-License-Identifier: MIT | |
import logging | |
import pathlib | |
from typing import List | |
import numpy as np | |
import torch | |
import torch.distributed as dist | |
import torch.nn as nn | |
from apex.optimizers import FusedAdam, FusedLAMB | |
from torch.nn.modules.loss import _Loss | |
from torch.nn.parallel import DistributedDataParallel | |
from torch.optim import Optimizer | |
from torch.utils.data import DataLoader, DistributedSampler | |
from tqdm import tqdm | |
from se3_transformer.data_loading import QM9DataModule | |
from se3_transformer.model import SE3TransformerPooled | |
from se3_transformer.model.fiber import Fiber | |
from se3_transformer.runtime import gpu_affinity | |
from se3_transformer.runtime.arguments import PARSER | |
from se3_transformer.runtime.callbacks import QM9MetricCallback, QM9LRSchedulerCallback, BaseCallback, \ | |
PerformanceCallback | |
from se3_transformer.runtime.inference import evaluate | |
from se3_transformer.runtime.loggers import LoggerCollection, DLLogger, WandbLogger, Logger | |
from se3_transformer.runtime.utils import to_cuda, get_local_rank, init_distributed, seed_everything, \ | |
using_tensor_cores, increase_l2_fetch_granularity | |
def save_state(model: nn.Module, optimizer: Optimizer, epoch: int, path: pathlib.Path, callbacks: List[BaseCallback]): | |
""" Saves model, optimizer and epoch states to path (only once per node) """ | |
if get_local_rank() == 0: | |
state_dict = model.module.state_dict() if isinstance(model, DistributedDataParallel) else model.state_dict() | |
checkpoint = { | |
'state_dict': state_dict, | |
'optimizer_state_dict': optimizer.state_dict(), | |
'epoch': epoch | |
} | |
for callback in callbacks: | |
callback.on_checkpoint_save(checkpoint) | |
torch.save(checkpoint, str(path)) | |
logging.info(f'Saved checkpoint to {str(path)}') | |
def load_state(model: nn.Module, optimizer: Optimizer, path: pathlib.Path, callbacks: List[BaseCallback]): | |
""" Loads model, optimizer and epoch states from path """ | |
checkpoint = torch.load(str(path), map_location={'cuda:0': f'cuda:{get_local_rank()}'}) | |
if isinstance(model, DistributedDataParallel): | |
model.module.load_state_dict(checkpoint['state_dict']) | |
else: | |
model.load_state_dict(checkpoint['state_dict']) | |
optimizer.load_state_dict(checkpoint['optimizer_state_dict']) | |
for callback in callbacks: | |
callback.on_checkpoint_load(checkpoint) | |
logging.info(f'Loaded checkpoint from {str(path)}') | |
return checkpoint['epoch'] | |
def train_epoch(train_dataloader, loss_fn, epoch_idx, grad_scaler, optimizer, local_rank, callbacks, args): | |
losses = [] | |
for i, batch in tqdm(enumerate(train_dataloader), total=len(train_dataloader), unit='batch', | |
desc=f'Epoch {epoch_idx}', disable=(args.silent or local_rank != 0)): | |
*inputs, target = to_cuda(batch) | |
for callback in callbacks: | |
callback.on_batch_start() | |
with torch.cuda.amp.autocast(enabled=args.amp): | |
pred = model(*inputs) | |
loss = loss_fn(pred, target) / args.accumulate_grad_batches | |
grad_scaler.scale(loss).backward() | |
# gradient accumulation | |
if (i + 1) % args.accumulate_grad_batches == 0 or (i + 1) == len(train_dataloader): | |
if args.gradient_clip: | |
grad_scaler.unscale_(optimizer) | |
torch.nn.utils.clip_grad_norm_(model.parameters(), args.gradient_clip) | |
grad_scaler.step(optimizer) | |
grad_scaler.update() | |
optimizer.zero_grad() | |
losses.append(loss.item()) | |
return np.mean(losses) | |
def train(model: nn.Module, | |
loss_fn: _Loss, | |
train_dataloader: DataLoader, | |
val_dataloader: DataLoader, | |
callbacks: List[BaseCallback], | |
logger: Logger, | |
args): | |
device = torch.cuda.current_device() | |
model.to(device=device) | |
local_rank = get_local_rank() | |
world_size = dist.get_world_size() if dist.is_initialized() else 1 | |
if dist.is_initialized(): | |
model = DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank) | |
model.train() | |
grad_scaler = torch.cuda.amp.GradScaler(enabled=args.amp) | |
if args.optimizer == 'adam': | |
optimizer = FusedAdam(model.parameters(), lr=args.learning_rate, betas=(args.momentum, 0.999), | |
weight_decay=args.weight_decay) | |
elif args.optimizer == 'lamb': | |
optimizer = FusedLAMB(model.parameters(), lr=args.learning_rate, betas=(args.momentum, 0.999), | |
weight_decay=args.weight_decay) | |
else: | |
optimizer = torch.optim.SGD(model.parameters(), lr=args.learning_rate, momentum=args.momentum, | |
weight_decay=args.weight_decay) | |
epoch_start = load_state(model, optimizer, args.load_ckpt_path, callbacks) if args.load_ckpt_path else 0 | |
for callback in callbacks: | |
callback.on_fit_start(optimizer, args) | |
for epoch_idx in range(epoch_start, args.epochs): | |
if isinstance(train_dataloader.sampler, DistributedSampler): | |
train_dataloader.sampler.set_epoch(epoch_idx) | |
loss = train_epoch(train_dataloader, loss_fn, epoch_idx, grad_scaler, optimizer, local_rank, callbacks, args) | |
if dist.is_initialized(): | |
loss = torch.tensor(loss, dtype=torch.float, device=device) | |
torch.distributed.all_reduce(loss) | |
loss = (loss / world_size).item() | |
logging.info(f'Train loss: {loss}') | |
logger.log_metrics({'train loss': loss}, epoch_idx) | |
for callback in callbacks: | |
callback.on_epoch_end() | |
if not args.benchmark and args.save_ckpt_path is not None and args.ckpt_interval > 0 \ | |
and (epoch_idx + 1) % args.ckpt_interval == 0: | |
save_state(model, optimizer, epoch_idx, args.save_ckpt_path, callbacks) | |
if not args.benchmark and args.eval_interval > 0 and (epoch_idx + 1) % args.eval_interval == 0: | |
evaluate(model, val_dataloader, callbacks, args) | |
model.train() | |
for callback in callbacks: | |
callback.on_validation_end(epoch_idx) | |
if args.save_ckpt_path is not None and not args.benchmark: | |
save_state(model, optimizer, args.epochs, args.save_ckpt_path, callbacks) | |
for callback in callbacks: | |
callback.on_fit_end() | |
def print_parameters_count(model): | |
num_params_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad) | |
logging.info(f'Number of trainable parameters: {num_params_trainable}') | |
if __name__ == '__main__': | |
is_distributed = init_distributed() | |
local_rank = get_local_rank() | |
args = PARSER.parse_args() | |
logging.getLogger().setLevel(logging.CRITICAL if local_rank != 0 or args.silent else logging.INFO) | |
logging.info('====== SE(3)-Transformer ======') | |
logging.info('| Training procedure |') | |
logging.info('===============================') | |
if args.seed is not None: | |
logging.info(f'Using seed {args.seed}') | |
seed_everything(args.seed) | |
logger = LoggerCollection([ | |
DLLogger(save_dir=args.log_dir, filename=args.dllogger_name), | |
WandbLogger(name=f'QM9({args.task})', save_dir=args.log_dir, project='se3-transformer') | |
]) | |
datamodule = QM9DataModule(**vars(args)) | |
model = SE3TransformerPooled( | |
fiber_in=Fiber({0: datamodule.NODE_FEATURE_DIM}), | |
fiber_out=Fiber({0: args.num_degrees * args.num_channels}), | |
fiber_edge=Fiber({0: datamodule.EDGE_FEATURE_DIM}), | |
output_dim=1, | |
tensor_cores=using_tensor_cores(args.amp), # use Tensor Cores more effectively | |
**vars(args) | |
) | |
loss_fn = nn.L1Loss() | |
if args.benchmark: | |
logging.info('Running benchmark mode') | |
world_size = dist.get_world_size() if dist.is_initialized() else 1 | |
callbacks = [PerformanceCallback(logger, args.batch_size * world_size)] | |
else: | |
callbacks = [QM9MetricCallback(logger, targets_std=datamodule.targets_std, prefix='validation'), | |
QM9LRSchedulerCallback(logger, epochs=args.epochs)] | |
if is_distributed: | |
gpu_affinity.set_affinity(gpu_id=get_local_rank(), nproc_per_node=torch.cuda.device_count()) | |
print_parameters_count(model) | |
logger.log_hyperparams(vars(args)) | |
increase_l2_fetch_granularity() | |
train(model, | |
loss_fn, | |
datamodule.train_dataloader(), | |
datamodule.val_dataloader(), | |
callbacks, | |
logger, | |
args) | |
logging.info('Training finished successfully') | |