1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.bsp;
20
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.List;
24
25 import org.apache.giraph.comm.ServerData;
26 import org.apache.giraph.comm.WorkerClient;
27 import org.apache.giraph.comm.messages.PartitionSplitInfo;
28 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
29 import org.apache.giraph.graph.FinishedSuperstepStats;
30 import org.apache.giraph.graph.GlobalStats;
31 import org.apache.giraph.graph.GraphTaskManager;
32 import org.apache.giraph.graph.VertexEdgeCount;
33 import org.apache.giraph.io.superstep_output.SuperstepOutput;
34 import org.apache.giraph.metrics.GiraphTimerContext;
35 import org.apache.giraph.partition.PartitionOwner;
36 import org.apache.giraph.partition.PartitionStats;
37 import org.apache.giraph.partition.PartitionStore;
38 import org.apache.giraph.worker.WorkerAggregatorHandler;
39 import org.apache.giraph.worker.WorkerContext;
40 import org.apache.giraph.worker.WorkerInfo;
41 import org.apache.giraph.worker.WorkerInputSplitsHandler;
42 import org.apache.giraph.worker.WorkerObserver;
43 import org.apache.hadoop.io.Writable;
44 import org.apache.hadoop.io.WritableComparable;
45
/**
 * All workers should have access to this centralized service to
 * execute the following methods.
 *
 * <p>The methods cover the worker lifecycle (setup, superstep start and
 * finish, checkpoint store/load, cleanup) as well as access to worker-local
 * state such as the partition store, server data, aggregator handler and
 * worker observers.
 *
 * @param <I> Vertex id
 * @param <V> Vertex value
 * @param <E> Edge value
 */
@SuppressWarnings("rawtypes")
public interface CentralizedServiceWorker<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends CentralizedService<I, V, E>, PartitionSplitInfo<I> {
  /**
   * Setup (must be called prior to any other function).
   *
   * @return Finished superstep stats for the input superstep
   */
  FinishedSuperstepStats setup();

  /**
   * Get the worker information.
   *
   * @return Worker information
   */
  WorkerInfo getWorkerInfo();

  /**
   * Get the worker client (for instantiating WorkerClientRequestProcessor
   * instances).
   *
   * @return Worker client
   */
  WorkerClient<I, V, E> getWorkerClient();

  /**
   * Get the worker context.
   *
   * @return worker's WorkerContext
   */
  WorkerContext getWorkerContext();

  /**
   * Get the observers for this worker.
   *
   * @return array of WorkerObservers
   */
  WorkerObserver[] getWorkerObservers();

  /**
   * Get the partition store for this worker.
   * The partitions contain the vertices for this worker and can be used to
   * run compute() for the vertices or do checkpointing.
   *
   * @return The partition store for this worker.
   */
  PartitionStore<I, V, E> getPartitionStore();

  /**
   * Both the vertices and the messages need to be checkpointed in order
   * for them to be used. This is done after all messages have been
   * delivered, but prior to a superstep starting.
   *
   * @throws IOException if an I/O error occurs while storing the checkpoint
   */
  void storeCheckpoint() throws IOException;

  /**
   * Load the vertices, edges and messages from the beginning of a superstep.
   * Will load the vertex partitions as designated by the master and set the
   * appropriate superstep.
   *
   * @param superstep which checkpoint to use
   * @return Graph-wide vertex and edge counts
   * @throws IOException if an I/O error occurs while loading the checkpoint
   */
  VertexEdgeCount loadCheckpoint(long superstep) throws IOException;

  /**
   * Take all steps prior to actually beginning the computation of a
   * superstep.
   *
   * @return Collection of all the partition owners from the master for this
   *         superstep.
   */
  Collection<? extends PartitionOwner> startSuperstep();

