A deeper dive into Raphtory analysis

Raphtory’s analysis engine works by vertex centric computation. Each vertex has access to local information about the graph (just its immediate vicinity). To complement this, vertices can communicate with their neighbours (other vertices that are directly connected to it). Many graph algorithms which operate on a per-vertex level can be expressed in this way. The benefit of this is that graphs can be distributed over multiple cores/machines, each containing a proportion of the vertices, and these vertex computations can be executed in a parallel manner.

Each vertex (or node) knows:

  • Its own update history, property set and property history e.g. values and update times.

  • The history/properties of the edges attached to it - both incoming and outgoing as Raphtory has a directed graph model. E.g. in Twitter where the other person has to follow you back, as opposed to Facebook friends where two friends are connected by default.

  • Its own algorithmic state - this is a map of temporary values that can be set and modified during the computation steps.

vertex time view

The information available to vertex V1 - this includes the time it was created, the IDs of its neighbours and the times at which its edges are established.

The next sections will explore how algorithms can be written using these vertex communications.

The GraphAlgorithm API

The core of the Raphtory algorithm API is the algorithm package which contains base-classes for different types of algorithms that custom algorithms should extend. The different algorithm base classes support mapping between different types of views of the underlying graph. Currently Raphtory supports an aggregate ReducedGraphPerspective view where algorithm steps act on vertices (see Vertex) and a multilayer MultilayerGraphPerspective view where algorithm steps act on exploded vertices (see ExplodedVertex) which represent a vertex at a given point in time.

In general, an algorithm has two stages: graph processing and tabularising results. Graph processing is defined using the apply() method whereas tabularising results is handled by the tabularise() method. Graph operations are defined by the GraphPerspective class and tabular data by the Table class. Rows in a Table are manipulated using the Row class.

The support for different graph views is still experimental. Algorithms that are view-agnostic (in particular, all algorithms from Raphtory-akka (The prior deprecated version)) should extend the Generic algorithm base-class.

The algorithm API package also contains Identity which is an algorithm that leaves the graph unchanged and does not write out any results. This is mainly useful as a default argument for algorithms that can optionally run another graph algorithm, e.g., as an optional pre- or post-processing step. An example of such an algorithm is community-based outlier detection CBOD which can optionally run a community detection algorithm to label the vertices.

Graph processing

The core of most algorithms (though not all, see zero-step algorithms below) is the graph processing stage. The graph processing is implemented by overriding the apply() method, (e.g., for a Generic algorithm):

override def apply(graph: GraphPerspective): graph.Graph = {

The apply() method takes a GraphPerspective as input, manipulates the state of vertices, and then returns the graph view, either for further processing by other algorithms or for collecting and writing out results. A GraphPerspective has two key methods which are used during graph processing, step() and iterate().

step()

step() takes in a function to be applied to each vertex in the graph, and permits each vertex to mutate its state and send messages to some or all of its neighbours. Vertices are represented in Raphtory by the Vertex class, which exposes methods for storing computational state, messaging other vertices, accessing edges (represented by the Edge class), and accessing properties set when the graph was constructed. The step() method is often used as the setup for an algorithm, getting each vertex ready with a default state or finding the subset of vertices that are required to send the first messages. For example:

graph
  .step({
    vertex =>
      vertex.setState("cclabel", vertex.ID)
      vertex.messageAllNeighbours(vertex.ID)
  })

This is a snippet from the Raphtory Connected Components implementation. Here, each vertex sets its cclabel to its own ID and then sends this label to all of its neighbours.

iterate()

iterate() does the same thing as step(), but is run repeatedly until some criterion is met or a maximum number of iterations is reached. Vertex state is often used to record progress during iterations and to decide if an algorithm has converged. The convergence criterion is established by vertices voting to halt unanimously using the Vertex.voteToHalt() method.
All of this can be seen in the example below:

        .iterate({
            vertex =>
                val label = vertex.messageQueue[Long].min
                if (label < vertex.getState[Long]("cclabel")) {
                    vertex.setState("cclabel", label)
                    vertex messageAllNeighbours label
                }
                else
                    vertex.voteToHalt()
        }, iterations = 100, executeMessagedOnly = true)
}

In this instance, the vertices check the messages they have received from neighbours and set their cclabel to be the minimum number received. This new label is then sent to their neighbours, allowing it to propagate to the neighbours who sent the other labels in the next step. If no new label is found (as their own label is already lower) a vertex may call voteToHalt(). This means that they believe they have found their final value and therefore the algorithm may converge early. No new messages are sent in this instance.

Due to the nature of this algorithm and those like it, iterate() has an additional flag of executeMessagedOnly, which when set means that only vertices which have received new messages will execute the function. This can drastically increase the efficiency of algorithms on large graphs, where often only a few vertices may need to execute at any one step (especially when looking at algorithms like random walks or paths). For connected components, as a vertex won’t change its label unless a lower label is received from a neighbour, it can be set here.

