A deeper dive into Raphtory analysis
Raphtory’s analysis engine works by vertex-centric computation. Each vertex has access to local information about the graph (just its immediate vicinity). To complement this, vertices can communicate with their neighbours (other vertices that are directly connected to them). Many graph algorithms which operate on a per-vertex level can be expressed in this way. The benefit is that graphs can be distributed over multiple cores or machines, each containing a proportion of the vertices, and these vertex computations can be executed in parallel.
Each vertex (or node) knows:
Its own update history, property set and property history e.g. values and update times.
The history/properties of the edges attached to it, both incoming and outgoing, as Raphtory has a directed graph model. For example, a follow on Twitter is one-directional unless the other person follows you back, whereas a Facebook friendship connects both people by default.
Its own algorithmic state - this is a map of temporary values that can be set and modified during the computation steps.
The next sections will explore how algorithms can be written using these vertex communications.
The GraphAlgorithm API
The core of the Raphtory algorithm API is the algorithm package, which contains base classes for different types of algorithms that custom algorithms should extend.
The different algorithm base classes support mapping between different types of views of the underlying graph.
Currently Raphtory supports an aggregate ReducedGraphPerspective view, where algorithm steps act on vertices (see Vertex), and a multilayer MultilayerGraphPerspective view, where algorithm steps act on exploded vertices (see ExplodedVertex), which represent a vertex at a given point in time.
In general, an algorithm has two stages: graph processing and tabularising results. Graph processing is defined using the apply() method, whereas tabularising results is handled by the tabularise() method. Graph operations are defined by the GraphPerspective class and tabular data by the Table class. Rows in a Table are manipulated using the Row class.
The support for different graph views is still experimental. Algorithms that are view-agnostic (in particular, all algorithms from Raphtory-akka, the prior, deprecated version) should extend the Generic algorithm base class.
The algorithm API package also contains Identity, which is an algorithm that leaves the graph unchanged and does not write out any results. This is mainly useful as a default argument for algorithms that can optionally run another graph algorithm, e.g., as an optional pre- or post-processing step. An example of such an algorithm is community-based outlier detection (CBOD), which can optionally run a community detection algorithm to label the vertices.
Graph processing
The core of most algorithms (though not all, see zero-step algorithms below) is the graph processing stage. Graph processing is implemented by overriding the apply() method (e.g., for a Generic algorithm):
override def apply(graph: GraphPerspective): graph.Graph = {
The apply() method takes a GraphPerspective as input, manipulates the state of vertices, and then returns the graph view, either for further processing by other algorithms or for collecting and writing out results. A GraphPerspective has two key methods which are used during graph processing: step() and iterate().
step()
step() takes in a function to be applied to each vertex in the graph, and permits each vertex to mutate its state and send messages to some or all of its neighbours. Vertices are represented in Raphtory by the Vertex class, which exposes methods for storing computational state, messaging other vertices, accessing edges (represented by the Edge class), and accessing properties set when the graph was constructed. The step() method is often used as the setup for an algorithm, getting each vertex ready with a default state or finding the subset of vertices that are required to send the first messages. For example:
graph
  .step({
    vertex =>
      vertex.setState("cclabel", vertex.ID)
      vertex.messageAllNeighbours(vertex.ID)
  })
This is a snippet from the Raphtory Connected Components implementation. Here, each vertex sets its cclabel to its own ID and then sends this label to all of its neighbours.
iterate()
iterate() does the same thing as step(), but is run repeatedly until some criterion is met or a maximum number of iterations is reached. Vertex state is often used to record progress during iterations and to decide if an algorithm has converged. The convergence criterion is established by vertices voting to halt unanimously using the Vertex.voteToHalt() method.
All of this can be seen in the example below:
  .iterate({
    vertex =>
      val label = vertex.messageQueue[Long].min
      if (label < vertex.getState[Long]("cclabel")) {
        vertex.setState("cclabel", label)
        vertex.messageAllNeighbours(label)
      }
      else
        vertex.voteToHalt()
  }, iterations = 100, executeMessagedOnly = true)
}
In this instance, each vertex checks the messages it has received from its neighbours and, if the minimum label received is lower than its own, sets its cclabel to that minimum. The new label is then sent to its neighbours, allowing it to propagate in the next step to the neighbours who sent the higher labels. If no new label is found (as its own label is already lower), the vertex calls voteToHalt(). This means that it believes it has found its final value, and the algorithm may therefore converge early. No new messages are sent in this case.
Due to the nature of this algorithm and those like it, iterate() has an additional flag, executeMessagedOnly, which when set means that only vertices which have received new messages will execute the function. This can drastically increase the efficiency of algorithms on large graphs, where often only a few vertices need to execute at any one step (especially for algorithms like random walks or paths). For connected components, a vertex won’t change its label unless a lower label is received from a neighbour, so the flag can safely be set here.
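The message flow described above can be simulated outside Raphtory with plain Scala collections. The following is a minimal sketch and not Raphtory code: the object MinLabelSketch, its connectedComponents and deliver functions, and the adjacency-map graph representation are all hypothetical. The halting rule is simplified to "stop once no messages are in flight", which stands in for the unanimous vote to halt.

```scala
object MinLabelSketch {
  // Undirected graph as adjacency sets; every vertex must appear as a key.
  type Graph = Map[Long, Set[Long]]

  // Sends each (sender, label) pair to all neighbours of the sender and
  // groups the delivered labels by receiving vertex.
  private def deliver(graph: Graph, outgoing: Seq[(Long, Long)]): Map[Long, Seq[Long]] =
    outgoing
      .flatMap { case (sender, label) => graph(sender).toSeq.map(target => target -> label) }
      .groupBy(_._1)
      .map { case (target, messages) => target -> messages.map(_._2) }

  /** Runs min-label propagation; returns the final labels and the number of iterations used. */
  def connectedComponents(graph: Graph, maxIterations: Int = 100): (Map[Long, Long], Int) = {
    // Counterpart of the initial step(): label every vertex with its own ID
    // and send that label to all neighbours.
    var labels: Map[Long, Long] = graph.keys.map(id => id -> id).toMap
    var inbox: Map[Long, Seq[Long]] = deliver(graph, labels.toSeq)
    var iterations = 0

    // Counterpart of iterate(..., executeMessagedOnly = true): only vertices
    // with a non-empty inbox are visited in each round.
    while (inbox.nonEmpty && iterations < maxIterations) {
      val updates = inbox.toSeq.collect {
        case (id, received) if received.min < labels(id) => id -> received.min
      }
      labels = labels ++ updates      // vertices without an update stay silent ("vote to halt")
      inbox = deliver(graph, updates) // only updated vertices send their new label
      iterations += 1
    }
    (labels, iterations)
  }

  def main(args: Array[String]): Unit = {
    val graph: Graph = Map(1L -> Set(2L), 2L -> Set(1L, 3L), 3L -> Set(2L), 4L -> Set.empty[Long])
    println(connectedComponents(graph))
  }
}
```

Because only updated vertices send messages, the inbox shrinks as labels settle. This is the same effect that makes executeMessagedOnly worthwhile on large graphs.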
Global state
Many algorithms require computing some global properties, e.g., normalisation constants. This is supported in Raphtory by using accumulators. The function defining the algorithmic step for both the step() and iterate() methods can optionally take a second GraphState argument. Before we can use the global state, we first have to define some accumulators using the GraphPerspective.setGlobalState() method.
The Accumulator class has a += operator to add new values to the accumulator and a value attribute for accessing the last computed value.
The example below illustrates how to compute the maximum degree in the graph using an accumulator:
graph
  .setGlobalState({
    graphState =>
      graphState.newMax[Int]("maxDegree")
  })
  .step({
    (vertex, graphState) =>
      graphState("maxDegree") += vertex.degree
  })
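To make the accumulator idea concrete, here is a minimal plain-Scala sketch of an object with a += operator and a value accessor. It is an illustration under assumptions, not Raphtory's Accumulator class: the names AccumulatorSketch, newMax and maxDegree are hypothetical, and the sketch ignores how partial results from different partitions would be merged.

```scala
object AccumulatorSketch {
  // Hypothetical stand-in for an accumulator: folds every added value into one result.
  final class Accumulator[T](initial: T, combine: (T, T) => T) {
    private var current: T = initial
    def +=(newValue: T): Unit = { current = combine(current, newValue) }
    def value: T = current
  }

  // A "max" accumulator over Ints, mirroring the intent of newMax[Int] in the text.
  def newMax(): Accumulator[Int] =
    new Accumulator[Int](Int.MinValue, (a, b) => math.max(a, b))

  // Every vertex adds its degree; the accumulator retains the largest one.
  def maxDegree(graph: Map[Long, Set[Long]]): Int = {
    val acc = newMax()
    graph.values.foreach(neighbours => acc += neighbours.size)
    acc.value
  }

  def main(args: Array[String]): Unit = {
    val star = Map(1L -> Set(2L, 3L), 2L -> Set(1L), 3L -> Set(1L))
    println(maxDegree(star))
  }
}
```

Any associative and commutative combine function (sum, min, max) fits this shape, since the order in which vertices contribute does not affect the result.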
Tabularising results
Once graph processing is complete, the algorithm proceeds to collect vertex state into tabular form. This stage of the algorithm is implemented by overriding the tabularise() method, i.e.:
override def tabularise(graph: GraphPerspective): Table = {
select()
Typically, one calls select() on the input graph as the first step of tabularise() (though one can do further graph post-processing at this stage). select() maps a vertex to a Row object containing the results for that vertex.
As an alternative, one can also use the explodeSelect() method. Like select(), explodeSelect() takes a function which is executed once per vertex. However, this function should return a list of rows rather than a single row. This can be used to return multiple rows per vertex (e.g. for edge-level outputs such as EdgeList) or as a way of filtering results by returning an empty list.
Like the step() and iterate() methods, the select() and explodeSelect() steps can also optionally take the global GraphState as an additional input.
The final method for tabularising results is the globalSelect() method, which maps the global GraphState to a single Row. For example, we could use this to return the maximum degree computed above:
graph
  .globalSelect(graphState => Row(graphState[Int]("maxDegree")))
In the connected components instance, we are interested in extracting the ID of the vertex and the final component ID that it saved in its state:
graph
  .select(vertex => Row(vertex.ID(), vertex.getState[Long]("cclabel")))
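The difference between the three selection methods can be pictured with ordinary collection operations: one row per vertex is a map, zero or more rows per vertex is a flatMap, and a global row is a single function call. The sketch below is a hedged plain-Scala analogy and not Raphtory's implementation. VertexView, the Row alias and the three functions are hypothetical names introduced only for illustration.

```scala
object SelectSketch {
  // Hypothetical, simplified stand-ins for a vertex and a table row.
  final case class VertexView(id: Long, state: Map[String, Any], outNeighbours: Set[Long])
  type Row = Vector[Any]

  // select(): exactly one row per vertex.
  def select(vertices: Seq[VertexView])(f: VertexView => Row): Seq[Row] =
    vertices.map(f)

  // explodeSelect(): zero or more rows per vertex; an empty list drops the vertex.
  def explodeSelect(vertices: Seq[VertexView])(f: VertexView => List[Row]): Seq[Row] =
    vertices.flatMap(f)

  // globalSelect(): a single row computed from the global state.
  def globalSelect[S](globalState: S)(f: S => Row): Seq[Row] =
    Seq(f(globalState))

  val sample: Seq[VertexView] = Seq(
    VertexView(1L, Map("cclabel" -> 1L), Set(2L, 3L)),
    VertexView(2L, Map("cclabel" -> 1L), Set.empty)
  )

  def main(args: Array[String]): Unit = {
    // One (id, label) row per vertex.
    println(select(sample)(v => Vector(v.id, v.state("cclabel"))))
    // One (source, target) row per outgoing edge; vertex 2 contributes no rows.
    println(explodeSelect(sample)(v => v.outNeighbours.toList.sorted.map(n => Vector[Any](v.id, n))))
    // A single row holding a global value, here the largest out-degree.
    println(globalSelect(sample.map(_.outNeighbours.size).max)(maxDeg => Vector(maxDeg)))
  }
}
```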
Once we have the data in Row form we may perform a different set of transformations.
filter()
The filter() function can only be run after the vertex data has been converted to Table format by the select() call.
For example:
table
  .filter(row => row.get(1) == true)
This can be important if you only want to return elements that have received a certain label. For example, if we are looking for nodes reachable from a given entity in the graph, we can store a flag in their state and then filter on this once in Row form. Alternatively, we could have implemented this using explodeSelect().
explode()
explode() can be used to increase the number of rows, or to prevent the output from containing any arrays. For example, if the select() function returned a list within the row, we can use explode() to turn this list into individual rows:
table
  .explode(row =>
    row.get(2).asInstanceOf[List[(Long, String)]].map(
      expl => Row(row(0), expl._1, expl._2)
    )
  )
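The effect of the two table operations can be checked on a small in-memory table. This is a plain-Scala sketch under the assumption that a table behaves like a sequence of rows. TableSketch, keepFlagged and explodeColumn2 are hypothetical helpers, not part of Raphtory.

```scala
object TableSketch {
  type Row = Vector[Any]

  // filter: keep only rows whose second column holds the Boolean flag `true`.
  def keepFlagged(table: Seq[Row]): Seq[Row] =
    table.filter(row => row(1) == true)

  // explode: a row holding a list in column 2 becomes one row per list element.
  def explodeColumn2(table: Seq[Row]): Seq[Row] =
    table.flatMap { row =>
      row(2).asInstanceOf[List[(Long, String)]].map {
        case (time, value) => Vector[Any](row(0), time, value)
      }
    }

  def main(args: Array[String]): Unit = {
    println(keepFlagged(Seq(Vector("a", true), Vector("b", false))))
    println(explodeColumn2(Seq(Vector("a", 1, List((10L, "x"), (20L, "y"))))))
  }
}
```

A row whose list is empty produces no output rows at all, so explode can also shrink a table.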
Writing out results
Finally, once you are happy with the format of your data, you can output it to disk. This is done by using a Sink, which is given to the query when it is executed. Several inbuilt output formats are available within Raphtory, but it is also very simple to implement your own if you have a specific destination in mind.
As an example, following on from our prior code snippets, the FileSink saves the results of each partition as separate files in a directory. This directory is the only argument required when creating the FileSink object and passing it to the query:
val sink = FileSink("/tmp")
graph
  .execute(ConnectedComponents())
  .writeTo(sink)
Similarly, the PulsarSink can be used to write results directly to Pulsar topics. The user gives the topic as an argument when creating the PulsarSink object:
val sink = PulsarSink("components")
graph
  .execute(ConnectedComponents())
  .writeTo(sink)
Types of Algorithms
Zero-step algorithms
Zero-step algorithms refer to all algorithms which require no vertex messaging step, and can be expressed just using a select() operation. This means that the algorithm only requires knowledge of each vertex and the edges connected to it. These might be familiar as local measures in the network science literature. Some algorithms that fit into this category are:
Vertex degree
Vertex/edge property extraction
Some temporal motifs centred on a vertex, e.g. 3-node 2-edge temporal motifs.
In principle, one can implement such an algorithm by overriding only the tabularise() method, leaving the default apply() method, which simply returns the input graph unchanged. To see an example of this, here is a snippet extracting vertex degrees:
 1 override def tabularise(graph: GraphPerspective): Table = {
 2   graph
 3     .select({
 4       vertex =>
 5         val inDegree = vertex.getInNeighbours().size
 6         val outDegree = vertex.getOutNeighbours().size
 7         val totalDegree = vertex.getAllNeighbours().size
 8         Row(vertex.name(), inDegree, outDegree, totalDegree)
 9     })
10 }
Here, the vertex’s in/out/total degree is extracted in a straightforward way, with line 8 mapping these to a row for output. However, this means that the algorithm cannot usefully participate in a more complicated processing pipeline, as downstream algorithms do not have access to the computed results. For something as simple as outputting node degrees, which other algorithms already trivially have access to, this may make sense.
Generally, however, it is better practice to implement such algorithms using a single step() during graph processing which does not send any messages and sets the computed values as state on the vertices. For the degree algorithm above, we would instead have (this is how the built-in Degree works):
override def apply(graph: GraphPerspective): graph.Graph = {
  graph.step({
    vertex =>
      vertex.setState("inDegree", vertex.getInNeighbours().size)
      vertex.setState("outDegree", vertex.getOutNeighbours().size)
      vertex.setState("totalDegree", vertex.getAllNeighbours().size)
  })
}

override def tabularise(graph: GraphPerspective): Table = {
  graph.select({
    vertex =>
      Row(
        vertex.getPropertyOrElse("name", vertex.ID()),
        vertex.getState("inDegree"),
        vertex.getState("outDegree"),
        vertex.getState("totalDegree")
      )
  })
}
Implemented in this way, the algorithm can participate usefully in the algorithm chaining discussed below.
One-step algorithms
One-step algorithms are those which require precisely one messaging step, and can be expressed using two calls to step(): one to send out the messages and one to collate the results. This maps to measures which require knowledge of a vertex’s neighbours and the connections between them. Some examples of these include:
Local triangle count
Local clustering coefficient
Average neighbour degree
For an example of this, let’s look at a snippet of the LocalTriangleCount algorithm:
override def apply(graph: GraphPerspective): graph.Graph = {
  graph
    .step({
      vertex =>
        vertex.setState("triangles", 0)
        val neighbours = vertex.getAllNeighbours().toSet
        vertex.messageAllNeighbours(neighbours)
    })
    .step({
      vertex =>
        val neighbours = vertex.getAllNeighbours().toSet
        val queue = vertex.messageQueue[Set[Long]]
        var tri = 0
        queue.foreach(
          nbs =>
            tri += nbs.intersect(neighbours).size
        )
        vertex.setState("triangles", tri / 2)
    })
}

override def tabularise(graph: GraphPerspective): Table = {
  graph
    .select({
      vertex =>
        Row(vertex.name(), vertex.getState[Int]("triangles"))
    })
}
The first step() function tells each vertex to send the set of its neighbours to all of its neighbours. The second step() function then tells each vertex to compute the intersection of each received set with its own set of neighbours. Each triangle containing the vertex is counted once from each of its two other corners, so the sum of these intersection sizes is twice the number of triangles for that vertex, and this number is halved.
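The halving argument can be verified on a small graph without Raphtory. The sketch below is a plain-Scala illustration under assumptions: TriangleSketch and localTriangles are hypothetical names, and the graph is an undirected adjacency map in which every vertex appears as a key.

```scala
object TriangleSketch {
  // Local triangle count for an undirected graph given as adjacency sets.
  def localTriangles(graph: Map[Long, Set[Long]]): Map[Long, Int] =
    graph.map { case (id, neighbours) =>
      // "Messages received": each neighbour's own neighbour set. toSeq keeps
      // duplicates, so two neighbours with identical sets both contribute.
      val received: Seq[Set[Long]] = neighbours.toSeq.map(graph)
      // Each triangle through `id` is seen once from each of its two other corners.
      val twiceTriangles = received.map(_.intersect(neighbours).size).sum
      id -> (twiceTriangles / 2)
    }

  def main(args: Array[String]): Unit = {
    // A triangle 1-2-3 with an extra vertex 4 attached to vertex 3.
    val graph = Map(
      1L -> Set(2L, 3L),
      2L -> Set(1L, 3L),
      3L -> Set(1L, 2L, 4L),
      4L -> Set(3L)
    )
    println(localTriangles(graph))
  }
}
```

In the sample graph, vertex 3 receives three sets but only two of them overlap with its own neighbourhood, giving an intersection total of 2 and hence one triangle.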
Iterative algorithms
Finally, iterative algorithms cover those which require an unknown number of messaging steps, which are executed using a mixture of step() and iterate(). These algorithms correspond to measures that take into account the large-scale structure of the graph, including:
Connected components
Single source shortest path
PageRank, eigenvector and hub/authority centrality
Community detection (e.g., using the label propagation algorithm)
Watts’ linear threshold model
Diffusion models (taint tracking, SIS/SIR)
An example of this is the ConnectedComponents algorithm discussed previously.
Tabularisers
It is possible to write algorithms that only extract vertex state and output it to table format. These are particularly useful as the final step in an algorithm chain where one may want to also extract intermediate results. In this case, there is no need to override the apply() method, as the default implementation simply returns the input graph unchanged. A tabulariser can simply override the tabularise() method as needed.
Chaining algorithms
It is often useful to compose/chain different algorithms together. This is implemented in Raphtory using the -> operator of the algorithm classes.
As an example, consider the CBOD algorithm, which detects outliers based on community labels for vertices. In some cases, community labels may already be included in the input data. However, most of the time one would need to run a community detection algorithm (e.g., LPA) first to get the labels. One way to express this in Raphtory is to use chaining, i.e.,
import com.raphtory.algorithms.generic.community.LPA
import com.raphtory.algorithms.generic.CBOD
val lpa_cbod = LPA() -> CBOD(label="lpalabel")
This results in a new algorithm with an apply() method that simply calls the apply() methods of the input algorithms in sequence, i.e.,
lpa_cbod.apply(graph)
is equivalent to calling
CBOD(label="lpalabel").apply(LPA().apply(graph))
The tabularise() method of a chained algorithm calls the corresponding method of the last algorithm in the chain, i.e.,
lpa_cbod.tabularise(graph)
is equivalent to
CBOD(label="lpalabel").tabularise(graph)
In this example, we first use LPA to compute community labels and store them in a vertex state with key "lpalabel". CBOD then uses the labels to identify outliers and writes out the results.
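The chaining semantics described here (apply() methods composed in sequence, tabularise() taken from the last algorithm) can be sketched in a few lines of plain Scala. This is an illustration under assumptions and not Raphtory's class hierarchy: the Algorithm trait, the Graph and Table aliases, and the toy ParityLabel and LabelTable algorithms are all hypothetical.

```scala
object ChainingSketch {
  type Graph = Map[Long, Map[String, Any]] // vertex ID -> algorithmic state
  type Table = Seq[Vector[Any]]

  trait Algorithm { self =>
    def apply(graph: Graph): Graph = graph          // default: leave the graph unchanged
    def tabularise(graph: Graph): Table = Seq.empty // default: write nothing

    // Chaining: run `self` first, then `next`; tabularise with the last algorithm.
    def ->(next: Algorithm): Algorithm = new Algorithm {
      override def apply(graph: Graph): Graph = next.apply(self.apply(graph))
      override def tabularise(graph: Graph): Table = next.tabularise(graph)
    }
  }

  // Toy labeller: stores the parity of the vertex ID under `key`.
  final case class ParityLabel(key: String) extends Algorithm {
    override def apply(graph: Graph): Graph =
      graph.map { case (id, state) => id -> (state + (key -> (id % 2))) }
  }

  // Toy consumer: outputs (ID, label) rows for a label written by an earlier algorithm.
  final case class LabelTable(key: String) extends Algorithm {
    override def tabularise(graph: Graph): Table =
      graph.toSeq.sortBy(_._1).map { case (id, state) => Vector[Any](id, state(key)) }
  }

  val chained: Algorithm = ParityLabel("parity") -> LabelTable("parity")

  def main(args: Array[String]): Unit = {
    val graph: Graph = Map(1L -> Map.empty[String, Any], 2L -> Map.empty[String, Any])
    println(chained.tabularise(chained.apply(graph)))
  }
}
```

The second algorithm only works because the first has stored its result as vertex state under an agreed key, which is why zero-step algorithms that write state in apply() compose better than ones that only override tabularise().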
In the case of CBOD, one can also supply the community detection algorithm as an optional argument, i.e., CBOD(label="lpalabel", labeler=LPA()), where by default labeler=Identity, which does nothing. This is an example of the intended use of the Identity algorithm.