Documentation (raphtory-client)

This library provides a Raphtory client that interacts with Pulsar. Its main purpose is to pull data from Raphtory for analysis in Python.

This library can be installed via pip install raphtory-client

The source code can be found at https://github.com/raphtory/raphtory/python

You must set the Raphtory environment variable RAPHTORY_PYTHON_ACTIVE to true so that Raphtory launches the Python gateway server; it is set to false by default. Without this, the Python client will not work.

RaphtoryClient Objects

class raphtoryclient()
    def __init__(self, pulsar_admin_url="http://127.0.0.1:8080", pulsar_client_args=None, raphtory_deployment_id=None, conn_file_info=None):

Creates a Raphtory client that interacts with Pulsar.

Attributes:

  • admin_url str: the URL of the Pulsar admin client

  • pulsar_client_args dict: arguments to pass to the Pulsar client; keys must match pulsar.Client parameters

  • raphtory_deployment_id str: deployment id of the running Raphtory instance

  • conn_file_info str: absolute file path of the Java gateway connection file, usually /tmp/<deployment_id>_python_gateway_connection_file
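The usual connection-file path follows directly from the deployment id. The sketch below is a hypothetical helper (default_conn_file is not part of raphtory-client) that assumes the file sits in /tmp and follows the <deployment_id>_python_gateway_connection_file pattern described above; the deployment id "raphtory_12345" is made up for illustration.

```python
import posixpath


def default_conn_file(deployment_id: str, directory: str = "/tmp") -> str:
    # Hypothetical helper (not part of raphtory-client): builds the usual
    # gateway connection-file path <directory>/<deployment_id>_python_gateway_connection_file.
    return posixpath.join(directory, deployment_id + "_python_gateway_connection_file")


conn_file = default_conn_file("raphtory_12345")
print(conn_file)  # /tmp/raphtory_12345_python_gateway_connection_file
```

The resulting string is the kind of value that could be passed as conn_file_info when the default location does not apply.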

importDefaults

def importDefaults(self)

Helper function that imports some default classes used by Raphtory.

Arguments:

none

Returns:

none

java_import

def java_import(self, import_class):

Wrapper around py4j's java_import function that makes imports more readable.

Arguments:

  • import_class string: Java class to import

Returns:

none

make_name

def make_name()

Helper function which generates a random subscription suffix for the reader.

Arguments:

none

Returns:

  • str - subscription suffix
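A random suffix keeps each reader's subscription name unique. This is a minimal sketch, not the library's implementation: the 8-character length and the lowercase-plus-digits alphabet are assumptions, and "my_reader_" is a hypothetical subscription prefix.

```python
import secrets
import string


def make_name(length: int = 8) -> str:
    # Sketch only: draw a random lowercase/digit suffix so that each reader
    # gets a distinct Pulsar subscription name (length and alphabet are assumptions).
    alphabet = string.ascii_lowercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


subscription = "my_reader_" + make_name()  # hypothetical prefix + random suffix
```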

setupPulsarClient

def setupPulsarClient(client_args, max_attempts=5)

Sets up a Pulsar client using the Pulsar address. Makes up to max_attempts attempts (5 by default) before returning.

Arguments:

  • client_args dict: Dict of arguments to be used in the Pulsar client; keys must match pulsar.Client parameters

  • max_attempts int: (Optional, default: 5) Maximum number of connection attempts

Returns:

  • PulsarClient - A pulsar client object if successful

  • None None - None if not successful
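The retry behaviour can be sketched generically: call a factory up to max_attempts times and return None if every attempt fails. The name connect_with_retries, the delay between attempts, and catching all exceptions are assumptions for illustration, not the library's code.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def connect_with_retries(factory: Callable[[], T], max_attempts: int = 5,
                         delay_seconds: float = 1.0) -> Optional[T]:
    # Call factory up to max_attempts times; return the first successful
    # result, or None if every attempt raised an exception.
    for attempt in range(1, max_attempts + 1):
        try:
            return factory()
        except Exception as exc:  # a real client would catch narrower errors
            print(f"Attempt {attempt}/{max_attempts} failed: {exc}")
            if attempt < max_attempts:
                time.sleep(delay_seconds)  # assumed fixed back-off between attempts
    return None
```

With the pulsar-client package installed, this might be called as connect_with_retries(lambda: pulsar.Client(**client_args)), assuming client_args holds pulsar.Client keyword arguments such as service_url.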

createReader

def createReaders(topic, subscription_name='', schema=schema.StringSchema())

Sets up a single Pulsar reader that reads from a Pulsar topic. Retries up to 5 times before returning.

Arguments:

  • topic str : Name of the topic to read from

  • subscription_name str : Name for this reader's subscription

  • schema Pulsar.Schema : Schema to use for reader

Returns:

  • PulsarReader Pulsar.reader : A pulsar reader object

  • None None - None if not successful

getStats

def getStats(topic, tenant="public", namespace="default")

Reads stats for a Pulsar topic using the admin interface. If successful, returns the response as JSON; otherwise returns an empty dict.

Arguments:

  • topic str - Topic to obtain stats from

  • tenant str - (Optional, default: public) Pulsar tenant to access

  • namespace str - (Optional, default: default) Pulsar namespace to access

Returns:

  • json_response dict - The JSON response of the request, or an empty dict if unsuccessful.
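A stdlib-only sketch of the same idea: query the Pulsar admin REST interface for a topic's stats and fall back to an empty dict on any failure. It assumes a persistent topic and the admin v2 endpoint /admin/v2/persistent/{tenant}/{namespace}/{topic}/stats; the helper names topic_stats_url and get_stats are hypothetical.

```python
import json
import urllib.request


def topic_stats_url(admin_url: str, topic: str, tenant: str = "public",
                    namespace: str = "default") -> str:
    # Assumes a persistent topic and the Pulsar admin REST v2 stats endpoint.
    return f"{admin_url.rstrip('/')}/admin/v2/persistent/{tenant}/{namespace}/{topic}/stats"


def get_stats(admin_url: str, topic: str, tenant: str = "public",
              namespace: str = "default", timeout: float = 5.0) -> dict:
    # Return the decoded JSON stats, or an empty dict on any network or
    # decoding failure, mirroring the documented getStats behaviour.
    url = topic_stats_url(admin_url, topic, tenant, namespace)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    except (OSError, ValueError):  # URLError/HTTPError are OSErrors; bad JSON is a ValueError
        return {}
```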

getDataframe

def getResults(reader, delimiter=',', max_messages=sys.maxsize, col_names=[])

Reads a complete topic using the reader and converts it into a pandas DataFrame. Each message from the reader is split using delimiter. By default the results are expected to have three columns called timestamp, window and id; any further columns are called result_N.

Arguments:

  • reader Pulsar.Reader : Reader where messages will be pulled from

  • delimiter str : the delimiter for parsing the results

  • max_messages int : (Optional, default: sys.maxsize) The maximum number of messages to read. By default the entire topic is read, which may cause memory issues for large topics.

  • col_names list[str] : (Optional, default: ["timestamp", "window", "id"]) The names of the columns. Any columns beyond these are called result_N.

Returns:

  • dataframe pandas.DataFrame - A dataframe of the topic
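The splitting and column-naming step can be sketched without Pulsar or pandas. In this hypothetical parse_results, a plain iterable of strings stands in for the reader's messages and a list of dicts stands in for the DataFrame (pandas.DataFrame(rows) would convert it); numbering result_N from 1 is an assumption.

```python
import itertools
import sys
from typing import Dict, Iterable, List, Optional

DEFAULT_COLUMNS = ["timestamp", "window", "id"]


def parse_results(messages: Iterable[str], delimiter: str = ",",
                  max_messages: int = sys.maxsize,
                  col_names: Optional[List[str]] = None) -> List[Dict[str, str]]:
    # Split each message on the delimiter and name its columns; any column
    # beyond the named ones is called result_N (numbering from 1 is an assumption).
    names = list(col_names) if col_names else DEFAULT_COLUMNS
    rows = []
    for message in itertools.islice(messages, max_messages):
        row = {}
        for i, value in enumerate(message.split(delimiter)):
            key = names[i] if i < len(names) else f"result_{i - len(names) + 1}"
            row[key] = value
        rows.append(row)
    return rows


rows = parse_results(["10,100,7,0.5", "20,100,8,0.25"])
# rows[0] == {"timestamp": "10", "window": "100", "id": "7", "result_1": "0.5"}
```

With a real Pulsar reader, the strings could come from calling value() on messages returned by reader.read_next(), for as long as reader.has_message_available() is true.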

find_dates

def find_dates(all_data, node_a_id=0, node_b_id=1, time_col=2)

Given a dataframe of edges, finds the first time each node was seen and returns a dict mapping each node id to that time. This can help identify when a node was first created.

Arguments:

  • all_data dataframe - A dataframe containing a column with keys and a column with times/numbers to compare with.

  • node_a_id int - Position of the id or name to use as key for node A

  • node_b_id int - Position of the id or name to use as key for node B

  • time_col int - Position of the time column which is compared

Returns:

  • first_seen dict - A dictionary mapping each node_id to the time it was first seen
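The first-seen computation amounts to taking, for every node appearing at either end of an edge, the minimum time across its edges. This sketch (find_first_seen is a hypothetical name) works on plain positional rows; with a DataFrame, all_data.itertuples(index=False) yields rows that can be indexed the same way. The character names and times are illustrative.

```python
from typing import Any, Dict, Hashable, Sequence


def find_first_seen(rows: Sequence[Sequence[Any]], node_a_id: int = 0,
                    node_b_id: int = 1, time_col: int = 2) -> Dict[Hashable, Any]:
    # For every edge row, keep the earliest time at which either endpoint
    # appears; the result maps node id -> first-seen time.
    first_seen: Dict[Hashable, Any] = {}
    for row in rows:
        t = row[time_col]
        for node in (row[node_a_id], row[node_b_id]):
            if node not in first_seen or t < first_seen[node]:
                first_seen[node] = t
    return first_seen


edges = [("Frodo", "Sam", 10), ("Sam", "Gandalf", 5), ("Frodo", "Gandalf", 20)]
print(find_first_seen(edges))  # {'Frodo': 10, 'Sam': 5, 'Gandalf': 5}
```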

add_node_attributes

def add_node_attributes(G, results, abbr, row_id=2, time_col=0, window_col=-1, result_col=3)

Given a graph, a list of attribute dataframes and a list of abbreviations, adds all the attributes to the graph. For example, given a graph G, results

Arguments:

  • G networkx.graph - A networkx graph

  • results list[pandas.DataFrame] - A list of dataframes which contain attributes. Each dataframe has the columns:

      ◦ id - node id

      ◦ timestamp - time the attribute was created

      ◦ window - (optional) the window in which the attribute was created

      ◦ result_col - the value of the attribute

  • abbr list[str] : A list of strings giving the abbreviation used when appending each attribute.

  • row_id int/str: (Optional, default: 2) Position or name of the column containing the row id; must be the same across results

  • time_col int/str: (Optional, default: 0) Position or name of the column containing the timestamp

  • result_col int/str: (Optional, default: 3) Position or name of the column containing the result

  • window_col int/str: (Optional, default: -1) Position or name of the column containing the window; set to '' if not being used
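A rough sketch of the idea, not the library's implementation: a plain dict of per-node attribute dicts stands in for a networkx graph (with networkx, G.nodes[node][key] = value is the equivalent assignment). The function name attach_attributes, the <abbr>_<timestamp>[_<window>] naming scheme, treating a negative window_col as "no window", and skipping nodes not already in the graph are all assumptions.

```python
from typing import Any, Dict, List, Sequence


def attach_attributes(node_attrs: Dict[Any, Dict[str, Any]],
                      results: List[Sequence[Sequence[Any]]], abbr: List[str],
                      row_id: int = 2, time_col: int = 0,
                      window_col: int = -1, result_col: int = 3) -> None:
    # node_attrs stands in for a graph's per-node attribute dicts (G.nodes[n]);
    # each results[i] is a list of positional rows, paired with abbr[i].
    for rows, prefix in zip(results, abbr):
        for row in rows:
            node = row[row_id]
            if node not in node_attrs:
                continue  # assumption: only annotate nodes already in the graph
            key = f"{prefix}_{row[time_col]}"  # hypothetical naming scheme
            if window_col >= 0:  # assumption: negative means "no window column"
                key += f"_{row[window_col]}"
            node_attrs[node][key] = row[result_col]


attrs = {"7": {}, "8": {}}
attach_attributes(attrs, [[(10, 100, "7", 0.5), (10, 100, "8", 0.25)]], ["pr"])
# attrs == {"7": {"pr_10": 0.5}, "8": {"pr_10": 0.25}}
```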

createGraphFromEdgeList

def createGraphFromEdgeList(df, isMultiGraph=True)

Builds a simple networkx graph from a Raphtory edge list.

Arguments:

  • df pandas.Dataframe: A dataframe of an edgelist where, col 0: timestamp, col 1: source, col 2: destination

  • isMultiGraph bool - (Optional, default: True) If False, uses networkx.DiGraph; otherwise uses networkx.MultiGraph

Returns:

  • G networkx.MultiGraph or networkx.DiGraph - The graph as built in networkx
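The choice of isMultiGraph matters when the same pair of nodes interacts more than once. This stdlib-only sketch (build_edge_map is a hypothetical name) mimics how networkx treats repeated edges: a MultiGraph is undirected and keeps every parallel edge, while a DiGraph keeps one edge per ordered pair and repeated add_edge calls overwrite its attributes. With networkx installed, the real equivalent is nx.MultiGraph() or nx.DiGraph() followed by G.add_edge(source, destination, timestamp=timestamp) for each row.

```python
from typing import Any, Dict, List, Sequence, Tuple


def build_edge_map(rows: Sequence[Sequence[Any]],
                   is_multigraph: bool = True) -> Dict[Tuple[Any, Any], List[Any]]:
    # Rows follow the documented layout: col 0 timestamp, col 1 source, col 2 destination.
    edges: Dict[Tuple[Any, Any], List[Any]] = {}
    for row in rows:
        timestamp, source, destination = row[0], row[1], row[2]
        if is_multigraph:
            key = tuple(sorted((source, destination)))  # MultiGraph is undirected
            edges.setdefault(key, []).append(timestamp)  # keep every parallel edge
        else:
            edges[(source, destination)] = [timestamp]  # DiGraph: last timestamp wins
    return edges


rows = [(1, "a", "b"), (2, "a", "b"), (3, "b", "a")]
print(build_edge_map(rows))         # {('a', 'b'): [1, 2, 3]}
print(build_edge_map(rows, False))  # {('a', 'b'): [2], ('b', 'a'): [3]}
```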

createLOTRGraph

def createLOTRGraph(filePath, from_time=0, to_time=sys.maxsize, source_id=0, target_id=1, timestamp_col=2)

Example graph builder in Python. Given a CSV edge list of the Lord of the Rings (LOTR) data, creates a graph using networkx.

Arguments:

  • filePath str - Location of csv file

  • from_time int - (Optional, default: 0) timestamp to start building graph from

  • to_time int - (Optional, default: sys.maxsize) timestamp to stop building graph

  • source_id int - (Optional, default: 0) Column of the source node

  • target_id int - (Optional, default: 1) Column of the target node

  • timestamp_col int - (Optional, default: 2) Column of the LOTR timestamp

Returns:

  • G networkx.DiGraph - The graph as built in networkx
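The time filtering that from_time and to_time imply can be sketched with the csv module before any graph is built. read_lotr_edges is a hypothetical helper, treating both bounds as inclusive is an assumption, and the two CSV rows are illustrative rather than taken from the real dataset. With a real file, open(filePath, newline="") can be passed in place of the StringIO.

```python
import csv
import io
import sys
from typing import Iterable, List, Tuple


def read_lotr_edges(lines: Iterable[str], from_time: int = 0, to_time: int = sys.maxsize,
                    source_id: int = 0, target_id: int = 1,
                    timestamp_col: int = 2) -> List[Tuple[str, str, int]]:
    # Keep only edges whose timestamp lies within [from_time, to_time]
    # (inclusive bounds are an assumption).
    edges = []
    for row in csv.reader(lines):
        if not row:
            continue  # skip blank lines
        t = int(row[timestamp_col])
        if from_time <= t <= to_time:
            edges.append((row[source_id], row[target_id], t))
    return edges


sample = io.StringIO("Gandalf,Elrond,33\nFrodo,Bilbo,114\n")  # illustrative rows
print(read_lotr_edges(sample, to_time=100))  # [('Gandalf', 'Elrond', 33)]
```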

setupRaphtory

def setupRaphtory(self):

Sets up a Raphtory Java client via the gateway object. This lets the user invoke Raphtory Java/Scala methods as if they were using Raphtory from Scala. Note that the arguments must be correct, otherwise the methods will not be called. Uses the internal raphtory_deployment_id, the deployment id of the Raphtory instance to connect to.

Arguments:

  • none

Returns:

  • graph: raphtory client graph object

setupJavaGateway

def setupJavaGateway(self, conn_info_file=None):

Creates the Java<>Raphtory gateway and imports the required classes. The gateway allows the user to run and read Java/Scala methods and objects from Python.

Arguments:

  • conn_info_file: (Optional) full absolute file path of the connection info file, usually /tmp/<deployment_id>_python_gateway_connection_file

Returns:

  • py4j.java_gateway: py4j java gateway object

java

def java(self):

Short helper function that returns the gateway's JVM object, making code easier to read.

Arguments: none

Returns:

  • java gateway jvm: gateway jvm object