This project has retired. For details please refer to its
Attic page.
LocalBlockRunner xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.api.local;
19
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import org.apache.giraph.block_app.framework.BlockFactory;
29 import org.apache.giraph.block_app.framework.BlockUtils;
30 import org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi;
31 import org.apache.giraph.block_app.framework.block.Block;
32 import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
33 import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
34 import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic;
35 import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
36 import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
37 import org.apache.giraph.conf.BooleanConfOption;
38 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
39 import org.apache.giraph.conf.IntConfOption;
40 import org.apache.giraph.graph.OnlyIdVertex;
41 import org.apache.giraph.graph.Vertex;
42 import org.apache.giraph.io.SimpleVertexWriter;
43 import org.apache.giraph.partition.Partition;
44 import org.apache.giraph.utils.InternalVertexRunner;
45 import org.apache.giraph.utils.TestGraph;
46 import org.apache.giraph.utils.Trimmable;
47 import org.apache.giraph.utils.WritableUtils;
48 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
49 import org.apache.hadoop.io.Writable;
50 import org.apache.hadoop.io.WritableComparable;
51 import org.apache.hadoop.util.Progressable;
52
53 import com.google.common.base.Preconditions;
54 import com.google.common.collect.Iterables;
55
56
57
58
59
60
61
62 @SuppressWarnings({ "rawtypes", "unchecked" })
63 public class LocalBlockRunner {
64
65 public static final IntConfOption NUM_THREADS = new IntConfOption(
66 "test.LocalBlockRunner.NUM_THREADS", 3, "");
67
68 public static final IntConfOption NUM_PARTITIONS = new IntConfOption(
69 "test.LocalBlockRunner.NUM_PARTITIONS", 16, "");
70
71
72
73
74 public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption(
75 "test.LocalBlockRunner.RUN_ALL_CHECKS", true, "");
76
77 public static final BooleanConfOption SERIALIZE_MASTER =
78 new BooleanConfOption(
79 "test.LocalBlockRunner.SERIALIZE_MASTER", false, "");
80
81 private LocalBlockRunner() { }
82
83
84
85
86
87
88
89
90 public static
91 <I extends WritableComparable, V extends Writable, E extends Writable>
92 TestGraph<I, V, E> runApp(
93 TestGraph<I, V, E> graph, boolean useFullDigraphTests) throws Exception {
94 if (useFullDigraphTests) {
95 return InternalVertexRunner.runWithInMemoryOutput(graph.getConf(), graph);
96 } else {
97 runApp(graph);
98 return graph;
99 }
100 }
101
102
103
104
105
106 public static
107 <I extends WritableComparable, V extends Writable, E extends Writable>
108 void runApp(TestGraph<I, V, E> graph) {
109 SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
110 runAppWithVertexOutput(graph, noOpVertexSaver);
111 }
112
113
114
115
116
117 public static
118 <I extends WritableComparable, V extends Writable, E extends Writable>
119 void runBlock(
120 TestGraph<I, V, E> graph, Block block, Object executionStage) {
121 SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
122 runBlockWithVertexOutput(
123 block, executionStage, graph, noOpVertexSaver);
124 }
125
126
127
128
129
130
131 public static
132 <I extends WritableComparable, V extends Writable, E extends Writable>
133 void runAppWithVertexOutput(
134 TestGraph<I, V, E> graph, final SimpleVertexWriter<I, V, E> vertexSaver) {
135 BlockFactory<?> factory = BlockUtils.createBlockFactory(graph.getConf());
136 runBlockWithVertexOutput(
137 factory.createBlock(graph.getConf()),
138 factory.createExecutionStage(graph.getConf()),
139 graph, vertexSaver);
140 }
141
142
143
144
145
146 public static
147 <I extends WritableComparable, V extends Writable, E extends Writable>
148 void runBlockWithVertexOutput(
149 Block block, Object executionStage, TestGraph<I, V, E> graph,
150 final SimpleVertexWriter<I, V, E> vertexSaver
151 ) {
152 Preconditions.checkNotNull(block);
153 Preconditions.checkNotNull(graph);
154 ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf();
155 int numThreads = NUM_THREADS.get(conf);
156 int numPartitions = NUM_PARTITIONS.get(conf);
157 boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
158 boolean serializeMaster = SERIALIZE_MASTER.get(conf);
159 final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
160
161 final InternalApi internalApi =
162 new InternalApi(graph, conf, numPartitions, runAllChecks);
163 final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi();
164
165 BlockUtils.checkBlockTypes(block, executionStage, conf);
166
167 BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>();
168 blockMasterLogic.initialize(block, executionStage, internalApi);
169
170 BlockWorkerContextLogic workerContextLogic =
171 internalApi.getWorkerContextLogic();
172 workerContextLogic.preApplication(internalWorkerApi,
173 new BlockOutputHandle("", conf, new Progressable() {
174 @Override
175 public void progress() {
176 }
177 }));
178
179 ExecutorService executor = Executors.newFixedThreadPool(numThreads);
180
181 if (runAllChecks) {
182 for (Vertex<I, V, E> vertex : graph) {
183 V value = conf.createVertexValue();
184 WritableUtils.copyInto(vertex.getValue(), value);
185 vertex.setValue(value);
186
187 vertex.setEdges((Iterable) WritableUtils.createCopy(
188 (Writable) vertex.getEdges(), conf.getOutEdgesClass(), conf));
189 }
190 }
191
192 final AtomicBoolean anyVertexAlive = new AtomicBoolean(true);
193
194 for (int superstep = 0;; superstep++) {
195
196 if (serializeMaster) {
197 blockMasterLogic = (BlockMasterLogic) WritableUtils.createCopy(
198 new KryoWritableWrapper<>(blockMasterLogic),
199 KryoWritableWrapper.class,
200 conf).get();
201 blockMasterLogic.initializeAfterRead(internalApi);
202 }
203
204 if (!anyVertexAlive.get()) {
205 break;
206 }
207
208 final BlockWorkerPieces workerPieces =
209 blockMasterLogic.computeNext(superstep);
210 if (workerPieces == null) {
211 if (!conf.doOutputDuringComputation()) {
212 List<Partition<I, V, E>> partitions = internalApi.getPartitions();
213 for (Partition<I, V, E> partition : partitions) {
214 for (Vertex<I, V, E> vertex : partition) {
215 try {
216 vertexSaver.writeVertex(vertex);
217 } catch (IOException | InterruptedException e) {
218 throw new RuntimeException(e);
219 }
220 }
221 }
222 }
223 int left = executor.shutdownNow().size();
224 Preconditions.checkState(0 == left, "Some work still left to be done?");
225 break;
226 } else {
227 internalApi.afterMasterBeforeWorker(workerPieces);
228 List<Partition<I, V, E>> partitions = internalApi.getPartitions();
229
230 workerContextLogic.preSuperstep(
231 internalWorkerApi,
232 internalWorkerApi,
233 KryoWritableWrapper.wrapAndCopy(workerPieces), superstep,
234 internalApi.takeWorkerMessages());
235
236 final CountDownLatch latch = new CountDownLatch(numPartitions);
237 final AtomicReference<Throwable> exception = new AtomicReference<>();
238 anyVertexAlive.set(false);
239 for (final Partition<I, V, E> partition : partitions) {
240 executor.execute(new Runnable() {
241 @Override
242 public void run() {
243 try {
244 boolean anyCurVertexAlive = false;
245 BlockWorkerPieces localPieces =
246 KryoWritableWrapper.wrapAndCopy(workerPieces);
247
248 BlockWorkerLogic localLogic = new BlockWorkerLogic(localPieces);
249 localLogic.preSuperstep(internalWorkerApi, internalWorkerApi);
250
251 if (internalApi.ignoreExistingVertices()) {
252 Iterable<I> destinations =
253 internalApi.getPartitionDestinationVertices(
254 partition.getId());
255 if (!Iterables.isEmpty(destinations)) {
256 OnlyIdVertex<I> vertex = new OnlyIdVertex<>();
257
258 for (I vertexId : destinations) {
259 Iterable messages = internalApi.takeMessages(vertexId);
260 Preconditions.checkState(!Iterables.isEmpty(messages));
261 vertex.setId(vertexId);
262 localLogic.compute(vertex, messages);
263
264 anyCurVertexAlive = true;
265 }
266 }
267 } else {
268 for (Vertex<I, V, E> vertex : partition) {
269 Iterable messages =
270 internalApi.takeMessages(vertex.getId());
271 if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
272 vertex.wakeUp();
273 }
274
275 if (!vertex.isHalted()) {
276 localLogic.compute(vertex, messages);
277
278
279 vertex.unwrapMutableEdges();
280
281 if (vertex instanceof Trimmable) {
282 ((Trimmable) vertex).trim();
283 }
284
285
286 if (doOutputDuringComputation) {
287 vertexSaver.writeVertex(vertex);
288 }
289
290 partition.saveVertex(vertex);
291 }
292
293 if (!vertex.isHalted()) {
294 anyCurVertexAlive = true;
295 }
296 }
297 }
298
299 if (anyCurVertexAlive) {
300 anyVertexAlive.set(true);
301 }
302 localLogic.postSuperstep();
303
304
305 } catch (Throwable t) {
306
307 t.printStackTrace();
308 exception.set(t);
309 }
310
311 latch.countDown();
312 }
313 });
314 }
315
316 try {
317 latch.await();
318 } catch (InterruptedException e) {
319 throw new RuntimeException("Thread intentionally interrupted", e);
320 }
321
322 if (exception.get() != null) {
323 throw new RuntimeException("Worker failed", exception.get());
324 }
325
326 workerContextLogic.postSuperstep();
327
328 internalApi.afterWorkerBeforeMaster();
329 }
330 }
331
332 workerContextLogic.postApplication();
333 internalApi.postApplication();
334 }
335
336 private static
337 <I extends WritableComparable, E extends Writable, V extends Writable>
338 SimpleVertexWriter<I, V, E> noOpVertexSaver() {
339 return new SimpleVertexWriter<I, V, E>() {
340 @Override
341 public void writeVertex(Vertex<I, V, E> vertex)
342 throws IOException, InterruptedException {
343
344 }
345 };
346 }
347
348 }