This project has retired. For details please refer to its
Attic page.
MigrationFullBlockFactory xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.migration;
19
20 import java.util.Iterator;
21
22 import org.apache.giraph.block_app.framework.AbstractBlockFactory;
23 import org.apache.giraph.block_app.framework.block.Block;
24 import org.apache.giraph.block_app.framework.block.PieceCount;
25 import org.apache.giraph.block_app.framework.block.SequenceBlock;
26 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
27 import org.apache.giraph.block_app.framework.piece.Piece;
28 import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
29 import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
30 import org.apache.giraph.combiner.MessageCombiner;
31 import org.apache.giraph.conf.GiraphConfiguration;
32 import org.apache.giraph.function.Consumer;
33 import org.apache.hadoop.io.Writable;
34 import org.apache.hadoop.io.WritableComparable;
35
36 import com.google.common.collect.AbstractIterator;
37 import com.google.common.collect.Iterators;
38
39
40
41
42 public abstract class MigrationFullBlockFactory
43 extends AbstractBlockFactory<MigrationSuperstepStage> {
44
45 @Override
46 public MigrationSuperstepStage createExecutionStage(
47 GiraphConfiguration conf) {
48 return new MigrationSuperstepStageImpl();
49 }
50
51 @Override
52 protected Class<? extends MigrationWorkerContext> getWorkerContextValueClass(
53 GiraphConfiguration conf) {
54 return MigrationWorkerContext.class;
55 }
56
57 @SuppressWarnings("rawtypes")
58 public <I extends WritableComparable, V extends Writable, E extends Writable,
59 MR extends Writable, MS extends Writable>
60 Block createMigrationAppBlock(
61 Class<? extends MigrationFullAbstractComputation<I, V, E, MR, MS>>
62 computationClass,
63 MigrationFullMasterCompute masterCompute,
64 Class<MS> messageClass,
65 Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass,
66 GiraphConfiguration conf) {
67 final MigrationPiece<I, V, E, MR, MS> piece =
68 MigrationPiece.createFirstFullMigrationPiece(
69 computationClass, masterCompute, messageClass,
70 messageCombinerClass);
71 piece.sanityTypeChecks(conf, null);
72
73 return new SequenceBlock(
74 new Piece<WritableComparable, Writable, Writable,
75 Writable, MigrationSuperstepStage>() {
76 @Override
77 public MigrationSuperstepStage nextExecutionStage(
78 MigrationSuperstepStage executionStage) {
79 return executionStage.changedMigrationSuperstep(0);
80 }
81 },
82 new Block() {
83 private MigrationPiece curPiece = piece;
84
85 @Override
86 public Iterator<AbstractPiece> iterator() {
87 return Iterators.concat(
88 Iterators.singletonIterator(curPiece),
89 new AbstractIterator<AbstractPiece>() {
90 @Override
91 protected AbstractPiece computeNext() {
92 curPiece = curPiece.getNextPiece();
93 if (curPiece == null) {
94 endOfData();
95 }
96 return curPiece;
97 }
98 });
99 }
100
101 @Override
102 public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
103 consumer.apply(curPiece);
104 }
105
106 @Override
107 public PieceCount getPieceCount() {
108 return curPiece.getPieceCount();
109 }
110 }
111 );
112 }
113 }