DGL User Guide
Chapter 1: Graph
1.1 Basic graph definitions
A graph G=(V,E) represents entities and the relations between them, where V is the set of nodes and E the set of edges. Depending on whether the edges are directed, graphs are divided into directed and undirected graphs. Depending on whether all nodes have the same type, they are divided into homogeneous and heterogeneous graphs; for example, the relations among buyers, sellers, and products in a marketplace form a heterogeneous graph. A multigraph is a graph in which two nodes may be connected by more than one edge.
1.2 Representing graphs, nodes, and edges in DGL
In DGL, each node is represented by a unique integer (its node ID), and each edge by a pair of node IDs; every edge is also assigned an edge ID according to the order in which it was added. Node IDs and edge IDs both start from 0. In DGL every edge is directed: edge (u,v) points from u to v.
An example of creating a graph with dgl.graph():
>>> import dgl
>>> import torch as th
>>> # edges 0->1, 0->2, 0->3, 1->3
>>> u, v = th.tensor([0, 0, 0, 1]), th.tensor([1, 2, 3, 3])
>>> g = dgl.graph((u, v))
>>> print(g) # the number of nodes is inferred from the max node ID in the given edges
Graph(num_nodes=4, num_edges=4,
ndata_schemes={}
edata_schemes={})
>>> # Node IDs
>>> print(g.nodes())
tensor([0, 1, 2, 3])
>>> # Edge end nodes
>>> print(g.edges())
(tensor([0, 0, 0, 1]), tensor([1, 2, 3, 3]))
>>> print(g.edges(form='all')) # print the source and destination nodes of each edge, plus the edge IDs
(tensor([0, 0, 0, 1]), tensor([1, 2, 3, 3]), tensor([0, 1, 2, 3]))
>>> # If the node with the largest ID is isolated (meaning no edges),
>>> # then one needs to explicitly set the number of nodes
>>> g = dgl.graph((u, v), num_nodes=8)
>>> bg = dgl.to_bidirected(g) # convert the directed graph to a bidirected (undirected) graph
>>> bg.all_edges(form='all')
(tensor([0, 0, 0, 1, 1, 2, 3, 3]), tensor([1, 2, 3, 0, 3, 0, 0, 1]),
tensor([0, 1, 2, 3, 4, 5, 6, 7]))
DGL can store node and edge IDs as either 64-bit or 32-bit integers, but the edge ID type must match the node ID type. With 64-bit integers, DGL can handle up to \(2^{63}-1\) nodes or edges. If the graph has fewer than \(2^{31}-1\) nodes or edges, 32-bit integers reduce memory consumption and speed up computation.
DGL converts between the two as follows:
>>> edges = th.tensor([2, 5, 3]), th.tensor([3, 5, 0]) # edges 2->3, 5->5, 3->0
>>> g64 = dgl.graph(edges) # DGL uses int64 by default
>>> print(g64.idtype)
torch.int64
>>> g32 = dgl.graph(edges, idtype=th.int32) # create a int32 graph
>>> g32.idtype
torch.int32
>>> g64_2 = g32.long() # convert to int64
>>> g64_2.idtype
torch.int64
>>> g32_2 = g64.int() # convert to int32
>>> g32_2.idtype
torch.int32
1.3 Node and edge features in DGL
Nodes and edges in DGL can carry features: node features are accessed through ndata and edge features through edata.
>>> import dgl
>>> import torch as th
>>> g = dgl.graph(([0, 0, 1, 5], [1, 2, 2, 0])) # 6 nodes, 4 edges
>>> g
Graph(num_nodes=6, num_edges=4,
ndata_schemes={}
edata_schemes={})
>>> g.ndata['x'] = th.ones(g.num_nodes(), 3) # node feature of length 3
>>> g.edata['x'] = th.ones(g.num_edges(), dtype=th.int32) # scalar integer feature
>>> g
Graph(num_nodes=6, num_edges=4,
ndata_schemes={'x' : Scheme(shape=(3,), dtype=torch.float32)}
edata_schemes={'x' : Scheme(shape=(,), dtype=torch.int32)})
>>> # different names can have different shapes
>>> g.ndata['y'] = th.randn(g.num_nodes(), 5)
>>> g.ndata['x'][1] # get node 1's feature
tensor([1., 1., 1.])
>>> g.edata['x'][th.tensor([0, 3])] # get features of edge 0 and 3
tensor([1, 1], dtype=torch.int32)
Note:
- Only numerical features are allowed.
- Assigning a feature tensor creates that feature for every node (or every edge) in the graph; the tensor's first dimension must equal the number of nodes or edges. A feature cannot be created for only a subset of nodes or edges.
- Features with the same name must have the same dimensionality and data type on every node.
>>> # edges 0->1, 0->2, 0->3, 1->3
>>> edges = th.tensor([0, 0, 0, 1]), th.tensor([1, 2, 3, 3])
>>> weights = th.tensor([0.1, 0.6, 0.9, 0.7]) # weight of each edge
>>> g = dgl.graph(edges)
>>> g.edata['w'] = weights # give it a name 'w'
>>> g
Graph(num_nodes=4, num_edges=4,
ndata_schemes={}
edata_schemes={'w' : Scheme(shape=(,), dtype=torch.float32)})
1.4 Other ways to construct graphs
Creating a graph from a sparse matrix:
>>> import dgl
>>> import torch as th
>>> import scipy.sparse as sp
>>> spmat = sp.rand(100, 100, density=0.05) # 5% nonzero entries
>>> dgl.from_scipy(spmat) # from SciPy
Graph(num_nodes=100, num_edges=500,
ndata_schemes={}
edata_schemes={})
Creating a graph from NetworkX:
>>> import networkx as nx
>>> nx_g = nx.path_graph(5) # a chain 0-1-2-3-4
>>> dgl.from_networkx(nx_g) # from networkx
Graph(num_nodes=5, num_edges=8,
ndata_schemes={}
edata_schemes={})
DGL graphs can also be saved to and loaded from binary files using the save_graphs and load_graphs APIs.
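For example (a minimal sketch; the file name graph.bin is arbitrary):
>>> from dgl.data.utils import save_graphs, load_graphs
>>> save_graphs('graph.bin', [g]) # save a list of graphs
>>> g_list, _ = load_graphs('graph.bin') # returns (graph list, label dict)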
1.5 Heterogeneous graphs

In DGL, each relation is represented by a triplet (source node type, edge type, destination node type):
>>> import dgl
>>> import torch as th
>>> # Create a heterograph with 3 node types and 3 edge types.
>>> graph_data = {
... ('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])),
... ('drug', 'interacts', 'gene'): (th.tensor([0, 1]), th.tensor([2, 3])),
... ('drug', 'treats', 'disease'): (th.tensor([1]), th.tensor([2]))
... }
>>> g = dgl.heterograph(graph_data)
>>> g.ntypes
['disease', 'drug', 'gene']
>>> g.etypes
['interacts', 'interacts', 'treats']
>>> g.canonical_etypes
[('drug', 'interacts', 'drug'),
('drug', 'interacts', 'gene'),
('drug', 'treats', 'disease')]
Homogeneous graphs and bipartite graphs are simply special heterogeneous graphs:
>>> # A homogeneous graph: source and destination node types are identical
>>> dgl.heterograph({('node_type', 'edge_type', 'node_type'): (u, v)})
>>> # A bipartite graph
>>> dgl.heterograph({('source_type', 'edge_type', 'destination_type'): (u, v)})
The structure of a heterogeneous graph is summarized by its metagraph:
>>> g
Graph(num_nodes={'disease': 3, 'drug': 3, 'gene': 4},
num_edges={('drug', 'interacts', 'drug'): 2,
('drug', 'interacts', 'gene'): 2,
('drug', 'treats', 'disease'): 1},
metagraph=[('drug', 'drug', 'interacts'),
('drug', 'gene', 'interacts'),
('drug', 'disease', 'treats')])
>>> g.metagraph().edges()
OutMultiEdgeDataView([('drug', 'drug'), ('drug', 'gene'), ('drug', 'disease')])
In DGL, node IDs of different types are counted separately, so the nodes API requires a node type whenever there is more than one:
>>> # Get the number of all nodes in the graph
>>> g.num_nodes()
10
>>> # Get the number of drug nodes
>>> g.num_nodes('drug')
3
>>> # Nodes of different types have separate IDs,
>>> # hence not well-defined without a type specified
>>> g.nodes()
DGLError: Node type name must be specified if there are more than one node types.
>>> g.nodes('drug')
tensor([0, 1, 2])
Setting and getting node and edge features in a heterogeneous graph:
>>> # Set/get feature 'hv' for nodes of type 'drug'
>>> g.nodes['drug'].data['hv'] = th.ones(3, 1)
>>> g.nodes['drug'].data['hv']
tensor([[1.],
[1.],
[1.]])
>>> # Set/get feature 'he' for edge of type 'treats'
>>> g.edges['treats'].data['he'] = th.zeros(1, 1)
>>> g.edges['treats'].data['he']
tensor([[0.]])
When there is only one node type, the type does not need to be specified:
>>> g = dgl.heterograph({
... ('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])),
... ('drug', 'is similar', 'drug'): (th.tensor([0, 1]), th.tensor([2, 3]))
... })
>>> g.nodes()
tensor([0, 1, 2, 3])
>>> # To set/get feature with a single type, no need to use the new syntax
>>> g.ndata['hv'] = th.ones(4, 1)
Creating a subgraph of a heterogeneous graph by specifying the relations to keep:
>>> g = dgl.heterograph({
... ('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])),
... ('drug', 'interacts', 'gene'): (th.tensor([0, 1]), th.tensor([2, 3])),
... ('drug', 'treats', 'disease'): (th.tensor([1]), th.tensor([2]))
... })
>>> g.nodes['drug'].data['hv'] = th.ones(3, 1)
>>> # Retain relations ('drug', 'interacts', 'drug') and ('drug', 'treats', 'disease')
>>> # All nodes for 'drug' and 'disease' will be retained
>>> eg = dgl.edge_type_subgraph(g, [('drug', 'interacts', 'drug'),
... ('drug', 'treats', 'disease')])
>>> eg
Graph(num_nodes={'disease': 3, 'drug': 3},
num_edges={('drug', 'interacts', 'drug'): 2, ('drug', 'treats', 'disease'): 1},
metagraph=[('drug', 'drug', 'interacts'), ('drug', 'disease', 'treats')])
>>> # The associated features will be copied as well
>>> eg.nodes['drug'].data['hv']
tensor([[1.],
[1.],
[1.]])
Converting a heterogeneous graph to a homogeneous graph:
>>> g = dgl.heterograph({
... ('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])),
... ('drug', 'treats', 'disease'): (th.tensor([1]), th.tensor([2]))})
>>> g.nodes['drug'].data['hv'] = th.zeros(3, 1)
>>> g.nodes['disease'].data['hv'] = th.ones(3, 1)
>>> g.edges['interacts'].data['he'] = th.zeros(2, 1)
>>> g.edges['treats'].data['he'] = th.zeros(1, 2)
>>> # By default, it does not merge any features
>>> hg = dgl.to_homogeneous(g)
>>> 'hv' in hg.ndata
False
>>> # Copy edge features
>>> # For feature copy, it expects features to have
>>> # the same size and dtype across node/edge types
>>> hg = dgl.to_homogeneous(g, edata=['he'])
DGLError: Cannot concatenate column 'he' with shape Scheme(shape=(2,), dtype=torch.float32) and shape Scheme(shape=(1,), dtype=torch.float32)
>>> # Copy node features
>>> hg = dgl.to_homogeneous(g, ndata=['hv'])
>>> hg.ndata['hv']
tensor([[1.],
[1.],
[1.],
[0.],
[0.],
[0.]])
The original node/edge types and type-specific IDs are stored in ndata and edata. To find, for each node and edge of the converted homogeneous graph, its type and ID in the original heterogeneous graph:
>>> # Order of node types in the heterograph
>>> g.ntypes
['disease', 'drug']
>>> # Original node types
>>> hg.ndata[dgl.NTYPE]
tensor([0, 0, 0, 1, 1, 1])
>>> # Original type-specific node IDs
>>> hg.ndata[dgl.NID]
tensor([0, 1, 2, 0, 1, 2])
>>> # Order of edge types in the heterograph
>>> g.etypes
['interacts', 'treats']
>>> # Original edge types
>>> hg.edata[dgl.ETYPE]
tensor([0, 0, 1])
>>> # Original type-specific edge IDs
>>> hg.edata[dgl.EID]
tensor([0, 1, 0])
1.6 Using DGL on GPUs
>>> import dgl
>>> import torch as th
>>> u, v = th.tensor([0, 1, 2]), th.tensor([2, 3, 4])
>>> g = dgl.graph((u, v))
>>> g.ndata['x'] = th.randn(5, 3) # original feature is on CPU
>>> g.device
device(type='cpu')
>>> cuda_g = g.to('cuda:0') # accepts any device objects from backend framework
>>> cuda_g.device
device(type='cuda', index=0)
>>> cuda_g.ndata['x'].device # feature data is copied to GPU too
device(type='cuda', index=0)
>>> # A graph constructed from GPU tensors is also on GPU
>>> u, v = u.to('cuda:0'), v.to('cuda:0')
>>> g = dgl.graph((u, v))
>>> g.device
device(type='cuda', index=0)
Tensors residing on the CPU cannot be passed to a graph on the GPU; the data must first be copied to the same device:
>>> cuda_g.in_degrees()
tensor([0, 0, 1, 1, 1], device='cuda:0')
>>> cuda_g.in_edges([2, 3, 4]) # ok for non-tensor type arguments
(tensor([0, 1, 2], device='cuda:0'), tensor([2, 3, 4], device='cuda:0'))
>>> cuda_g.in_edges(th.tensor([2, 3, 4]).to('cuda:0')) # tensor type must be on GPU
(tensor([0, 1, 2], device='cuda:0'), tensor([2, 3, 4], device='cuda:0'))
>>> cuda_g.ndata['h'] = th.randn(5, 4) # ERROR! feature must be on GPU too!
DGLError: Cannot assign node feature "h" on device cpu to a graph on device
cuda:0. Call DGLGraph.to() to copy the graph to the same device.
Chapter 2: Message Passing
The message passing paradigm
Let \(x_v\in\mathbb{R}^{d_1}\) be the feature of node \(v\) and \(w_e\in\mathbb{R}^{d_2}\) the feature of edge \((u, v)\). The edge-wise and node-wise stages of message passing are:
\(m_e^{(t+1)} = \phi\left(x_v^{(t)}, x_u^{(t)}, w_e^{(t)}\right), \quad (u, v, e) \in \mathcal{E}\)
\(x_v^{(t+1)} = \psi\left(x_v^{(t)}, \rho\left(\left\{m_e^{(t+1)} : (u, v, e) \in \mathcal{E}\right\}\right)\right)\)
where \(\phi\) is the message function defined on each edge, \(\rho\) is the reduce (aggregation) function, and \(\psi\) is the update function. \(\phi\) determines what message to generate from the node and edge features, \(\rho\) determines how the messages arriving from neighbors are aggregated, and \(\psi\) determines how the value at step t+1 is computed from the value at step t.
Built-in message passing functions in DGL
Under the dgl.function namespace, DGL implements the commonly used message functions and reduce functions. These built-ins are heavily optimized and should be preferred.
The built-in message functions currently support one unary operation, copy, and the binary operations add, sub, mul, div, and dot.
Besides the built-ins, you can define your own message and reduce functions (UDFs). By convention, u denotes the source node, v the destination node, and e the edge. For example, to add the source node's hu feature to the destination node's hv feature and store the result as the edge's he feature:
# built-in function
dgl.function.u_add_v('hu', 'hv', 'he')
# equivalent UDF
def message_func(edges):
    return {'he': edges.src['hu'] + edges.dst['hv']}
The built-in reduce functions currently support sum, max, min, prod, and mean. A built-in reduce function takes two string arguments: the field name in the mailbox and the output field name on the destination node. For example:
# built-in reduce function
dgl.function.sum('m', 'h')
# equivalent UDF
import torch
def reduce_func(nodes):
    return {'h': torch.sum(nodes.mailbox['m'], dim=1)}
In DGL, edge-wise updates are performed with apply_edges(), which takes a message function:
import dgl.function as fn
graph.apply_edges(fn.u_add_v('el', 'er', 'e'))
Node updates are performed with update_all(), which takes a message function, a reduce function, and an update function as parameters.
def update_all_example(graph):
    # store the result in graph.ndata['ft']
    graph.update_all(fn.u_mul_e('ft', 'a', 'm'),
                     fn.sum('m', 'ft'))
    # call the update function outside of update_all
    final_ft = graph.ndata['ft'] * 2
    return final_ft
This update can be expressed mathematically as \({final\_ft}_i = 2 * \sum_{j\in\mathcal{N}(i)} ({ft}_j * a_{ij})\).
Writing efficient message passing code
DGL optimizes both the memory consumption and the speed of message passing, including:
- running the computations of apply_edges and update_all in parallel;
- indexing out only the needed node features by entry index (a memory and speed optimization);
- fusing the message and reduce functions into a single kernel, so no per-edge messages need to be materialized.
To get good performance, it is recommended to compose custom message passing out of the built-in functions. Consider the following two implementations:
# implementation 1: concatenate features on the edges, then project
linear = nn.Parameter(th.FloatTensor(size=(1, node_feat_dim * 2)))
def concat_message_function(edges):
    return {'cat_feat': th.cat([edges.src['feat'], edges.dst['feat']], dim=1)}
g.apply_edges(concat_message_function)
g.edata['out'] = g.edata['cat_feat'] * linear
# implementation 2: split the projection into source and destination parts and
# combine them with a built-in function, so nothing large is stored on edges
linear_src = nn.Parameter(th.FloatTensor(size=(1, node_feat_dim)))
linear_dst = nn.Parameter(th.FloatTensor(size=(1, node_feat_dim)))
out_src = g.ndata['feat'] * linear_src
out_dst = g.ndata['feat'] * linear_dst
g.srcdata.update({'out_src': out_src})
g.dstdata.update({'out_dst': out_dst})
g.apply_edges(fn.u_add_v('out_src', 'out_dst', 'out'))
Both produce the same result, but the second is more efficient: the number of edges is usually far larger than the number of nodes, and the second version avoids storing the concatenated intermediate result on the edges, which reduces memory consumption.
Message passing on subgraphs
nid = [0, 2, 3, 6, 7, 9]
sg = g.subgraph(nid)
sg.update_all(message_func, reduce_func, apply_node_func)
Message passing on subgraphs like this is common in mini-batch training.
Adding edge weights in message passing
graph.edata['a'] = affinity # one scalar weight per edge (affinity assumed precomputed)
graph.update_all(fn.u_mul_e('ft', 'a', 'm'),
                 fn.sum('m', 'ft'))
Message passing on heterogeneous graphs
funcs = {}
for c_etype in G.canonical_etypes:
    srctype, etype, dsttype = c_etype
    Wh = self.weight[etype](feat_dict[srctype])
    # Save it in graph for message passing
    G.nodes[srctype].data['Wh_%s' % etype] = Wh
    # Specify per-relation message passing functions: (message_func, reduce_func).
    # Note that the results are saved to the same destination feature 'h', which
    # hints the type wise reducer for aggregation.
    funcs[etype] = (fn.copy_u('Wh_%s' % etype, 'm'), fn.mean('m', 'h'))
# Trigger message passing of multiple types.
G.multi_update_all(funcs, 'sum')
# return the updated node feature dictionary
return {ntype : G.nodes[ntype].data['h'] for ntype in G.ntypes}
Chapter 3: Building GNN Modules
In conventional deep learning models the input and parameter dimensions are fixed, but in a GNN each node may have a different number of neighbors, so special handling is needed to bring the data into fixed dimensions. DGL's role is to provide convenient operations on graph data structures: it is typically used to handle the message passing between nodes and edges, while the corresponding tensor computation is carried out by frameworks such as PyTorch or TensorFlow.
The constructor of a DGL NN module
Specify the input, output, and hidden dimensions, including those of the source and destination nodes.
Set the aggregation method applied over the incident edges: mean, sum, max, min, or lstm.
Set the normalization method, e.g. L2 normalization: \(h_v = h_v / \lVert h_v \rVert_2\)
import torch as th
from torch import nn
from torch.nn import init
from .... import function as fn
from ....base import DGLError
from ....utils import expand_as_pair, check_eq_shape

class SAGEConv(nn.Module):
    def __init__(self,
                 in_feats,
                 out_feats,
                 aggregator_type,
                 bias=True,
                 norm=None,
                 activation=None):
        super(SAGEConv, self).__init__()
        self._in_src_feats, self._in_dst_feats = expand_as_pair(in_feats)
        self._out_feats = out_feats
        self._aggre_type = aggregator_type
        self.norm = norm
        self.activation = activation
The rest of the constructor sets up the chosen aggregation type, and the reset_parameters() method initializes the learnable parameters:
def reset_parameters(self):
    """Reinitialize learnable parameters."""
    gain = nn.init.calculate_gain('relu')
    if self._aggre_type == 'max_pool':
        nn.init.xavier_uniform_(self.fc_pool.weight, gain=gain)
    if self._aggre_type == 'lstm':
        self.lstm.reset_parameters()
    if self._aggre_type != 'gcn':
        nn.init.xavier_uniform_(self.fc_self.weight, gain=gain)
    nn.init.xavier_uniform_(self.fc_neigh.weight, gain=gain)

# The remainder of __init__ sets up the chosen aggregator:
# aggregator type: mean, max_pool, lstm, gcn
if aggregator_type not in ['mean', 'max_pool', 'lstm', 'gcn']:
    raise KeyError('Aggregator type {} not supported.'.format(aggregator_type))
if aggregator_type == 'max_pool':
    self.fc_pool = nn.Linear(self._in_src_feats, self._in_src_feats)
if aggregator_type == 'lstm':
    self.lstm = nn.LSTM(self._in_src_feats, self._in_src_feats, batch_first=True)
if aggregator_type in ['mean', 'max_pool', 'lstm']:
    self.fc_self = nn.Linear(self._in_dst_feats, out_feats, bias=bias)
self.fc_neigh = nn.Linear(self._in_src_feats, out_feats, bias=bias)
self.reset_parameters()
The forward method of a DGL NN module
def forward(self, graph, feat):
    with graph.local_scope():
        # Specify graph type then expand input feature according to graph type
        feat_src, feat_dst = expand_as_pair(feat, graph)
Mathematically, SAGEConv computes:
\(h_{\mathcal{N}(v)}^{(l+1)} = \mathrm{aggregate}\left(\left\{h_u^{(l)}, \forall u \in \mathcal{N}(v)\right\}\right)\)
\(h_v^{(l+1)} = \sigma\left(W \cdot \mathrm{concat}\left(h_v^{(l)}, h_{\mathcal{N}(v)}^{(l+1)}\right)\right)\)
\(h_v^{(l+1)} = \mathrm{norm}\left(h_v^{(l+1)}\right)\)
In a homogeneous graph, the source and destination nodes have the same type throughout training.
A heterogeneous graph, in contrast, can be decomposed into several bipartite graphs, one per relation (src_type, edge_type, dst_type). In mini-batch training, each training step works on a subgraph called a block, and only the destination nodes in the block are updated with the aggregated messages.
Message passing and aggregation in SAGEConv
h_self = feat_dst # destination features; used by the non-gcn branches below
if self._aggre_type == 'mean':
    graph.srcdata['h'] = feat_src
    graph.update_all(fn.copy_u('h', 'm'), fn.mean('m', 'neigh'))
    h_neigh = graph.dstdata['neigh']
elif self._aggre_type == 'gcn':
    check_eq_shape(feat)
    graph.srcdata['h'] = feat_src
    graph.dstdata['h'] = feat_dst # same as above if homogeneous
    graph.update_all(fn.copy_u('h', 'm'), fn.sum('m', 'neigh'))
    # divide by in_degrees
    degs = graph.in_degrees().to(feat_dst)
    h_neigh = (graph.dstdata['neigh'] + graph.dstdata['h']) / (degs.unsqueeze(-1) + 1)
elif self._aggre_type == 'max_pool':
    graph.srcdata['h'] = F.relu(self.fc_pool(feat_src))
    graph.update_all(fn.copy_u('h', 'm'), fn.max('m', 'neigh'))
    h_neigh = graph.dstdata['neigh']
else:
    raise KeyError('Aggregator type {} not recognized.'.format(self._aggre_type))
# GraphSAGE GCN does not require fc_self.
if self._aggre_type == 'gcn':
    rst = self.fc_neigh(h_neigh)
else:
    rst = self.fc_self(h_self) + self.fc_neigh(h_neigh)
Activation and normalization
# activation
if self.activation is not None:
    rst = self.activation(rst)
# normalization
if self.norm is not None:
    rst = self.norm(rst)
return rst
The GraphConv module on heterogeneous graphs
GraphConv on a heterogeneous graph can be written as:
\(h_{dst}^{(l+1)} = \underset{r\in\mathcal{R},\, r_{dst}=dst}{\mathrm{AGG}}\left(f_r\left(g_r, h_{r_{src}}^{(l)}, h_{r_{dst}}^{(l)}\right)\right)\)
where \(f_r\) is the module that handles relation \(r\).
class HeteroGraphConv(nn.Module):
    def __init__(self, mods, aggregate='sum'):
        super(HeteroGraphConv, self).__init__()
        self.mods = nn.ModuleDict(mods)
        if isinstance(aggregate, str):
            self.agg_fn = get_aggregate_fn(aggregate)
        else:
            self.agg_fn = aggregate

    def forward(self, g, inputs, mod_args=None, mod_kwargs=None):
        if mod_args is None:
            mod_args = {}
        if mod_kwargs is None:
            mod_kwargs = {}
        outputs = {nty : [] for nty in g.dsttypes}
        if g.is_block:
            src_inputs = inputs
            dst_inputs = {k: v[:g.number_of_dst_nodes(k)] for k, v in inputs.items()}
        else:
            src_inputs = dst_inputs = inputs
        for stype, etype, dtype in g.canonical_etypes:
            rel_graph = g[stype, etype, dtype]
            if rel_graph.number_of_edges() == 0:
                continue
            if stype not in src_inputs or dtype not in dst_inputs:
                continue
            dstdata = self.mods[etype](
                rel_graph,
                (src_inputs[stype], dst_inputs[dtype]),
                *mod_args.get(etype, ()),
                **mod_kwargs.get(etype, {}))
            outputs[dtype].append(dstdata)
        rsts = {}
        for nty, alist in outputs.items():
            if len(alist) != 0:
                rsts[nty] = self.agg_fn(alist, nty)
        return rsts
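A minimal usage sketch of dglnn.HeteroGraphConv (the toy graph, relation names, and feature sizes below are made-up assumptions for illustration):
import dgl
import dgl.nn as dglnn
import torch as th

g = dgl.heterograph({
    ('user', 'follows', 'user'): (th.tensor([0, 1, 2]), th.tensor([1, 2, 0])),
    ('user', 'plays', 'game'): (th.tensor([0, 1]), th.tensor([0, 1]))})
# one submodule per relation; outputs arriving at the same destination type are summed
conv = dglnn.HeteroGraphConv({
    'follows': dglnn.GraphConv(5, 8),
    'plays': dglnn.GraphConv(5, 8)},
    aggregate='sum')
h = conv(g, {'user': th.randn(3, 5), 'game': th.randn(2, 5)})
print(h['user'].shape, h['game'].shape) # torch.Size([3, 8]) torch.Size([2, 8])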
Chapter 4: Graph Data Pipeline
DGL primarily uses subclasses of dgl.data.DGLDataset to load, process, and save graph data.
The DGLDataset class defines a data processing pipeline of downloading, processing, saving, and loading.
For example, here is a template for a custom graph dataset:
from dgl.data import DGLDataset

class MyDataset(DGLDataset):
    """ Template for customizing graph datasets in DGL.

    Parameters
    ----------
    url : str
        URL to download the raw dataset
    raw_dir : str
        Specifies the directory that will store the
        downloaded data or the directory that
        already stores the input data.
        Default: ~/.dgl/
    save_dir : str
        Directory to save the processed dataset.
        Default: the value of `raw_dir`
    force_reload : bool
        Whether to reload the dataset. Default: False
    verbose : bool
        Whether to print out progress information
    """
    def __init__(self,
                 url=None,
                 raw_dir=None,
                 save_dir=None,
                 force_reload=False,
                 verbose=False):
        super(MyDataset, self).__init__(name='dataset_name',
                                        url=url,
                                        raw_dir=raw_dir,
                                        save_dir=save_dir,
                                        force_reload=force_reload,
                                        verbose=verbose)

    def download(self):
        # download raw data to local disk
        pass

    def process(self):
        # process raw data to graphs, labels, splitting masks
        pass

    def __getitem__(self, idx):
        # get one example by index
        pass

    def __len__(self):
        # number of data examples
        pass

    def save(self):
        # save processed data to directory `self.save_path`
        pass

    def load(self):
        # load processed data from directory `self.save_path`
        pass

    def has_cache(self):
        # check whether there are processed data in `self.save_path`
        pass
Download raw data (optional)
import os
from dgl.data.utils import download, check_sha1

def download(self):
    # path to store the file
    # make sure to use the same suffix as the original file name's
    gz_file_path = os.path.join(self.raw_dir, self.name + '.csv.gz')
    # download file
    download(self.url, path=gz_file_path)
    # check SHA-1
    if not check_sha1(gz_file_path, self._sha1_str):
        raise UserWarning('File {} is downloaded but the content hash does not match.'
                          'The repo may be outdated or download may be incomplete. '
                          'Otherwise you can create an issue for it.'.format(self.name + '.csv.gz'))
    # extract file to directory `self.name` under `self.raw_dir`
    self._extract_gz(gz_file_path, self.raw_path)
Process data
Processing Graph Classification datasets
Processing Node Classification datasets
Processing Link Prediction datasets
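The processing subsections above are left as stubs here. As one illustration, a minimal process() for a node classification dataset might look like the following sketch (the file name data.npz and its array fields are assumptions, not a DGL requirement):
import os
import numpy as np
import torch
import dgl

def process(self):
    # build one graph with features, labels, and a training mask from
    # preprocessed NumPy arrays (layout assumed for illustration)
    data = np.load(os.path.join(self.raw_path, 'data.npz'))
    src = torch.from_numpy(data['src'])
    dst = torch.from_numpy(data['dst'])
    self.graph = dgl.graph((src, dst))
    self.graph.ndata['feat'] = torch.from_numpy(data['feat'])
    self.graph.ndata['label'] = torch.from_numpy(data['label'])
    mask = torch.zeros(self.graph.num_nodes(), dtype=torch.bool)
    mask[:int(0.8 * self.graph.num_nodes())] = True # first 80% of nodes train
    self.graph.ndata['train_mask'] = mask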
Save and load data
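A sketch of save() and load() built on DGL's serialization utilities (the file names are arbitrary; save_info and load_info come from dgl.data.utils):
import os
from dgl import save_graphs, load_graphs
from dgl.data.utils import save_info, load_info

def save(self):
    # save graphs and labels into one binary file
    graph_path = os.path.join(self.save_path, 'dgl_graph.bin')
    save_graphs(graph_path, self.graphs, {'labels': self.labels})
    # save other information in a Python dict
    info_path = os.path.join(self.save_path, 'info.pkl')
    save_info(info_path, {'num_classes': self.num_classes})

def load(self):
    graph_path = os.path.join(self.save_path, 'dgl_graph.bin')
    self.graphs, label_dict = load_graphs(graph_path)
    self.labels = label_dict['labels']
    info_path = os.path.join(self.save_path, 'info.pkl')
    self.num_classes = load_info(info_path)['num_classes']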
Loading OGB datasets using ogb package
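A sketch of loading an OGB graph property prediction dataset (ogbg-molhiv is one example name; this assumes the ogb package is installed):
import dgl
import torch
from ogb.graphproppred import DglGraphPropPredDataset
from torch.utils.data import DataLoader

def _collate(samples):
    # each sample is a (graph, label) pair; batch the graphs into one
    graphs, labels = map(list, zip(*samples))
    return dgl.batch(graphs), torch.stack(labels, 0)

dataset = DglGraphPropPredDataset(name='ogbg-molhiv')
split_idx = dataset.get_idx_split()
train_loader = DataLoader(dataset[split_idx['train']], batch_size=32,
                          shuffle=True, collate_fn=_collate)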
Chapter 5: Training Graph Neural Networks
Typical graph neural network tasks include node classification, edge classification, link prediction (how likely it is that an edge exists between two nodes), and classification of a graph as a whole.
As a running example, we first construct a synthetic heterogeneous graph whose edge types are:
('user', 'follow', 'user')
('user', 'followed-by', 'user')
('user', 'click', 'item')
('item', 'clicked-by', 'user')
('user', 'dislike', 'item')
('item', 'disliked-by', 'user')
The code to construct this heterogeneous graph:
import numpy as np
import torch
import dgl
n_users = 1000
n_items = 500
n_follows = 3000
n_clicks = 5000
n_dislikes = 500
n_hetero_features = 10
n_user_classes = 5
n_max_clicks = 10
follow_src = np.random.randint(0, n_users, n_follows)
follow_dst = np.random.randint(0, n_users, n_follows)
click_src = np.random.randint(0, n_users, n_clicks)
click_dst = np.random.randint(0, n_items, n_clicks)
dislike_src = np.random.randint(0, n_users, n_dislikes)
dislike_dst = np.random.randint(0, n_items, n_dislikes)
hetero_graph = dgl.heterograph({
('user', 'follow', 'user'): (follow_src, follow_dst),
('user', 'followed-by', 'user'): (follow_dst, follow_src),
('user', 'click', 'item'): (click_src, click_dst),
('item', 'clicked-by', 'user'): (click_dst, click_src),
('user', 'dislike', 'item'): (dislike_src, dislike_dst),
('item', 'disliked-by', 'user'): (dislike_dst, dislike_src)})
hetero_graph.nodes['user'].data['feature'] = torch.randn(n_users, n_hetero_features)
hetero_graph.nodes['item'].data['feature'] = torch.randn(n_items, n_hetero_features)
hetero_graph.nodes['user'].data['label'] = torch.randint(0, n_user_classes, (n_users,))
hetero_graph.edges['click'].data['label'] = torch.randint(1, n_max_clicks, (n_clicks,)).float()
# randomly generate training masks on user nodes and click edges
hetero_graph.nodes['user'].data['train_mask'] = torch.zeros(n_users, dtype=torch.bool).bernoulli(0.6)
hetero_graph.edges['click'].data['train_mask'] = torch.zeros(n_clicks, dtype=torch.bool).bernoulli(0.6)
5.1 Node Classification/Regression
Model definition:
# Construct a two-layer GNN model
import dgl.nn as dglnn
import torch.nn as nn
import torch.nn.functional as F

class SAGE(nn.Module):
    def __init__(self, in_feats, hid_feats, out_feats):
        super().__init__()
        self.conv1 = dglnn.SAGEConv(
            in_feats=in_feats, out_feats=hid_feats, aggregator_type='mean')
        self.conv2 = dglnn.SAGEConv(
            in_feats=hid_feats, out_feats=out_feats, aggregator_type='mean')

    def forward(self, graph, inputs):
        # inputs are features of nodes
        h = self.conv1(graph, inputs)
        h = F.relu(h)
        h = self.conv2(graph, h)
        return h

def evaluate(model, graph, features, labels, mask):
    model.eval()
    with torch.no_grad():
        logits = model(graph, features)
        logits = logits[mask]
        labels = labels[mask]
        _, indices = torch.max(logits, dim=1)
        correct = torch.sum(indices == labels)
        return correct.item() * 1.0 / len(labels)
Training loop:
node_features = graph.ndata['feat']
node_labels = graph.ndata['label']
train_mask = graph.ndata['train_mask']
valid_mask = graph.ndata['val_mask']
test_mask = graph.ndata['test_mask']
n_features = node_features.shape[1]
n_labels = int(node_labels.max().item() + 1)
model = SAGE(in_feats=n_features, hid_feats=100, out_feats=n_labels)
opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    model.train()
    # forward propagation by using all nodes
    logits = model(graph, node_features)
    # compute loss
    loss = F.cross_entropy(logits[train_mask], node_labels[train_mask])
    # compute validation accuracy
    acc = evaluate(model, graph, node_features, node_labels, valid_mask)
    # backward propagation
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
# Save model if necessary. Omitted in this example.
Training on a heterogeneous graph:
# Define a Heterograph Conv model
import dgl.nn as dglnn

class RGCN(nn.Module):
    def __init__(self, in_feats, hid_feats, out_feats, rel_names):
        super().__init__()
        self.conv1 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(in_feats, hid_feats)
            for rel in rel_names}, aggregate='sum')
        self.conv2 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(hid_feats, out_feats)
            for rel in rel_names}, aggregate='sum')

    def forward(self, graph, inputs):
        # inputs are features of nodes
        h = self.conv1(graph, inputs)
        h = {k: F.relu(v) for k, v in h.items()}
        h = self.conv2(graph, h)
        return h

model = RGCN(n_hetero_features, 20, n_user_classes, hetero_graph.etypes)
user_feats = hetero_graph.nodes['user'].data['feature']
item_feats = hetero_graph.nodes['item'].data['feature']
labels = hetero_graph.nodes['user'].data['label']
train_mask = hetero_graph.nodes['user'].data['train_mask']
node_features = {'user': user_feats, 'item': item_feats}
opt = torch.optim.Adam(model.parameters())
for epoch in range(5):
    model.train()
    # forward propagation by using all nodes and extracting the user embeddings
    logits = model(hetero_graph, node_features)['user']
    # compute loss
    loss = F.cross_entropy(logits[train_mask], labels[train_mask])
    # Compute validation accuracy. Omitted in this example.
    # backward propagation
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
# Save model if necessary. Omitted in the example.
If only inference is needed:
node_features = {'user': user_feats, 'item': item_feats}
h_dict = model(hetero_graph, {'user': user_feats, 'item': item_feats})
h_user = h_dict['user']
h_item = h_dict['item']
5.2 Edge Classification/Regression
In some tasks we need to predict attributes of edges, or whether an edge exists between two nodes at all; this is edge classification/regression.
First, randomly generate some synthetic data:
src = np.random.randint(0, 100, 500)
dst = np.random.randint(0, 100, 500)
# make it symmetric
edge_pred_graph = dgl.graph((np.concatenate([src, dst]), np.concatenate([dst, src])))
# synthetic node and edge features, as well as edge labels
edge_pred_graph.ndata['feature'] = torch.randn(100, 10)
edge_pred_graph.edata['feature'] = torch.randn(1000, 10)
edge_pred_graph.edata['label'] = torch.randn(1000)
# synthetic train-validation-test splits
edge_pred_graph.edata['train_mask'] = torch.zeros(1000, dtype=torch.bool).bernoulli(0.6)
Edge classification and regression work much like node classification: first compute node embeddings, then use them to compute a class or score for each edge.
For example, the node representations h can be used directly to compute a regression value on each edge:
import dgl.function as fn

class DotProductPredictor(nn.Module):
    def forward(self, graph, h):
        # h contains the node representations computed from the GNN defined
        # in the node classification section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            return graph.edata['score']
Alternatively, you can define your own apply_edges() function to implement a different computation:
class MLPPredictor(nn.Module):
    def __init__(self, in_features, out_classes):
        super().__init__()
        self.W = nn.Linear(in_features * 2, out_classes)

    def apply_edges(self, edges):
        h_u = edges.src['h']
        h_v = edges.dst['h']
        score = self.W(torch.cat([h_u, h_v], 1))
        return {'score': score}

    def forward(self, graph, h):
        # h contains the node representations computed from the GNN defined
        # in the node classification section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(self.apply_edges)
            return graph.edata['score']
The full-graph training loop:
class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.sage = SAGE(in_features, hidden_features, out_features)
        self.pred = DotProductPredictor()

    def forward(self, g, x):
        h = self.sage(g, x)
        return self.pred(g, h)

node_features = edge_pred_graph.ndata['feature']
edge_label = edge_pred_graph.edata['label']
train_mask = edge_pred_graph.edata['train_mask']
model = Model(10, 20, 5)
opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    pred = model(edge_pred_graph, node_features)
    loss = ((pred[train_mask] - edge_label[train_mask]) ** 2).mean()
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
For heterogeneous graphs the procedure is similar; just pass the etype argument to apply_edges() to specify the edge type.
Computing the dot product directly from the node representations:
class HeteroDotProductPredictor(nn.Module):
    def forward(self, graph, h, etype):
        # h contains the node representations for each edge type computed from
        # the GNN for heterogeneous graphs defined in the node classification
        # section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h # assigns 'h' of all node types in one shot
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'), etype=etype)
            return graph.edges[etype].data['score']
A custom edge score computation:
class MLPPredictor(nn.Module):
    def __init__(self, in_features, out_classes):
        super().__init__()
        self.W = nn.Linear(in_features * 2, out_classes)

    def apply_edges(self, edges):
        h_u = edges.src['h']
        h_v = edges.dst['h']
        score = self.W(torch.cat([h_u, h_v], 1))
        return {'score': score}

    def forward(self, graph, h, etype):
        # h contains the node representations for each edge type computed from
        # the GNN for heterogeneous graphs defined in the node classification
        # section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h # assigns 'h' of all node types in one shot
            graph.apply_edges(self.apply_edges, etype=etype)
            return graph.edges[etype].data['score']
The full-graph training loop on the heterogeneous graph:
class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features, rel_names):
        super().__init__()
        self.sage = RGCN(in_features, hidden_features, out_features, rel_names)
        self.pred = HeteroDotProductPredictor()

    def forward(self, g, x, etype):
        h = self.sage(g, x)
        return self.pred(g, h, etype)

model = Model(10, 20, 5, hetero_graph.etypes)
user_feats = hetero_graph.nodes['user'].data['feature']
item_feats = hetero_graph.nodes['item'].data['feature']
label = hetero_graph.edges['click'].data['label']
train_mask = hetero_graph.edges['click'].data['train_mask']
node_features = {'user': user_feats, 'item': item_feats}
opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    pred = model(hetero_graph, node_features, 'click')
    loss = ((pred[train_mask] - label[train_mask]) ** 2).mean()
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
5.3 Link Prediction
Sometimes we need to predict, for a given pair of nodes, whether an edge exists between them. This is again modeled with the representations of the two nodes, scoring a candidate edge as \(y_{u,v} = \phi(h_u, h_v)\).
During training, the loss function usually incorporates negative sampling. Common choices are:
Cross-entropy loss: \(\mathcal{L} = - \log \sigma (y_{u,v}) - \sum_{v_i \sim P_n(v), i=1,\dots,k}\log \left[ 1 - \sigma (y_{u,v_i})\right]\)
BPR loss: \(\mathcal{L} = \sum_{v_i \sim P_n(v), i=1,\dots,k} - \log \sigma (y_{u,v} - y_{u,v_i})\)
Margin loss: \(\mathcal{L} = \sum_{v_i \sim P_n(v), i=1,\dots,k} \max(0, M - y_{u, v} + y_{u, v_i})\), where \(M\) is a hyperparameter.
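As an illustration, here is a sketch of the cross-entropy loss above, assuming pos_score and neg_score are the outputs of the model defined below:
import torch
import torch.nn.functional as F

def compute_ce_loss(pos_score, neg_score):
    # positive pairs are labeled 1, sampled negative pairs 0
    scores = torch.cat([pos_score, neg_score])
    labels = torch.cat([torch.ones_like(pos_score), torch.zeros_like(neg_score)])
    return F.binary_cross_entropy_with_logits(scores, labels)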
class DotProductPredictor(nn.Module):
    def forward(self, graph, h):
        # h contains the node representations computed from the GNN defined
        # in the node classification section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            return graph.edata['score']

def construct_negative_graph(graph, k):
    src, dst = graph.edges()
    neg_src = src.repeat_interleave(k)
    neg_dst = torch.randint(0, graph.number_of_nodes(), (len(src) * k,))
    return dgl.graph((neg_src, neg_dst), num_nodes=graph.number_of_nodes())

class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.sage = SAGE(in_features, hidden_features, out_features)
        self.pred = DotProductPredictor()

    def forward(self, g, neg_g, x):
        h = self.sage(g, x)
        return self.pred(g, h), self.pred(neg_g, h)

def compute_loss(pos_score, neg_score):
    # Margin loss
    n_edges = pos_score.shape[0]
    return (1 - neg_score.view(n_edges, -1) + pos_score.unsqueeze(1)).clamp(min=0).mean()
node_features = graph.ndata['feat']
n_features = node_features.shape[1]
k = 5
model = Model(n_features, 100, 100)
opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    negative_graph = construct_negative_graph(graph, k)
    pos_score, neg_score = model(graph, negative_graph, node_features)
    loss = compute_loss(pos_score, neg_score)
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
After training, extract the node embeddings:
node_embeddings = model.sage(graph, node_features)
Then use the embeddings of two nodes to compute the probability that an edge exists between them.
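A minimal sketch (u and v here are hypothetical node IDs):
u, v = 0, 1
score = torch.dot(node_embeddings[u], node_embeddings[v])
prob = torch.sigmoid(score) # probability that edge (u, v) exists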
Training on a heterogeneous graph works as follows:
class HeteroDotProductPredictor(nn.Module):
    def forward(self, graph, h, etype):
        # h contains the node representations for each node type computed from
        # the GNN defined in the previous section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'), etype=etype)
            return graph.edges[etype].data['score']

def construct_negative_graph(graph, k, etype):
    utype, _, vtype = etype
    src, dst = graph.edges(etype=etype)
    neg_src = src.repeat_interleave(k)
    neg_dst = torch.randint(0, graph.number_of_nodes(vtype), (len(src) * k,))
    return dgl.heterograph(
        {etype: (neg_src, neg_dst)},
        num_nodes_dict={ntype: graph.number_of_nodes(ntype) for ntype in graph.ntypes})

class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features, rel_names):
        super().__init__()
        self.sage = RGCN(in_features, hidden_features, out_features, rel_names)
        self.pred = HeteroDotProductPredictor()

    def forward(self, g, neg_g, x, etype):
        h = self.sage(g, x)
        return self.pred(g, h, etype), self.pred(neg_g, h, etype)

def compute_loss(pos_score, neg_score):
    # Margin loss
    n_edges = pos_score.shape[0]
    return (1 - neg_score.view(n_edges, -1) + pos_score.unsqueeze(1)).clamp(min=0).mean()

k = 5
model = Model(10, 20, 5, hetero_graph.etypes)
user_feats = hetero_graph.nodes['user'].data['feature']
item_feats = hetero_graph.nodes['item'].data['feature']
node_features = {'user': user_feats, 'item': item_feats}
opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    negative_graph = construct_negative_graph(hetero_graph, k, ('user', 'click', 'item'))
    pos_score, neg_score = model(hetero_graph, negative_graph, node_features, ('user', 'click', 'item'))
    loss = compute_loss(pos_score, neg_score)
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
Chapter 6: Stochastic Training on Large Graphs
In practice, once a graph reaches millions or even billions of nodes or edges, the full-graph training described in the previous chapter no longer works. With L network layers, hidden dimension H, and N nodes, the memory required is O(NLH); for large N this easily exceeds GPU memory. We therefore turn to mini-batch training, based primarily on neighbor sampling:
Neighbor sampling for node classification
import dgl
import dgl.nn as dglnn
import torch
import torch.nn as nn
import torch.nn.functional as F
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
dataloader = dgl.dataloading.NodeDataLoader(
    g, train_nids, sampler,
    batch_size=1024,
    shuffle=True,
    drop_last=False,
    num_workers=4)
Each iteration of the dataloader yields three objects: input_nodes, output_nodes, and blocks. input_nodes are all the nodes the batch involves, output_nodes are the nodes whose final representations the batch computes, and blocks describe the connections between the nodes within the batch.
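One mini-batch can be inspected directly (a sketch assuming the dataloader above):
input_nodes, output_nodes, blocks = next(iter(dataloader))
print(len(input_nodes), len(output_nodes)) # the input nodes always include the output nodes
print(blocks) # one block per GNN layer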
The model definitions:
# Model for full-graph training:
class TwoLayerGCN(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.conv1 = dglnn.GraphConv(in_features, hidden_features)
        self.conv2 = dglnn.GraphConv(hidden_features, out_features)

    def forward(self, g, x):
        x = F.relu(self.conv1(g, x))
        x = F.relu(self.conv2(g, x))
        return x

# Model for mini-batch training:
class StochasticTwoLayerGCN(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.conv1 = dgl.nn.GraphConv(in_features, hidden_features)
        self.conv2 = dgl.nn.GraphConv(hidden_features, out_features)

    def forward(self, blocks, x):
        x = F.relu(self.conv1(blocks[0], x))
        x = F.relu(self.conv2(blocks[1], x))
        return x
The two models have the same structure and parameters; the only difference is that the mini-batch model receives one block per layer instead of the full graph.
The training loop consists of roughly four parts:
1. Fetch the features of the input nodes and copy them to the GPU.
2. Feed the input features and blocks to the model and receive the output.
3. Fetch the labels and copy them to the GPU (steps 1 and 3 can be merged).
4. Compute the loss and backpropagate.
model = StochasticTwoLayerGCN(in_features, hidden_features, out_features)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())

for input_nodes, output_nodes, blocks in dataloader:
    blocks = [b.to(torch.device('cuda')) for b in blocks]
    input_features = blocks[0].srcdata['features']
    output_labels = blocks[-1].dstdata['label']
    output_predictions = model(blocks, input_features)
    loss = compute_loss(output_labels, output_predictions)
    opt.zero_grad()
    loss.backward()
    opt.step()
Training on heterogeneous graphs:
class StochasticTwoLayerRGCN(nn.Module):
    def __init__(self, in_feat, hidden_feat, out_feat, rel_names):
        super().__init__()
        self.conv1 = dglnn.HeteroGraphConv({
                rel : dglnn.GraphConv(in_feat, hidden_feat, norm='right')
                for rel in rel_names
            })
        self.conv2 = dglnn.HeteroGraphConv({
                rel : dglnn.GraphConv(hidden_feat, out_feat, norm='right')
                for rel in rel_names
            })

    def forward(self, blocks, x):
        x = self.conv1(blocks[0], x)
        x = self.conv2(blocks[1], x)
        return x

sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
dataloader = dgl.dataloading.NodeDataLoader(
    g, train_nid_dict, sampler,
    batch_size=1024,
    shuffle=True,
    drop_last=False,
    num_workers=4)

model = StochasticTwoLayerRGCN(in_features, hidden_features, out_features, rel_names)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())

for input_nodes, output_nodes, blocks in dataloader:
    blocks = [b.to(torch.device('cuda')) for b in blocks]
    input_features = blocks[0].srcdata # returns a dict
    output_labels = blocks[-1].dstdata # returns a dict
    output_predictions = model(blocks, input_features)
    loss = compute_loss(output_labels, output_predictions)
    opt.zero_grad()
    loss.backward()
    opt.step()