"""
Modeling Relational Data with Graph Convolutional Networks

Paper: https://arxiv.org/abs/1703.06103
Code: https://github.com/tkipf/relational-gcn

Differences compared to tkipf/relational-gcn:

* l2norm applied to all weights
* remove nodes that won't be touched
"""
import argparse, gc
import time
from functools import partial

import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel
from sklearn.metrics import roc_auc_score

import dgl
import dgl.function as fn
import dgl.multiprocessing as mp
from dgl import DGLGraph
from dgl.data.rdf import AIFBDataset
from dgl.contrib.hostmap_tensor import HostMapTensor

import tqdm

from src.skeleton.graph_builder import StandaloneGraphBuilder
from src.skeleton.train_type import SamplingGraphTraining
from src.skeleton.dataloader import Dataloader

from src.application.rgcn.rgcn import RelGraphEmbedLayer, EntityClassify

'''
This is a single-machine heterogeneous-graph node-classification demo.

Intended for:
-- large graphs, e.g. 1M-100M nodes and 10M-1B edges.

class RgcnGraphBuilder       loads the data
class RgcnTrainer            handles training and prediction
class RgcnTrainingDataLoader handles training-time sampling and dataset iteration

To customize, a user only needs to change:

1. RgcnGraphBuilder.build_dataset   -- splits training, prediction and test data out of the DGL graph
2. RgcnTrainer.train                -- the training logic
3. RgcnTrainer.evaluate             -- the offline prediction logic
4. RgcnTrainingDataLoader.init      -- returns an iterator used to traverse the dataset

Accuracy is aligned on the AIFB dataset (epoch=50, batch_size=128):
community AIFB node-classification test accuracy: Final Test Accuracy: 0.9250 | Test loss: 0.3929
platform  AIFB node-classification test accuracy: Final Test Accuracy: 0.9250 | Test loss: 0.2953
'''
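
# A minimal wiring sketch (illustration only) of how the three classes below
# fit together. The constructor signature `Klass(flags)` and the single-GPU
# call pattern are assumptions inherited from the src.skeleton base classes,
# not something this file defines:
#
#     builder = RgcnGraphBuilder(flags)
#     dataset = builder.build_dataset(hg)   # hg: a heterogeneous DGLGraph
#     g = dataset[0]                        # shared-memory homogeneous graph
#     RgcnTrainer(flags).train(g, dataset, th.device('cuda:0'),
#                              n_gpus=1, proc_id=0)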

class RgcnGraphBuilder(StandaloneGraphBuilder):

    def build_dataset(self, g):

        hg = g

        num_classes = self.flags.num_classes
        num_rels = len(hg.canonical_etypes)
        num_of_ntype = len(hg.ntypes)

        # Split the edges 70/10/20 into train/validation/test sets.
        eids = th.arange(g.number_of_edges())
        val_size = int(len(eids) * 0.1)
        test_size = int(len(eids) * 0.2)

        valid_eids = eids[:val_size]
        test_eids = eids[val_size: val_size + test_size]
        train_eids = eids[val_size + test_size:]

        # For featureless node types store only the node count; otherwise wrap
        # the feature tensor in a HostMapTensor keyed by the numeric type id.
        node_feats = {}
        for ntype in hg.ntypes:
            if len(hg.nodes[ntype].data) == 0 or self.flags.node_feats is False:
                node_feats[str(hg.get_ntype_id(ntype))] = hg.number_of_nodes(ntype)
            else:
                assert len(hg.nodes[ntype].data) == 1
                feat = hg.nodes[ntype].data.pop(self.flags.feat)
                if feat is not None:
                    feats = HostMapTensor(ntype + '__' + self.flags.feat, feat)
                    node_feats[str(hg.get_ntype_id(ntype))] = feats

        # Convert to a homogeneous graph; keep the per-node type, per-edge type
        # and per-node type-local id tensors in shared memory for the workers.
        g = dgl.to_homogeneous(hg)
        ntype_tensor = g.ndata[dgl.NTYPE]
        ntype_tensor.share_memory_()
        etype_tensor = g.edata[dgl.ETYPE]
        etype_tensor.share_memory_()
        typeid_tensor = g.ndata[dgl.NID]
        typeid_tensor.share_memory_()

        g.create_formats_()
        g = g.shared_memory('g')

        return g, node_feats, num_of_ntype, num_classes, num_rels, train_eids, valid_eids, test_eids, ntype_tensor, etype_tensor, typeid_tensor


class RgcnTrainer(SamplingGraphTraining):

    def train(self, g, dataset, device, n_gpus, proc_id, **kwargs):

        dev_id = -1 if n_gpus == 0 else device.index
        queue = kwargs['queue'] if n_gpus > 1 else None

        g, node_feats, num_of_ntype, num_classes, num_rels, train_eids, valid_eids, test_eids, ntype_tensor, etype_tensor, typeid_tensor = dataset

        node_tids = ntype_tensor
        world_size = n_gpus

        if n_gpus > 0:
            # Pin host-mapped feature tensors for zero-copy (UVA) access from the GPU.
            for key in node_feats:
                if not isinstance(node_feats[key], int):
                    node_feats[key].uva(device)

        if n_gpus == 1:
            g = g.to(device)

        if n_gpus > 1:
            g = g.uva(device)
            dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
                master_ip='127.0.0.1', master_port=self.flags.master_port)

            th.distributed.init_process_group(backend=self.flags.communication_backend,
                                              init_method=dist_init_method,
                                              world_size=world_size,
                                              rank=proc_id)

        embed_layer = RelGraphEmbedLayer(dev_id if self.flags.embedding_gpu or not self.flags.dgl_sparse else -1,
                                         dev_id,
                                         g.number_of_nodes(),
                                         node_tids,
                                         num_of_ntype,
                                         node_feats,
                                         self.flags.num_hidden,
                                         dgl_sparse=self.flags.dgl_sparse)

        # Link-prediction loss over positive/negative edge scores (defined below).
        loss_fcn = CrossEntropyLoss()

        model = EntityClassify(dev_id,
                               g.number_of_nodes(),
                               self.flags.num_hidden,
                               num_classes,
                               num_rels,
                               num_bases=self.flags.num_bases,
                               num_hidden_layers=self.flags.num_layers - 2,
                               dropout=self.flags.dropout,
                               use_self_loop=self.flags.use_self_loop,
                               low_mem=self.flags.low_mem,
                               layer_norm=self.flags.layer_norm)

        if n_gpus == 1:
            th.cuda.set_device(dev_id)
            model.cuda(dev_id)
            if self.flags.dgl_sparse:
                embed_layer.cuda(dev_id)

        elif n_gpus > 1:
            if dev_id >= 0:
                model.cuda(dev_id)
                model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
                if self.flags.dgl_sparse:
                    embed_layer.cuda(dev_id)
                    if len(list(embed_layer.parameters())) > 0:
                        embed_layer = DistributedDataParallel(embed_layer, device_ids=[dev_id], output_device=dev_id)
            else:
                if len(list(embed_layer.parameters())) > 0:
                    embed_layer = DistributedDataParallel(embed_layer, device_ids=None, output_device=None)

        # Dense parameters go to a regular Adam; sparse node embeddings get a
        # sparse optimizer (dgl.optim.SparseAdam or torch's SparseAdam).
        dense_params = list(model.parameters())
        if self.flags.node_feats:
            if n_gpus > 1:
                dense_params += list(embed_layer.module.embeds.parameters())
            else:
                dense_params += list(embed_layer.embeds.parameters())
        optimizer = th.optim.Adam(dense_params, lr=self.flags.lr, weight_decay=self.flags.l2norm)

        if self.flags.dgl_sparse:
            all_params = list(model.parameters()) + list(embed_layer.parameters())
            optimizer = th.optim.Adam(all_params, lr=self.flags.lr, weight_decay=self.flags.l2norm)
            if n_gpus > 1 and isinstance(embed_layer, DistributedDataParallel):
                dgl_emb = embed_layer.module.dgl_emb
            else:
                dgl_emb = embed_layer.dgl_emb
            emb_optimizer = dgl.optim.SparseAdam(params=dgl_emb, lr=self.flags.sparse_lr, eps=1e-8) if len(dgl_emb) > 0 else None
        else:
            if n_gpus > 1:
                embs = list(embed_layer.module.node_embeds.parameters())
            else:
                embs = list(embed_layer.node_embeds.parameters())
            emb_optimizer = th.optim.SparseAdam(embs, lr=self.flags.sparse_lr) if len(embs) > 0 else None

        ntype_tensor = ntype_tensor.to(device)
        etype_tensor = etype_tensor.to(device)
        typeid_tensor = typeid_tensor.to(device)
        train_eids = train_eids.to(device)
        valid_eids = valid_eids.to(device)
        test_eids = test_eids.to(device)

        dataset = train_eids, valid_eids, test_eids, device
        dataloader = RgcnTrainingDataLoader(self.flags).init(g, dataset)
        loader, val_loader, test_loader = dataloader

        print("start training...")
        forward_time = []
        backward_time = []

        train_time = 0
        validation_time = 0
        test_time = 0
        last_val_acc = 0.0
        do_test = False

        for epoch in range(self.flags.num_epochs):

            if n_gpus > 1:
                # Re-seed the DDP sampler so every epoch uses a different shuffle.
                loader.set_epoch(epoch)

            tstart = time.time()
            model.train()
            embed_layer.train()

            for i, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(loader):

                # Attach degree norms and type annotations to every sampled block.
                for block in blocks:
                    gen_norm(block, ntype_tensor, etype_tensor, typeid_tensor)

                t0 = time.time()
                feats = embed_layer(blocks[0].srcdata[dgl.NID],
                                    blocks[0].srcdata['ntype'],
                                    blocks[0].srcdata['type_id'],
                                    node_feats)
                blocks = [block.int().to(device) for block in blocks]

                pos_graph = pos_graph.to(device)
                neg_graph = neg_graph.to(device)
                batch_pred = model(blocks, feats)

                f_step = time.time()
                loss = loss_fcn(batch_pred, pos_graph, neg_graph)

                t1 = time.time()
                optimizer.zero_grad()
                if emb_optimizer is not None:
                    emb_optimizer.zero_grad()

                loss.backward()
                if emb_optimizer is not None:
                    emb_optimizer.step()
                optimizer.step()
                t2 = time.time()

                forward_time.append(t1 - t0)
                backward_time.append(t2 - t1)

                if i % 100 == 0 and proc_id == 0:
                    print("Train Loss: {:.4f}".format(loss.item()))

            print("Epoch {:05d}:{:05d} | Train Forward Time(s) {:.4f} | Backward Time(s) {:.4f}".
                  format(epoch, self.flags.num_epochs, forward_time[-1], backward_time[-1]))
            tend = time.time()
            train_time += (tend - tstart)

        # Skip the first quarter of the measurements to exclude warm-up batches.
        print("{}/{} Mean forward time: {:4f}".format(proc_id, n_gpus,
                                                      np.mean(forward_time[len(forward_time) // 4:])))
        print("{}/{} Mean backward time: {:4f}".format(proc_id, n_gpus,
                                                       np.mean(backward_time[len(backward_time) // 4:])))

    def _evaluate(self, n_gpus, labels, queue, proc_id, model, embed_layer,
                  data_loader, node_feats, inv_target, mode,
                  ntype_tensor=None, etype_tensor=None, typeid_tensor=None):

        # The shared type tensors are forwarded to evaluate(), which needs
        # them for gen_norm on each sampled block.
        tstart = time.time()
        time_cost = 0
        acc = 0
        loss = 0
        logits, seeds = evaluate(model, embed_layer,
                                 data_loader, node_feats,
                                 inv_target,
                                 ntype_tensor, etype_tensor, typeid_tensor)
        if queue is not None:
            queue.put((logits, seeds))

        if proc_id == 0:
            loss, acc = self._collect_eval(n_gpus, labels, queue) if queue is not None else \
                (F.cross_entropy(logits, labels[seeds].cpu()).item(),
                 th.sum(logits.argmax(dim=1) == labels[seeds].cpu()).item() / len(seeds))

            print("{} Accuracy: {:.4f} | {} loss: {:.4f}".format(mode, acc, mode, loss))

        tend = time.time()
        time_cost = (tend - tstart)
        return acc, loss, time_cost

    def _collect_eval(self, n_gpus, labels, queue):

        # Gather per-worker (logits, seeds) pairs and score them globally.
        eval_logits = []
        eval_seeds = []
        for i in range(n_gpus):
            log = queue.get()
            eval_l, eval_s = log
            eval_logits.append(eval_l)
            eval_seeds.append(eval_s)

        eval_logits = th.cat(eval_logits)
        eval_seeds = th.cat(eval_seeds)
        eval_loss = F.cross_entropy(eval_logits, labels[eval_seeds].cpu()).item()
        eval_acc = th.sum(eval_logits.argmax(dim=1) == labels[eval_seeds].cpu()).item() / len(eval_seeds)
        return eval_loss, eval_acc


class RgcnTrainingDataLoader(Dataloader):

    def init(self, g, dataset):

        train_eids, valid_eids, test_eids, device = dataset

        n_gpus = len(list(map(int, self.flags.gpu.split(','))))

        # One fanout per GCN layer, e.g. "10,25" samples 10 then 25 neighbors.
        fanouts = [int(fanout) for fanout in self.flags.fanout.split(',')]
        sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)

        loader = dgl.dataloading.EdgeDataLoader(
            g, train_eids, sampler,
            negative_sampler=dgl.dataloading.negative_sampler.Uniform(5),
            batch_size=self.flags.batch_size,
            device=device,
            use_ddp=n_gpus > 1,
            shuffle=True,
            drop_last=False,
            num_workers=self.flags.num_workers)

        val_loader = dgl.dataloading.EdgeDataLoader(
            g, valid_eids, sampler,
            negative_sampler=dgl.dataloading.negative_sampler.Uniform(5),
            batch_size=self.flags.batch_size,
            device=device,
            use_ddp=n_gpus > 1,
            shuffle=False,
            drop_last=False,
            num_workers=self.flags.num_workers)

        # Like the validation loader, the test loader does not need shuffling.
        test_loader = dgl.dataloading.EdgeDataLoader(
            g, test_eids, sampler,
            negative_sampler=dgl.dataloading.negative_sampler.Uniform(5),
            batch_size=self.flags.batch_size,
            device=device,
            use_ddp=n_gpus > 1,
            shuffle=False,
            drop_last=False,
            num_workers=self.flags.num_workers)

        return loader, val_loader, test_loader
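
# Each EdgeDataLoader batch yields a 4-tuple (input_nodes, pos_graph,
# neg_graph, blocks): the input node ids needed at the first layer, a graph of
# the sampled positive edges, a graph of 5 uniformly drawn negative edges per
# positive edge, and the per-layer message-passing blocks. The training loop
# in RgcnTrainer.train consumes exactly this structure.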


def gen_norm(g, ntype_tensor, etype_tensor, typeid_tensor):
    """Attach in-degree edge norms and node/edge type annotations to a block."""

    _, v, eid = g.all_edges(form='all')
    # count[inverse_index] gives, for every edge, the in-degree of its
    # destination node, so each edge gets norm 1 / in_degree(dst).
    _, inverse_index, count = th.unique(v, return_inverse=True, return_counts=True)
    degrees = count[inverse_index]
    norm = th.ones(eid.shape[0], device=eid.device) / degrees
    norm = norm.unsqueeze(1)
    g.edata['norm'] = norm

    # Blocks carry parent-graph ids: NID for nodes and EID for edges. Index
    # the global type tensors with those parent ids (block-local edge ids
    # would pick up the wrong edge types).
    g.srcdata['ntype'] = ntype_tensor[g.srcdata[dgl.NID]]
    g.edata['etype'] = etype_tensor[g.edata[dgl.EID]]
    g.srcdata['type_id'] = typeid_tensor[g.srcdata[dgl.NID]]
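
# Illustration only: gen_norm on a tiny hand-built block. The fake type
# tensors below stand in for the shared tensors produced by
# RgcnGraphBuilder.build_dataset; nothing in the pipeline calls this function.
def _gen_norm_example():
    g = dgl.graph(([0, 1, 2], [1, 2, 2]))           # edges 0->1, 1->2, 2->2
    block = dgl.to_block(g, dst_nodes=th.tensor([1, 2]))
    ntype = th.zeros(g.number_of_nodes(), dtype=th.int64)
    etype = th.zeros(g.number_of_edges(), dtype=th.int64)
    typeid = th.arange(g.number_of_nodes())
    gen_norm(block, ntype, etype, typeid)
    # Node 1 has in-degree 1 and node 2 has in-degree 2 inside the block, so
    # the expected norms are [1.0, 0.5, 0.5].
    print(block.edata['norm'].flatten())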


def evaluate(model, embed_layer, eval_loader, node_feats, inv_target,
             ntype_tensor, etype_tensor, typeid_tensor):

    model.eval()
    embed_layer.eval()
    eval_logits = []
    eval_seeds = []

    with th.no_grad():
        th.cuda.empty_cache()
        for i, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(eval_loader):

            # gen_norm needs the shared type tensors for every sampled block.
            for block in blocks:
                gen_norm(block, ntype_tensor, etype_tensor, typeid_tensor)

            feats = embed_layer(blocks[0].srcdata[dgl.NID],
                                blocks[0].srcdata['ntype'],
                                blocks[0].srcdata['type_id'],
                                node_feats)
            logits = model(blocks, feats)

            loss_fcn = AUC()
            auc = loss_fcn(logits, pos_graph, neg_graph)
            print("valid auc: {:.4f}".format(auc.item()))

            # Collect the per-batch logits and the parent ids of the output
            # nodes so the concatenation below has data to aggregate.
            eval_logits.append(logits.cpu())
            eval_seeds.append(blocks[-1].dstdata[dgl.NID].cpu())

    eval_logits = th.cat(eval_logits)
    eval_seeds = th.cat(eval_seeds)

    return eval_logits, eval_seeds


class CrossEntropyLoss(nn.Module):
    """Binary cross-entropy over dot-product scores of positive and negative edges."""

    def forward(self, block_outputs, pos_graph, neg_graph):

        with pos_graph.local_scope():
            pos_graph.ndata['h'] = block_outputs
            pos_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            pos_score = pos_graph.edata['score']
        with neg_graph.local_scope():
            neg_graph.ndata['h'] = block_outputs
            neg_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            neg_score = neg_graph.edata['score']

        # Positive edges are labeled 1, negative edges 0.
        score = th.cat([pos_score, neg_score])
        label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)])
        loss = F.binary_cross_entropy_with_logits(score, label)
        return loss
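
# Illustration only: the loss on a pair of toy positive/negative graphs over
# the same three nodes, with random stand-in embeddings in place of the model
# output. Nothing in the pipeline calls this function.
def _loss_example():
    pos_graph = dgl.graph(([0, 1], [1, 2]), num_nodes=3)
    neg_graph = dgl.graph(([0, 1], [2, 0]), num_nodes=3)
    h = th.randn(3, 16)                     # fake per-node embeddings
    loss = CrossEntropyLoss()(h, pos_graph, neg_graph)
    print(loss.item())                      # scalar BCE-with-logits loss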


class AUC(nn.Module):
    """ROC-AUC over dot-product scores of positive and negative edges (evaluation only)."""

    def forward(self, block_outputs, pos_graph, neg_graph):

        with pos_graph.local_scope():
            pos_graph.ndata['h'] = block_outputs
            pos_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            pos_score = pos_graph.edata['score']
        with neg_graph.local_scope():
            neg_graph.ndata['h'] = block_outputs
            neg_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            neg_score = neg_graph.edata['score']

        # Move scores to host memory before handing them to sklearn.
        score = th.cat([pos_score, neg_score]).detach().cpu().numpy()
        label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)]).cpu().numpy()

        return roc_auc_score(label, score)
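
# AUC is invoked the same way as CrossEntropyLoss above (see evaluate()), but
# it returns a sklearn roc_auc_score over the concatenated positive/negative
# edge scores rather than a differentiable loss, so it is only suitable for
# evaluation, never for backward().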