  /**
   * Worker is done with its portion of the superstep. Report the
   * worker level statistics after the computation.
   *
   * @param partitionStatsList All the partition stats for this worker
   * @param superstepTimerContext superstep timer context only given when the
   *        function needs to stop the timer, otherwise null.
   * @return Stats of the superstep completion
   */
  FinishedSuperstepStats finishSuperstep(
      List<PartitionStats> partitionStatsList,
      GiraphTimerContext superstepTimerContext);

  /**
   * Get the partition id that a vertex id would belong to.
   *
   * @param vertexId Vertex id
   * @return Partition id
   */
  @Override
  int getPartitionId(I vertexId);

  /**
   * Whether a partition with the given id exists on this worker.
   *
   * <p>NOTE(review): the parameter is a boxed {@code Integer}; how a
   * {@code null} id is handled is up to the implementation.
   *
   * @param partitionId Partition id
   * @return True iff this worker has the specified partition
   */
  boolean hasPartition(Integer partitionId);

  /**
   * Every client will need to get a partition owner from a vertex id so that
   * they know which worker to send the request to.
   *
   * @param vertexId Vertex index to look for
   * @return PartitionOwner that should contain this vertex if it exists
   */
  PartitionOwner getVertexPartitionOwner(I vertexId);

  /**
   * Get all partition owners.
   *
   * @return Iterable through partition owners
   */
  Iterable<? extends PartitionOwner> getPartitionOwners();

  /**
   * If desired by the user, vertex partitions are redistributed among
   * workers according to the chosen WorkerGraphPartitioner.
   *
   * @param masterSetPartitionOwners Partition owner info passed from the
   *        master.
   */
  void exchangeVertexPartitions(
      Collection<? extends PartitionOwner> masterSetPartitionOwners);

  /**
   * Get the GraphTaskManager that this service is using. Vertices need to
   * know this.
   *
   * @return the GraphTaskManager instance for this compute node
   */
  GraphTaskManager<I, V, E> getGraphTaskManager();

  /**
   * Operations that will be called if there is a failure by a worker.
   */
  void failureCleanup();

  /**
   * Get server data.
   *
   * @return Server data
   */
  ServerData<I, V, E> getServerData();

  /**
   * Get the worker aggregator handler.
   *
   * @return Worker aggregator handler
   */
  WorkerAggregatorHandler getAggregatorHandler();

  /**
   * Final preparation for a superstep, called after startSuperstep and
   * potential loading from checkpoint, right before the computation starts.
   * TODO how to avoid this additional function
   */
  void prepareSuperstep();

  /**
   * Get the superstep output class.
   *
   * @return SuperstepOutput
   */
  SuperstepOutput<I, V, E> getSuperstepOutput();

  /**
   * Clean up the service (no calls may be issued after this).
   *
   * @param finishedSuperstepStats Finished superstep stats
   * @throws IOException if an I/O error occurs during cleanup
   * @throws InterruptedException if the calling thread is interrupted
   *         during cleanup
   */
  void cleanup(FinishedSuperstepStats finishedSuperstepStats)
    throws IOException, InterruptedException;

  /**
   * Load the global stats from ZooKeeper.
   *
   * @return global stats stored in ZooKeeper for the previous superstep.
   */
  GlobalStats getGlobalStats();

  /**
   * Get the input splits handler used during input.
   *
   * @return Input splits handler
   */
  WorkerInputSplitsHandler getInputSplitsHandler();

  /**
   * Handle the addresses and partition assignments received from the master.
   *
   * @param addressesAndPartitions Addresses and partitions assignment
   */
  void addressesAndPartitionsReceived(
      AddressesAndPartitionsWritable addressesAndPartitions);

  /**
   * Store the counter values in ZooKeeper after every superstep and also
   * after all supersteps are done. This is called before closing ZooKeeper.
   * This method must be called after calling cleanup on the worker, since
   * some counters are updated during cleanup.
   *
   * @param allSuperstepsDone whether all the supersteps are completed
   */
  void storeCountersInZooKeeper(boolean allSuperstepsDone);

  /**
   * Close ZooKeeper.
   */
  void closeZooKeeper();
}