Global state

Many algorithms require computing some global properties, e.g., normalisation constants. This is supported in Raphtory by using accumulators. The function defining the algorithmic step for both the step() and iterate() method can optionally take a second GraphState argument. Before we can use the global state, we first have to define some accumulators using the GraphPerspective.setGlobalState() method. The Accumulator class has a += operator to add new values to the accumulator and a value attribute for accessing the last computed value.

The example below illustrates how to compute the maximum degree in the graph using an accumulator:

graph
  .setGlobalState({
    graphState =>
      graphState.newMax[Int]("maxDegree")
  })
  .step({ 
    (vertex, graphState) =>
      graphState("maxDegree") += vertex.degree
  })

Tabularising results

Once graph processing is complete, the algorithm proceeds to collect vertex state into tabular form. This stage of the algorithm is implemented by overriding the tabularise() method, i.e.

override def tabularise(graph: GraphPerspective): Table = {

select()

Typically, one calls select() on the input graph as the first step of tabularise() (though one can do further graph post-processing at this stage). select() maps a vertex to a Row object containing the results for that vertex.

As an alternative, one can also use the explodeSelect() method. Like select, explodeSelect() also takes a function which is executed once per vertex. However, this function should return a list of rows rather than a single row. This can be used to return multiple rows per vertex (e.g. for edge-level outputs EdgeList) or as a way of filtering results by returning an empty list. Like the step() and iterate() methods, the select() and explodeSelect() step can also optionally take the global GraphState as an additional input. The final method for tabularising results is the globalSelect() method which maps the global GraphState to a single Row. For example, we could use this to return the maximum degree computed above:

graph
  .globalSelect(graphState => Row(graphState[Int]("maxDegree")))

In the connected components instance, we are interested in extracting the ID of the vertex and the final component ID that it saved in its state:

graph
  .select(vertex => Row(vertex.ID(),vertex.getState[Long]("cclabel")))

Once we have the data in Row form we may perform a different set of transformations.

filter()

The filter function can only be run after the vertex data has been converted to Table format by the select() call. For example:

table
  .filter(row => row.get(1) == true)

This can be important if you only want to return elements that have received a certain label. For example, if we are looking for nodes reachable from a given entity in the graph, we can store a flag in their state and then filter on this once in Row form. Alternatively, we could have implemented this using explodeSelect().

explode()

explode() can be used to increase the number of rows, or prevent the output from producing any arrays. For example, if the select function returned a list within the row, we can use the explode to turn this list into individual items:

table
  .explode(row => row.get(2).asInstanceOf[List[(Long, String)]].map(
    expl => Row(row(0), expl._1, expl._2)
  )
)

Writing out results

Finally, once you are happy with the format of your data you can output it to disk. This is done by using a Sink which is given to the query when it is executed. Several inbuilt output formats are available within Raphtory, but it is also very simple to implement your own if you have a specific destination in mind.

As an example from our prior code snippets the FileSink saves the results of each partition as separate files in a directory. This directory is the only argument required when creating the FileSink object and passing it to the query:

val sink = FileSink("/tmp")
graph
  .execute(ConnectedComponents())
  .writeTo(sink)

Similarly the PulsarSink can be used to write results directly to Pulsar topics. The user gives the Topic as an argument when creating the PulsarSink object:

val sink = PulsarSink("components")
graph
  .execute(ConnectedComponents())
  .writeTo(sink)

Types of Algorithms

Zero-step algorithms

Zero-step algorithms refer to all algorithms which require no vertex messaging step, and can be expressed just using a select() operation. This means that the algorithm only requires knowledge of each vertex and the edges connected to it. This might be familiar as local measures in the network science literature. Some algorithms that fit into this category are:

  • Vertex degree

  • Vertex/edge property extraction

  • Some temporal motifs centred on a vertex, e.g. 3-node 2-edge temporal motifs.

In principle, one can implement such an algorithm by only overriding the tabularise(), leaving the default apply() method which simply returns the input graph unchanged.

To see an example of this, here is a snippet extracting vertex degrees:

 1  override def tabularise(graph: GraphPerspective): Table = {
 2  graph
 3    .select({
 4      vertex =>
 5        val inDegree = vertex.getInNeighbours().size
 6        val outDegree = vertex.getOutNeighbours().size
 7        val totalDegree = vertex.getAllNeighbours().size
 8        Row(vertex.name(), inDegree, outDegree, totalDegree)
 9    })
10}

In here, the vertex’s in/out/total degree is extracted in a straightforward way, with line 8 mapping these to a row for outputting. However, this means that the algorithm cannot usefully participate in a more complicated processing pipeline as downstream algorithms do not have access to the computed results. For something as simple as outputting node degrees, which other algorithms already trivially have access to, this may make sense. Generally, however, it is better practise to implement such algorithms using a single step() during graph processing which does not send any messages and sets the computed values as state on the vertices. For the degree algorithm above, we would instead have (this is how the build-in Degree works):

override def apply(graph: GraphPerspective): graph.Graph = {
  graph.step({
    vertex => 
      vertex.setState("inDegree", vertex.getInNeighbours().size)
      vertex.setState("outDegree", vertex.getOutNeighbours().size)
      vertex.setState("totalDegree", vertex.getAllNeighbours().size)
  })
}

override def tabularise(graph: GraphPerspective): Table = {
  graph.select({
    vertex => Row(vertex.getPropertyOrElse("name", vertex.ID()), 
      vertex.getState("inDegree"), vertex.getState("outDegree"), vertex.getState("totalDegree"))
  })
}

Implemented in this way, the algorithm can participate usefully in algorithm chaining discussed below.

One-step algorithms

One-step algorithms are those which require precisely one messaging step, and can be expressed using two calls to step() to send out the messages and collate the results. This maps to measures which require knowledge of a vertex’s neighbours and the connections between them. Some examples of these include:

  • Local triangle count

  • Local clustering coefficient

  • Average neighbour degree

For an example of this, let’s look at a snippet of the TriangleCount algorithm:

override def apply(graph: GraphPerspective): graph.Graph = {
  graph
    .step({
      vertex =>
        vertex.setState("triangles",0)
        val neighbours = vertex.getAllNeighbours().toSet
        vertex.messageAllNeighbours(neighbours)
    })
    .step({
      vertex =>
        val neighbours = vertex.getAllNeighbours().toSet
        val queue = vertex.messageQueue[Set[Long]]
        var tri = 0
        queue.foreach(
          nbs =>
            tri+=nbs.intersect(neighbours).size
        )
        vertex.setState("triangles",tri/2)
    })
}
    
override def tabularise(graph: GraphPerspective): Table = {
  graph
    .select({
      vertex =>
        Row(vertex.name(), vertex.getState[Int]("triangles"))
    })
}

The first step() function tells each vertex to send a list of its neighbours to all neighbours. Then the second step() function tells each vertex to compute the intersection of the received sets with its own set of neighbours. The sum of these intersections is twice the number of triangles for that vertex, so this number is halved.

Iterative algorithms

Finally, iterative algorithms cover those which require an unknown number of messaging steps, which are executed using a mixture of step() and iterate(). These algorithms correspond to measures that take into account the large scale structure of the graph, including:

  • Connected components

  • Single source shortest path

  • PageRank, eigenvector and hub/authority centrality

  • Community detection (e.g., using the label propagation algorithm)

  • Watts’ linear threshold model

  • Diffusion models (taint tracking, SIS/SIR)

An example of this is the ConnectedComponents algorithm discussed previously.

Tabularisers

It is possible to write algorithms that only extract vertex state and output it to table format. These are particularly useful as the final step in an algorithm chain where one may want to also extract intermediate results. In this case, there is no need to override the apply() method as the default implementation simply returns the input graph unchanged. A tabulariser can simply override the tabularise() method as needed.

Chaining algorithms

It is often useful to compose/chain different algorithms together. This is implemented in Raphtory using the -> operator of the algorithm classes.

As an example, consider the CBOD algorithm which detects outliers based on community labels for vertices.

In some cases, community labels may already be included in the input data. However, most of the time one would need to run a community detection algorithm (e.g., LPA) first to get the labels. One way to express this in Raphtory is to use chaining, i.e,

import com.raphtory.algorithms.generic.community.LPA
import com.raphtory.algorithms.generic.CBOD
val lpa_cbod = LPA() -> CBOD(label="lpalabel")

This results in a new algorithm with an apply() method that simply calls the apply() method of the input algorithms in sequence, i.e.,

lpa_cbod.apply(graph)

is equivalent to calling

CBOD(label="lpalabel").apply(LPA().apply(graph))

The tabularise method of a chained algorithm calls the corresponding methods of the last algorithm in the chain, i.e.,

lpa-cbod.tabularise(graph) 

is equivalent to

CBOD(label="lpalabel").tabularise(graph)

In this example, we first use LPA to compute community labels and store them in a vertex state with key "lpalabel". CBOD then uses the labels to identify outliers and writes out the results.

In the case of CBOD, one can also supply the community detection algorithm as an optional argument, i.e., CBOD(label="lpalabel", labeler=LPA()), where by default labeler=Identity which does nothing. This is an example of the intended use of the Identity algorithm.