This project has retired. For details please refer to its
Attic page.
InternalApi xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.api.local;
19
20 import static com.google.common.base.Preconditions.checkState;
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ThreadLocalRandom;
31
32 import org.apache.giraph.aggregators.Aggregator;
33 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
34 import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
35 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
36 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
37 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
38 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
39 import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
40 import org.apache.giraph.block_app.framework.api.Counter;
41 import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalChecksMessageStore;
42 import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalWrappedMessageStore;
43 import org.apache.giraph.block_app.framework.internal.BlockCounters;
44 import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
45 import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
46 import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
47 import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
48 import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
49 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
50 import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
51 import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
52 import org.apache.giraph.comm.messages.PartitionSplitInfo;
53 import org.apache.giraph.conf.GiraphConstants;
54 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
55 import org.apache.giraph.conf.MessageClasses;
56 import org.apache.giraph.edge.Edge;
57 import org.apache.giraph.edge.OutEdges;
58 import org.apache.giraph.graph.Vertex;
59 import org.apache.giraph.graph.VertexMutations;
60 import org.apache.giraph.graph.VertexResolver;
61 import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
62 import org.apache.giraph.partition.GraphPartitionerFactory;
63 import org.apache.giraph.partition.Partition;
64 import org.apache.giraph.reducers.ReduceOperation;
65 import org.apache.giraph.utils.TestGraph;
66 import org.apache.giraph.utils.WritableUtils;
67 import org.apache.giraph.worker.WorkerAggregatorDelegator;
68 import org.apache.giraph.worker.WorkerGlobalCommUsage;
69 import org.apache.hadoop.io.Writable;
70 import org.apache.hadoop.io.WritableComparable;
71
72 import com.google.common.base.Preconditions;
73
74 import it.unimi.dsi.fastutil.ints.IntArrayList;
75 import it.unimi.dsi.fastutil.ints.IntList;
76
77
78
79
80
81
82
83
84
85 @SuppressWarnings({ "rawtypes", "unchecked" })
86 class InternalApi<I extends WritableComparable, V extends Writable,
87 E extends Writable> implements BlockMasterApi, BlockOutputHandleAccessor {
88 private final TestGraph<I, V, E> inputGraph;
89 private final List<Partition<I, V, E>> partitions;
90 private final GraphPartitionerFactory<I, V, E> partitionerFactory;
91
92 private final ImmutableClassesGiraphConfiguration conf;
93 private final boolean runAllChecks;
94 private final InternalAggregators globalComm;
95 private final AggregatorToGlobalCommTranslation aggregators;
96
97 private final boolean createVertexOnMsgs;
98 private final ConcurrentHashMap<I, VertexMutations<I, V, E>> mutations;
99
100 private InternalMessageStore previousMessages;
101 private InternalMessageStore nextMessages;
102
103 private MessageClasses previousMessageClasses;
104 private MessageClasses nextMessageClasses;
105
106 private final InternalWorkerApi workerApi;
107 private final BlockWorkerContextLogic workerContextLogic;
108 private List<Writable> previousWorkerMessages;
109 private List<Writable> nextWorkerMessages;
110
111 public InternalApi(
112 TestGraph<I, V, E> graph,
113 ImmutableClassesGiraphConfiguration conf,
114 int numPartitions,
115 boolean runAllChecks) {
116 this.inputGraph = graph;
117 this.partitions = new ArrayList<>(numPartitions);
118 for (int i = 0; i < numPartitions; i++) {
119 this.partitions.add(conf.createPartition(i, null));
120 }
121 this.partitionerFactory = conf.createGraphPartitioner();
122 Preconditions.checkNotNull(this.partitionerFactory);
123 Preconditions.checkState(this.partitions.size() == numPartitions);
124
125 for (Vertex<I, V, E> vertex : graph) {
126 getPartition(vertex.getId()).putVertex(vertex);
127 }
128 graph.clear();
129
130 this.conf = conf;
131 this.runAllChecks = runAllChecks;
132 this.globalComm = new InternalAggregators(runAllChecks);
133 this.aggregators = new AggregatorToGlobalCommTranslation(conf, globalComm);
134 this.mutations = new ConcurrentHashMap<>();
135 this.workerApi = new InternalWorkerApi();
136 this.workerApi.setConf(conf);
137 this.workerApi.setWorkerGlobalCommUsage(this.globalComm);
138
139 this.createVertexOnMsgs =
140 GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.get(conf);
141 workerContextLogic = new BlockWorkerContextLogic();
142 }
143
144
145
146
147
148
149 class InternalWorkerApi extends WorkerAggregatorDelegator<I, V, E>
150 implements BlockWorkerSendApi<I, V, E, Writable>,
151 BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<I, Writable>,
152 BlockWorkerContextReceiveApi<I>, BlockWorkerValueAccessor,
153 WorkerGlobalCommUsage {
154
155 @Override
156 public void addVertexRequest(I id, V value) {
157 addVertexRequest(id, value, conf.createAndInitializeOutEdges());
158 }
159
160 @Override
161 public void addVertexRequest(I id, V value, OutEdges<I, E> edges) {
162 Vertex<I, V, E> vertex = conf.createVertex();
163 vertex.initialize(id, value, edges);
164 getMutationFor(id).addVertex(vertex);
165 }
166
167 @Override
168 public void removeVertexRequest(I vertexId) {
169 getMutationFor(vertexId).removeVertex();
170 }
171
172 @Override
173 public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) {
174 getMutationFor(sourceVertexId).addEdge(edge);
175 }
176
177 @Override
178 public void removeEdgesRequest(I sourceVertexId, I targetVertexId) {
179 getMutationFor(sourceVertexId).removeEdge(targetVertexId);
180 }
181
182 @Override
183 public void sendMessage(I id, Writable message) {
184 nextMessages.sendMessage(id, message);
185 }
186
187 @Override
188 public void sendMessageToAllEdges(
189 Vertex<I, V, E> vertex, Writable message) {
190 sendMessageToMultipleEdges(
191 new TargetVertexIdIterator<>(vertex),
192 message);
193 }
194
195 @Override
196 public void sendMessageToMultipleEdges(
197 Iterator<I> vertexIdIterator, Writable message) {
198 nextMessages.sendMessageToMultipleEdges(vertexIdIterator, message);
199 }
200
201 @Override
202 public int getMyWorkerIndex() {
203 return 0;
204 }
205
206 @Override
207 public int getWorkerCount() {
208 return 1;
209 }
210
211 @Override
212 public int getWorkerForVertex(I vertexId) {
213 return 0;
214 }
215
216 @Override
217 public void sendMessageToWorker(Writable message, int workerIndex) {
218 Preconditions.checkArgument(workerIndex == getMyWorkerIndex(),
219 "With just one worker you can only send worker message to itself, " +
220 "but tried to send to " + workerIndex);
221 nextWorkerMessages.add(message);
222 }
223
224 @Override
225 public Object getWorkerValue() {
226 return workerContextLogic.getWorkerValue();
227 }
228
229 @Override
230 public long getTotalNumVertices() {
231 return InternalApi.this.getTotalNumVertices();
232 }
233
234 @Override
235 public long getTotalNumEdges() {
236 return InternalApi.this.getTotalNumEdges();
237 }
238
239 @Override
240 public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
241 OD getOutputDesc(String confOption) {
242 return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
243 confOption);
244 }
245
246 @Override
247 public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
248 return workerContextLogic.getOutputHandle().getWriter(confOption);
249 }
250
251 @Override
252 public void setStatus(String status) {
253 }
254
255 @Override
256 public void progress() {
257 }
258
259 @Override
260 public Counter getCounter(final String group, final String name) {
261 return BlockCounters.getNoOpCounter();
262 }
263 }
264
265 @Override
266 public void broadcast(String name, Writable value) {
267 globalComm.broadcast(name, value);
268 }
269
270 @Override
271 public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
272 BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>();
273 broadcast(handle.getName(), object);
274 return handle;
275 }
276
277 @Override
278 public <S, R extends Writable> void registerReducer(
279 String name, ReduceOperation<S, R> reduceOp) {
280 globalComm.registerReducer(name, reduceOp);
281 }
282
283 @Override
284 public <S, R extends Writable> void registerReducer(
285 String name, ReduceOperation<S, R> reduceOp,
286 R globalInitialValue) {
287 globalComm.registerReducer(name, reduceOp, globalInitialValue);
288 }
289
290 @Override
291 public <R extends Writable> R getReduced(String name) {
292 return globalComm.getReduced(name);
293 }
294
295 @Override
296 public <A extends Writable> A getAggregatedValue(String name) {
297 return aggregators.getAggregatedValue(name);
298 }
299
300 @Override
301 public <A extends Writable> void setAggregatedValue(String name, A value) {
302 aggregators.setAggregatedValue(name, value);
303 }
304
305 @Override
306 public <A extends Writable>
307 boolean registerAggregator(
308 String name, Class<? extends Aggregator<A>> aggregatorClass)
309 throws InstantiationException, IllegalAccessException {
310 return aggregators.registerAggregator(name, aggregatorClass);
311 }
312
313 @Override
314 public <A extends Writable>
315 boolean registerPersistentAggregator(
316 String name, Class<? extends Aggregator<A>> aggregatorClass)
317 throws InstantiationException, IllegalAccessException {
318 return aggregators.registerPersistentAggregator(name, aggregatorClass);
319 }
320
321 @Override
322 public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
323 return conf;
324 }
325
326 @Override
327 public void setStatus(String status) {
328 }
329
330 @Override
331 public void progress() {
332 }
333
334 @Override
335 public Counter getCounter(final String group, final String name) {
336 return BlockCounters.getNoOpCounter();
337 }
338
339 private VertexMutations<I, V, E> getMutationFor(I vertexId) {
340 VertexMutations<I, V, E> curMutations = new VertexMutations<>();
341 VertexMutations<I, V, E> prevMutations =
342 mutations.putIfAbsent(vertexId, curMutations);
343 if (prevMutations != null) {
344 curMutations = prevMutations;
345 }
346 return curMutations;
347 }
348
349 public Iterable takeMessages(I id) {
350 if (previousMessages != null) {
351 Iterable result = previousMessages.takeMessages(id);
352 if (result != null) {
353 return result;
354 }
355 }
356 return Collections.emptyList();
357 }
358
359 public Iterable<I> getPartitionDestinationVertices(int partitionId) {
360 if (previousMessages != null) {
361 Iterable result =
362 previousMessages.getPartitionDestinationVertices(partitionId);
363 if (result != null) {
364 return result;
365 }
366 }
367 return Collections.emptyList();
368 }
369
370 public List<Writable> takeWorkerMessages() {
371 if (previousWorkerMessages != null) {
372 List<Writable> ret = new ArrayList<>(previousWorkerMessages.size());
373 for (Writable message : previousWorkerMessages) {
374
375
376
377 ret.add(runAllChecks && ThreadLocalRandom.current().nextBoolean() ?
378 WritableUtils.createCopy(message) : message);
379 }
380 previousWorkerMessages = null;
381 if (runAllChecks) {
382 Collections.shuffle(ret);
383 }
384 return ret;
385 }
386 return Collections.emptyList();
387 }
388
389 public void afterWorkerBeforeMaster() {
390 globalComm.afterWorkerBeforeMaster();
391 aggregators.prepareSuperstep();
392 }
393
394 public void afterMasterBeforeWorker() {
395 aggregators.postMasterCompute();
396 }
397
398 public void afterMasterBeforeWorker(BlockWorkerPieces computation) {
399 afterMasterBeforeWorker();
400
401 previousMessages = nextMessages;
402 previousMessageClasses = nextMessageClasses;
403 previousWorkerMessages = nextWorkerMessages;
404
405 nextMessageClasses = computation.getOutgoingMessageClasses(conf);
406 nextMessages = createMessageStore(
407 conf,
408 nextMessageClasses,
409 createPartitionInfo(),
410 runAllChecks
411 );
412 nextWorkerMessages = new ArrayList<>();
413
414
415 if (previousMessages != null) {
416 previousMessages.finalizeStore();
417 }
418
419 boolean ignoreExistingVertices = ignoreExistingVertices();
420
421
422 VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
423 for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutations.entrySet()) {
424 I vertexIndex = entry.getKey();
425 Vertex<I, V, E> originalVertex =
426 getPartition(vertexIndex).getVertex(vertexIndex);
427 VertexMutations<I, V, E> curMutations = entry.getValue();
428 Vertex<I, V, E> vertex = vertexResolver.resolve(
429 vertexIndex,
430 originalVertex,
431 curMutations,
432 !ignoreExistingVertices && previousMessages != null &&
433 previousMessages.hasMessage(vertexIndex)
434 );
435
436 if (vertex != null) {
437 getPartition(vertex.getId()).putVertex(vertex);
438 } else if (originalVertex != null) {
439 getPartition(originalVertex.getId()).removeVertex(
440 originalVertex.getId());
441 if (!ignoreExistingVertices && previousMessages != null) {
442 previousMessages.takeMessages(originalVertex.getId());
443 }
444 }
445 }
446 mutations.clear();
447
448 if (!ignoreExistingVertices && createVertexOnMsgs &&
449 previousMessages != null) {
450 Iterator<I> iter = previousMessages.targetVertexIds();
451 while (iter.hasNext()) {
452 I target = iter.next();
453 if (getPartition(target).getVertex(target) == null) {
454
455 I copyId = WritableUtils.createCopy(target);
456
457 Vertex<I, V, E> vertex =
458 vertexResolver.resolve(copyId, null, null, true);
459
460 if (vertex != null) {
461 getPartition(vertex.getId()).putVertex(vertex);
462 }
463 }
464 }
465 }
466 }
467
468 public boolean ignoreExistingVertices() {
469 return previousMessageClasses != null &&
470 previousMessageClasses.ignoreExistingVertices();
471 }
472
473 private <M extends Writable>
474 InternalMessageStore<I, M> createMessageStore(
475 ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
476 MessageClasses<I, M> messageClasses,
477 PartitionSplitInfo<I> partitionInfo,
478 boolean runAllChecks
479 ) {
480 InternalMessageStore<I, M> messageStore =
481 InternalWrappedMessageStore.create(conf, messageClasses, partitionInfo);
482 if (runAllChecks) {
483 return new InternalChecksMessageStore<I, M>(
484 messageStore, conf, messageClasses.createMessageValueFactory(conf));
485 } else {
486 return messageStore;
487 }
488 }
489
490 private PartitionSplitInfo<I> createPartitionInfo() {
491 return new PartitionSplitInfo<I>() {
492
493 private IntList partitionIds;
494
495 private Queue<Partition<I, V, E>> partitionQueue;
496
497 @Override
498 public int getPartitionId(I vertexId) {
499 return partitionerFactory.getPartition(vertexId, partitions.size(), 1);
500 }
501
502 @Override
503 public Iterable<Integer> getPartitionIds() {
504 if (partitionIds == null) {
505 partitionIds = new IntArrayList(partitions.size());
506 for (int i = 0; i < partitions.size(); i++) {
507 partitionIds.add(i);
508 }
509 }
510 Preconditions.checkState(partitionIds.size() == partitions.size());
511 return partitionIds;
512 }
513
514 @Override
515 public long getPartitionVertexCount(Integer partitionId) {
516 return partitions.get(partitionId).getVertexCount();
517 }
518
519 @Override
520 public void startIteration() {
521 checkState(partitionQueue == null || partitionQueue.isEmpty(),
522 "startIteration: It seems that some of " +
523 "of the partitions from previous iteration over partition store are" +
524 " not yet processed.");
525
526 partitionQueue = new LinkedList<Partition<I, V, E>>();
527 for (Partition<I, V, E> partition : partitions) {
528 partitionQueue.add(partition);
529 }
530 }
531
532 @Override
533 public Partition getNextPartition() {
534 return partitionQueue.poll();
535 }
536
537 @Override
538 public void putPartition(Partition partition) {
539 }
540 };
541 }
542
543 public List<Partition<I, V, E>> getPartitions() {
544 return partitions;
545 }
546
547 public InternalWorkerApi getWorkerApi() {
548 return workerApi;
549 }
550
551 @Override
552 public long getTotalNumEdges() {
553 int numEdges = 0;
554 for (Partition<I, V, E> partition : partitions) {
555 numEdges += partition.getEdgeCount();
556 }
557 return numEdges;
558 }
559
560 @Override
561 public long getTotalNumVertices() {
562 int numVertices = 0;
563 for (Partition<I, V, E> partition : partitions) {
564 numVertices += partition.getVertexCount();
565 }
566 return numVertices;
567 }
568
569 @Override
570 public void logToCommandLine(String line) {
571 System.err.println("Command line: " + line);
572 }
573
574 @Override
575 public BlockOutputHandle getBlockOutputHandle() {
576 return workerContextLogic.getOutputHandle();
577 }
578
579 @Override
580 public <OW extends BlockOutputWriter,
581 OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) {
582 return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
583 confOption);
584 }
585
586 @Override
587 public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
588 return workerContextLogic.getOutputHandle().getWriter(confOption);
589 }
590
591 public BlockWorkerContextLogic getWorkerContextLogic() {
592 return workerContextLogic;
593 }
594
595 @Override
596 public int getWorkerCount() {
597 return 1;
598 }
599
600 private int getPartitionId(I id) {
601 Preconditions.checkNotNull(id);
602 return partitionerFactory.getPartition(id, partitions.size(), 1);
603 }
604
605 private Partition<I, V, E> getPartition(I id) {
606 return partitions.get(getPartitionId(id));
607 }
608
609 public void postApplication() {
610 for (Partition<I, V, E> partition : partitions) {
611 for (Vertex<I, V, E> vertex : partition) {
612 inputGraph.setVertex(vertex);
613 }
614 }
615 }
616 }