Overview
Giraph can use the
Rexster
REST API to load and store graphs from graph databases like
Neo4j,
OrientDB and others to perform a
computation. Graph databases that are supported by
Blueprints are also
available via Rexster. Additionally, a subset of the input graph can
be injected by means of
Gremlin scripts. This page is intended to get you started with the
Giraph API for Rexster I/O.
Quick Start for the Impatient
Since not everyone is interested in the whole story, here are a few
easy steps to get started quickly with the Rexster I/O API. We
assume you already have a working Hadoop/Giraph setup. If you do not,
work through the Giraph Quick Start guide first and then come back. This
matters because the
OutputFormat example builds on the
same example used in the Quick Start guide.
Below you can find a single script to prepare the environment and a
small example to use the
OutputFormat. The only step required
to make the example work is to adjust the configuration variables to your
environment settings. For more details, read the rest of the
document :)
The script below also assumes that Hadoop is up and running
as described in the Quick Start guide and that the
tiny_graph.txt
input graph is in place in the input directory.
#!/bin/bash
# Configuration
export REXSTER_VERSION=2.4.0
export HADOOP_VERSION=1.0.2
export GIRAPH_VERSION=1.2.0-SNAPSHOT
export GIRAPH_DIR=/path/to/giraph
export REXSTER_DIR=/path/to/rexster
export HADOOP_DIR=/path/to/hadoop
# Constants
export GIRAPH_REXSTER=${GIRAPH_DIR}/giraph-rexster/giraph-rexster-io
export GIRAPH_CORE=${GIRAPH_DIR}/giraph-core
export GIRAPH_EXAMPLES=${GIRAPH_DIR}/giraph-examples
export GIRAPH_KIBBLE=${GIRAPH_DIR}/giraph-rexster/giraph-kibble
export GIRAPH_REXSTER_JAR=${GIRAPH_REXSTER}/target/giraph-rexster-io-${GIRAPH_VERSION}.jar
export GIRAPH_CORE_JAR=${GIRAPH_CORE}/target/giraph-${GIRAPH_VERSION}-for-hadoop-${HADOOP_VERSION}-jar-with-dependencies.jar
export GIRAPH_EXAMPLES_JAR=${GIRAPH_EXAMPLES}/target/giraph-examples-${GIRAPH_VERSION}-for-hadoop-${HADOOP_VERSION}-jar-with-dependencies.jar
export GIRAPH_KIBBLE_JAR=${GIRAPH_KIBBLE}/target/giraph-kibble-${GIRAPH_VERSION}.jar
export HADOOP_CLASSPATH=${GIRAPH_REXSTER_JAR}:${GIRAPH_EXAMPLES_JAR}:${GIRAPH_CORE_JAR}
# Main
# prepare rexster
mkdir -p ${REXSTER_DIR}
cd ${REXSTER_DIR}
wget http://tinkerpop.com/downloads/rexster/rexster-server-${REXSTER_VERSION}.zip
unzip rexster-server-${REXSTER_VERSION}.zip
REXSTER_DIR=${REXSTER_DIR}/rexster-server-${REXSTER_VERSION}
# copy the compiled kibble, prepare the rexster configuration, and start rexster
cp ${GIRAPH_KIBBLE_JAR} ${REXSTER_DIR}/ext/
lines=$(wc -l ${REXSTER_DIR}/config/rexster.xml | cut -d" " -f1)
head -n +$(( lines - 2 )) ${REXSTER_DIR}/config/rexster.xml >\
${REXSTER_DIR}/config/rexster.giraph.xml
echo " <graph>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " <graph-name>giraphgraph</graph-name>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " <graph-location>/tmp/giraphgraph</graph-location>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " <graph-type>tinkergraph</graph-type>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " <graph-storage>graphson</graph-storage>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " <extensions>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " <allows>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " <allow>tp:gremlin</allow>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " <allow>tp:giraph</allow>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " </allows>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " </extensions>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " </graph>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo " </graphs>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
echo "</rexster>" >> ${REXSTER_DIR}/config/rexster.giraph.xml
${REXSTER_DIR}/bin/rexster.sh -s -c ${REXSTER_DIR}/config/rexster.giraph.xml
# start a Giraph Job
su - hduser
${HADOOP_DIR}/bin/hadoop jar ${GIRAPH_EXAMPLES_JAR} org.apache.giraph.GiraphRunner \
-Dgiraph.rexster.output.graph=giraphgraph \
-Dgiraph.rexster.hostname=127.0.0.1 \
-libjars ${GIRAPH_REXSTER_JAR},${GIRAPH_CORE_JAR} \
org.apache.giraph.examples.SimpleShortestPathsComputation \
-vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat \
-vip input/ \
-vof org.apache.giraph.rexster.io.formats.RexsterLongDoubleFloatVertexOutputFormat \
-eof org.apache.giraph.rexster.io.formats.RexsterLongDoubleFloatEdgeOutputFormat \
-w 1
exit
Architecture
The Rexster I/O Format is composed of three main components. The
Rexster Input Format and the
Rexster Output Format are part of the Giraph code, and both are
split into
Vertex and
Edge interfaces. The third component is the
Giraph Kibble, a Rexster
extension that provides the facilities needed to load data from, and
store data to, the graph databases. The figure below shows the architecture
at a high level.
The API
Because of how the
Basic
Rexster API is organized, the Giraph API requires the user to specify
both a
Vertex and an
Edge format for both the input and
the output. Even though this step is required, the user does
not have to deal with the Rexster connection, which is
configured through the Giraph options provided.
The next sections guide you through the details of the
API, starting with the
configuration options. Afterwards, we
give a short description of how to prepare Rexster for use
with Giraph. Finally, we walk through the Input and Output
format APIs and conclude with some caveats related to the
system.
Configuration Options
The user of the Rexster input and output formats can specify the
following configuration options. They are grouped into three
categories:
General Configurations,
Input
Format Configurations, and
Output Format Configurations.
General Configurations
label | type | default value | description |
giraph.rexster.hostname | string | 127.0.0.1 | Rexster hostname which provides the REST API (required). |
giraph.rexster.port | integer | 8182 | Rexster port on which to contact the REST API. |
giraph.rexster.ssl | boolean | false | Rexster flag to use a connection over SSL instead of clear text. |
giraph.rexster.username | string | | Rexster username to access the REST API. |
giraph.rexster.password | string | | Rexster password to access the REST API. |
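These options are passed to Giraph as Hadoop properties placed before the computation class, as the quick start does with -Dgiraph.rexster.hostname=127.0.0.1. As a minimal sketch, the following Java snippet turns a map of option labels into the corresponding -D arguments. RexsterOptions and toArguments are hypothetical names used only for illustration and are not part of Giraph.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Giraph): formats options as -D arguments. */
final class RexsterOptions {
  private RexsterOptions() { }

  /** Returns one "-Dlabel=value" argument per entry, in insertion order. */
  static List<String> toArguments(Map<String, String> options) {
    List<String> args = new ArrayList<>();
    for (Map.Entry<String, String> option : options.entrySet()) {
      args.add("-D" + option.getKey() + "=" + option.getValue());
    }
    return args;
  }

  public static void main(String[] unused) {
    // LinkedHashMap keeps the options in the order they were added.
    Map<String, String> options = new LinkedHashMap<>();
    options.put("giraph.rexster.hostname", "127.0.0.1");
    options.put("giraph.rexster.port", "8182");
    options.put("giraph.rexster.ssl", "false");
    for (String arg : toArguments(options)) {
      System.out.println(arg);
    }
  }
}
```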
Input Format Configurations
label | type | default value | description |
giraph.rexster.input.graph | string | graphdb | Rexster input graph. |
giraph.rexster.input.vertex | integer | 1000 | Estimated number of vertices in the graph to be loaded. |
giraph.rexster.input.edge | integer | 1000 | Estimated number of edges in the graph to be loaded. |
giraph.input.rexster.vertices.gremlinScript | string | | If the database is Gremlin-enabled, this script is used to retrieve the vertices from the database exposed by Rexster. |
giraph.input.rexster.edges.gremlinScript | string | | If the database is Gremlin-enabled, this script is used to retrieve the edges from the database exposed by Rexster. |
Output Format Configurations
label | type | default value | description |
giraph.rexster.output.graph | string | graphdb | Rexster output graph. |
giraph.rexster.output.vlabel | string | _vid | Rexster vertex ID label for the JSON format. |
giraph.rexster.output.backoffDelay | integer | 5 | Back-off delay in milliseconds, which is multiplied by an exponentially increasing counter. Needed to deal with deadlocks and consistency issues raised by the graph database. |
giraph.rexster.output.backoffRetry | integer | 20 | Number of back-off retries attempted when a transaction fails. |
giraph.rexster.output.timeout | integer | 10 | Output format wait timeout in seconds. Used to wake up the thread to call progress every x seconds if no progress from ZooKeeper is detected. |
giraph.rexster.output.vertex.txsize | integer | 1000 | Output format transaction size: how many vertices are sent in each transaction. |
giraph.rexster.output.edge.txsize | integer | 1000 | Output format transaction size: how many edges are sent in each transaction. |
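To illustrate the two txsize options, the sketch below splits a list of elements into batches of at most txSize items, one batch per transaction. It only illustrates the arithmetic: TxBatches is a hypothetical name, and the code is not taken from the Giraph output format.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of transaction sizing (not Giraph code). */
final class TxBatches {
  private TxBatches() { }

  /** Splits items into consecutive batches holding at most txSize elements. */
  static <T> List<List<T>> split(List<T> items, int txSize) {
    if (txSize <= 0) {
      throw new IllegalArgumentException("txSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += txSize) {
      int end = Math.min(start + txSize, items.size());
      // Copy the view so each batch is independent of the source list.
      batches.add(new ArrayList<>(items.subList(start, end)));
    }
    return batches;
  }
}
```

Under this scheme, with the default size of 1000, 2500 vertices would be sent in three transactions of 1000, 1000, and 500 vertices. Smaller sizes mean more round trips but less data held by Rexster per transaction.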
Prepare The Environment
In this section we will briefly explain how to prepare a Rexster server
for your computation. For additional information about Rexster and
the configuration of the server, you can take a look at the
Rexster Wiki.
As the
quick
start above shows, starting a new Rexster server is easy.
First, download one of the versions available on the
Tinkerpop repository. We suggest getting the most recent version, for
reasons explained later in the
caveats. So, the
first step is to download Rexster and unzip it.
$ wget http://tinkerpop.com/downloads/rexster/rexster-server-2.4.0.zip
$ unzip rexster-server-2.4.0.zip
At this point, it is important to prepare the database you are going to
use so that the Giraph Kibble is available for it. This is
done by adding the entry
<allow>tp:giraph</allow>
for the desired graph inside the <allows> element of the
<extensions> tag.
Moreover, you need to copy the Giraph Kibble into the
ext/
directory of Rexster.
$ cp /path/to/giraph/giraph-rexster/giraph-kibble/target/giraph-kibble-${GIRAPH_VERSION}.jar rexster-server-2.4.0/ext/
At this point, just enter the rexster directory and start the server.
$ cd rexster-server-2.4.0
$ ./bin/rexster.sh -s
This command automatically locates the configuration file in the
config/ directory and provides you with
some initial databases. To check that the server is working properly, open
the following URL in a browser.
http://localhost:8182/graphs/
This returns a JSON document listing the loaded graphs.
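The same check can be scripted. The sketch below builds the URL from the general configuration values and fetches it with the JDK's HttpURLConnection. RexsterEndpoint is a hypothetical helper, not part of Giraph, and mapping giraph.rexster.ssl=true to an https URL is an assumption made for this example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of Giraph) for probing a Rexster server. */
final class RexsterEndpoint {
  private RexsterEndpoint() { }

  /** Builds the graph listing URL; assumes ssl=true means https. */
  static String graphsUrl(String hostname, int port, boolean ssl) {
    String scheme = ssl ? "https" : "http";
    return scheme + "://" + hostname + ":" + port + "/graphs/";
  }

  /** Fetches the JSON listing of loaded graphs; needs a running server. */
  static String listGraphs(String hostname, int port, boolean ssl)
      throws IOException {
    HttpURLConnection conn = (HttpURLConnection)
        URI.create(graphsUrl(hostname, port, ssl)).toURL().openConnection();
    conn.setRequestMethod("GET");
    conn.setRequestProperty("Accept", "application/json");
    try (InputStream in = conn.getInputStream()) {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    } finally {
      conn.disconnect();
    }
  }
}
```

Calling RexsterEndpoint.listGraphs("localhost", 8182, false) against the server started above should return the same JSON document the browser shows.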
Example explained: Input Format
The first part of the API that we present is the
Rexster Input Format. This API allows a Giraph computation to load
the graph from a database exposed by an existing Rexster server.
Vertex Input Format
As anticipated earlier, the input API provides two abstract
classes, namely
RexsterVertexInputFormat and
RexsterEdgeInputFormat. Both are required, since the Giraph
Kibble provides two different URIs to load the vertices and the edges.
NB: also make sure that the Rexster hostname is provided to
Giraph, since this is a mandatory parameter.
The two classes below are directly extracted from the Giraph source code
repository and exemplify how to implement custom
RexsterVertexInputFormat and
RexsterEdgeInputFormat.
public class RexsterLongDoubleFloatVertexInputFormat
  extends RexsterVertexInputFormat<LongWritable, DoubleWritable,
          FloatWritable> {

  @Override
  public RexsterVertexReader createVertexReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new RexsterLongDoubleFloatVertexReader();
  }

  /**
   * Rexster vertex reader
   */
  protected class RexsterLongDoubleFloatVertexReader
    extends RexsterVertexReader {

    @Override
    protected Vertex<LongWritable, DoubleWritable, FloatWritable> parseVertex(
        JSONObject jsonVertex) throws JSONException {
      /* create the actual vertex */
      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
          getConf().createVertex();
      Long id;
      try {
        id = jsonVertex.getLong("_id");
      } catch (JSONException ex) {
        /* OrientDB compatibility; try to transform it as long */
        String idString = jsonVertex.getString("_id");
        String[] splits = idString.split(":");
        id = Long.parseLong(splits[1]);
      }
      vertex.initialize(new LongWritable(id), new DoubleWritable(0));
      return vertex;
    }
  }
}
Edge Input Format
public class RexsterLongFloatEdgeInputFormat
  extends RexsterEdgeInputFormat<LongWritable, FloatWritable> {

  @Override
  public RexsterEdgeReader createEdgeReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new RexsterLongFloatEdgeReader();
  }

  protected class RexsterLongFloatEdgeReader extends RexsterEdgeReader {
    /** source vertex of the edge */
    private LongWritable sourceId;

    @Override
    public LongWritable getCurrentSourceId()
      throws IOException, InterruptedException {
      return this.sourceId;
    }

    @Override
    protected Edge<LongWritable, FloatWritable> parseEdge(JSONObject jsonEdge)
      throws JSONException {
      Long value = jsonEdge.getLong("weight");
      Long dest;
      try {
        dest = jsonEdge.getLong("_outV");
      } catch (JSONException ex) {
        /* OrientDB compatibility; try to transform it as long */
        String idString = jsonEdge.getString("_outV");
        String[] splits = idString.split(":");
        dest = Long.parseLong(splits[1]);
      }
      Edge<LongWritable, FloatWritable> edge =
          EdgeFactory.create(new LongWritable(dest), new FloatWritable(value));
      Long sid;
      try {
        sid = jsonEdge.getLong("_inV");
      } catch (JSONException ex) {
        /* OrientDB compatibility; try to transform it as long */
        String sidString = jsonEdge.getString("_inV");
        String[] splits = sidString.split(":");
        sid = Long.parseLong(splits[1]);
      }
      this.sourceId = new LongWritable(sid);
      return edge;
    }
  }
}
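Both readers use the same fallback for OrientDB, whose record IDs are strings of the form #<cluster>:<position> rather than plain numbers. The standalone sketch below isolates that logic so it can be tried without a Rexster server. IdParsing is a hypothetical name, and the JSON handling of the real readers is replaced here by plain strings.

```java
/** Hypothetical standalone version of the ID fallback used by the readers. */
final class IdParsing {
  private IdParsing() { }

  /**
   * Parses a plain numeric ID, or falls back to the part after the colon
   * for OrientDB-style record IDs such as "#9:1".
   */
  static long parseId(String raw) {
    try {
      return Long.parseLong(raw);
    } catch (NumberFormatException ex) {
      /* OrientDB compatibility; keep only the position within the cluster */
      String[] splits = raw.split(":");
      return Long.parseLong(splits[1]);
    }
  }
}
```

Note that only the part after the colon is kept, so with this approach two records that live in different clusters but share the same position would map to the same Giraph ID.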
Usage
Using these classes is straightforward. As an example, below is the
Hadoop command that starts a Shortest Path computation by loading the
graph from Rexster.
hadoop jar /path/to/giraph/giraph-examples/target/giraph-examples-*-jar-with-dependencies.jar \
org.apache.giraph.GiraphRunner \
-libjars /path/to/giraph/giraph-rexster/giraph-rexster-io/target/giraph-rexster-io*-jar-with-dependencies.jar \
org.apache.giraph.examples.SimpleShortestPathsComputation \
-vif org.apache.giraph.rexster.io.formats.RexsterLongDoubleFloatVertexInputFormat \
-eif org.apache.giraph.rexster.io.formats.RexsterLongFloatEdgeInputFormat \
-vof org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat \
-op output \
-w 1
Example explained: Output Format
Vertex Output Format
As with the input, the output API provides two
classes, namely
RexsterVertexOutputFormat and
RexsterEdgeOutputFormat. Both are required
because of the way the Giraph Kibble manages the storing of the
edges.
NB: to deal with database deadlocks and consistency issues, the
Kibble uses an exponential back-off strategy to complete the transaction.
Make sure that the parameters for the delay and the number of retries suit
your needs. Moreover, to reduce the quantity of memory used by Rexster,
the size of each transaction is also configurable. Make sure that this
parameter also suits your environment.
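To make the back-off parameters more concrete, here is a minimal sketch of an exponential back-off loop. It is not the Kibble's implementation: BackoffSketch is a hypothetical class, and it assumes the "exponentially increasing counter" doubles on every attempt, starting at 1, so that with backoffDelay=5 the waits are 5, 10, 20, 40 ms and so on.

```java
import java.util.concurrent.Callable;

/** Hypothetical illustration of exponential back-off (not Giraph code). */
final class BackoffSketch {
  private BackoffSketch() { }

  /** Delay before the next retry, assuming the counter doubles each time. */
  static long delayMillis(int backoffDelay, int attempt) {
    return backoffDelay * (1L << attempt);
  }

  /** Runs the action, retrying up to backoffRetry times with growing waits. */
  static <T> T callWithBackoff(Callable<T> action, int backoffDelay,
      int backoffRetry) throws Exception {
    Exception last = null;
    for (int attempt = 0; attempt < backoffRetry; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        last = e;
        // Do not sleep after the final attempt; just give up.
        if (attempt + 1 < backoffRetry) {
          Thread.sleep(delayMillis(backoffDelay, attempt));
        }
      }
    }
    throw new IllegalStateException(
        "gave up after " + backoffRetry + " attempts", last);
  }
}
```

Because the wait grows exponentially under this assumption, a large retry count can keep a task blocked for a long time, which is one reason to tune both values together.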
Unlike the Input format presented above, in this case you can
directly make use of the
RexsterVertexOutputFormat and
RexsterEdgeOutputFormat classes without the need to implement
your own. However, in some cases it is still reasonable to use your
own.
The two classes below are directly extracted from the Giraph source code
repository and exemplify how to implement custom
RexsterVertexOutputFormat and
RexsterEdgeOutputFormat.
public class RexsterLongDoubleFloatVertexOutputFormat
  extends RexsterVertexOutputFormat<LongWritable, DoubleWritable,
          FloatWritable> {

  @Override
  public RexsterVertexWriter createVertexWriter(
      TaskAttemptContext context) throws IOException,
      InterruptedException {
    return new RexsterLongDoubleFloatVertexWriter();
  }

  /**
   * Rexster vertex writer.
   */
  protected class RexsterLongDoubleFloatVertexWriter
    extends RexsterVertexWriter {
    /** current vertex ID */
    private LongWritable vertexId;

    @Override
    protected JSONObject getVertex(
        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex)
      throws JSONException {
      vertexId = vertex.getId();
      double value = vertex.getValue().get();
      JSONObject jsonVertex = new JSONObject();
      jsonVertex.accumulate("value", value);
      return jsonVertex;
    }

    @Override
    protected LongWritable getVertexId() {
      return vertexId;
    }
  }
}
Edge Output Format
public class RexsterLongDoubleFloatEdgeOutputFormat
  extends RexsterEdgeOutputFormat<LongWritable, DoubleWritable,
          FloatWritable> {

  @Override
  public RexsterEdgeWriter createEdgeWriter(
      TaskAttemptContext context) throws IOException,
      InterruptedException {
    return new RexsterLongDoubleFloatEdgeWriter();
  }

  /**
   * Rexster edge writer.
   */
  protected class RexsterLongDoubleFloatEdgeWriter
    extends RexsterEdgeWriter {

    @Override
    protected JSONObject getEdge(LongWritable srcId, DoubleWritable srcValue,
        Edge<LongWritable, FloatWritable> edge) throws JSONException {
      long outId = srcId.get();
      long inId = edge.getTargetVertexId().get();
      float value = edge.getValue().get();
      JSONObject jsonEdge = new JSONObject();
      jsonEdge.accumulate("_outV", outId);
      jsonEdge.accumulate("_inV", inId);
      jsonEdge.accumulate("value", value);
      return jsonEdge;
    }
  }
}
Usage
As before, here is an example of how to use these
classes.
hadoop jar /path/to/giraph/giraph-examples/target/giraph-examples-*-jar-with-dependencies.jar \
org.apache.giraph.GiraphRunner \
-libjars /path/to/giraph/giraph-rexster/giraph-rexster-io/target/giraph-rexster-io*-jar-with-dependencies.jar \
org.apache.giraph.examples.SimpleShortestPathsComputation \
-vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat \
-vof org.apache.giraph.rexster.io.formats.RexsterVertexOutputFormat \
-eof org.apache.giraph.rexster.io.formats.RexsterEdgeOutputFormat \
-vip input/ \
-w 1
Caveats
OrientDB
One of the most important details to be aware of is that
you can work with OrientDB only when using Rexster version 2.5.0 or
later. Unfortunately, earlier versions of
Rexster include a buggy OrientDB API, which causes issues that are very
difficult to handle. With newer versions of OrientDB, the API has been
improved and the system works as expected.