/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.giraph.graph;
19
20 import java.io.IOException;
21 import java.util.Iterator;
22
23 import org.apache.giraph.bsp.CentralizedServiceWorker;
24 import org.apache.giraph.comm.WorkerClientRequestProcessor;
25 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
26 import org.apache.giraph.conf.TypesHolder;
27 import org.apache.giraph.edge.Edge;
28 import org.apache.giraph.edge.OutEdges;
29 import org.apache.giraph.worker.WorkerAggregatorUsage;
30 import org.apache.giraph.worker.WorkerContext;
31 import org.apache.giraph.worker.WorkerGlobalCommUsage;
32 import org.apache.giraph.worker.WorkerIndexUsage;
33 import org.apache.hadoop.io.Writable;
34 import org.apache.hadoop.io.WritableComparable;
35 import org.apache.hadoop.mapreduce.Mapper;
36
37 /**
38 * Interface for an application for computation.
39 *
40 * During the superstep there can be several instances of this interface,
41 * each doing computation on one partition of the graph's vertices.
42 *
43 * Note that each thread will have its own {@link Computation},
44 * so accessing any data from this class is thread-safe.
45 * However, accessing global data (like data from {@link WorkerContext})
46 * is not thread-safe.
47 *
48 * Objects of this interface only live for a single superstep.
49 *
50 * @param <I> Vertex id
51 * @param <V> Vertex data
52 * @param <E> Edge data
53 * @param <M1> Incoming message type
54 * @param <M2> Outgoing message type
55 */
56 public interface Computation<I extends WritableComparable,
57 V extends Writable, E extends Writable, M1 extends Writable,
58 M2 extends Writable>
59 extends TypesHolder<I, V, E, M1, M2>,
60 ImmutableClassesGiraphConfigurable<I, V, E>,
61 WorkerGlobalCommUsage, WorkerAggregatorUsage, WorkerIndexUsage<I> {
62 /**
63 * Must be defined by user to do computation on a single Vertex.
64 *
65 * @param vertex Vertex
66 * @param messages Messages that were sent to this vertex in the previous
67 * superstep. Each message is only guaranteed to have
68 * a life expectancy as long as next() is not called.
69 */
70 void compute(Vertex<I, V, E> vertex, Iterable<M1> messages)
71 throws IOException;
72
73 /**
74 * Prepare for computation. This method is executed exactly once prior to
75 * {@link #compute(Vertex, Iterable)} being called for any of the vertices
76 * in the partition.
77 */
78 void preSuperstep();
79
80 /**
81 * Finish computation. This method is executed exactly once after computation
82 * for all vertices in the partition is complete.
83 */
84 void postSuperstep();
85
86 /**
87 * Initialize, called by infrastructure before the superstep starts.
88 * Shouldn't be called by user code.
89 *
90 * @param graphState Graph state
91 * @param workerClientRequestProcessor Processor for handling requests
92 * @param serviceWorker Centralized service worker
93 * @param workerGlobalCommUsage Worker global communication usage
94 */
95 void initialize(GraphState graphState,
96 WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
97 CentralizedServiceWorker<I, V, E> serviceWorker,
98 WorkerGlobalCommUsage workerGlobalCommUsage);
99
100 /**
101 * Retrieves the current superstep.
102 *
103 * @return Current superstep
104 */
105 long getSuperstep();
106
107 /**
108 * Get the total (all workers) number of vertices that
109 * existed in the previous superstep.
110 *
111 * @return Total number of vertices (-1 if first superstep)
112 */
113 long getTotalNumVertices();
114
115 /**
116 * Get the total (all workers) number of edges that
117 * existed in the previous superstep.
118 *
119 * @return Total number of edges (-1 if first superstep)
120 */
121 long getTotalNumEdges();
122
123 /**
124 * Send a message to a vertex id.
125 *
126 * @param id Vertex id to send the message to
127 * @param message Message data to send
128 */
129 void sendMessage(I id, M2 message);
130
131 /**
132 * Send a message to all edges.
133 *
134 * @param vertex Vertex whose edges to send the message to.
135 * @param message Message sent to all edges.
136 */
137 void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message);
138
139 /**
140 * Send a message to multiple target vertex ids in the iterator.
141 *
142 * @param vertexIdIterator An iterator to multiple target vertex ids.
143 * @param message Message sent to all targets in the iterator.
144 */
145 void sendMessageToMultipleEdges(Iterator<I> vertexIdIterator, M2 message);
146
147 /**
148 * Sends a request to create a vertex that will be available during the
149 * next superstep.
150 *
151 * @param id Vertex id
152 * @param value Vertex value
153 * @param edges Initial edges
154 * @throws IOException
155 */
156 void addVertexRequest(I id, V value, OutEdges<I, E> edges) throws IOException;
157
158 /**
159 * Sends a request to create a vertex that will be available during the
160 * next superstep.
161 *
162 * @param id Vertex id
163 * @param value Vertex value
164 * @throws IOException
165 */
166 void addVertexRequest(I id, V value) throws IOException;
167
168 /**
169 * Request to remove a vertex from the graph
170 * (applied just prior to the next superstep).
171 *
172 * @param vertexId Id of the vertex to be removed.
173 * @throws IOException
174 */
175 void removeVertexRequest(I vertexId) throws IOException;
176
177 /**
178 * Request to add an edge of a vertex in the graph
179 * (processed just prior to the next superstep)
180 *
181 * @param sourceVertexId Source vertex id of edge
182 * @param edge Edge to add
183 * @throws IOException
184 */
185 void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) throws IOException;
186
187 /**
188 * Request to remove all edges from a given source vertex to a given target
189 * vertex (processed just prior to the next superstep).
190 *
191 * @param sourceVertexId Source vertex id
192 * @param targetVertexId Target vertex id
193 * @throws IOException
194 */
195 void removeEdgesRequest(I sourceVertexId, I targetVertexId)
196 throws IOException;
197
198 /**
199 * Get the mapper context
200 *
201 * @return Mapper context
202 */
203 Mapper.Context getContext();
204
205 /**
206 * Get the worker context
207 *
208 * @param <W> WorkerContext class
209 * @return WorkerContext context
210 */
211 @SuppressWarnings("unchecked")
212 <W extends WorkerContext> W getWorkerContext();
213 }