This project has retired. For details please refer to its
Attic page.
MigrationMasterCompute xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.migration;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.giraph.aggregators.Aggregator;
25 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
26 import org.apache.giraph.block_app.framework.api.StatusReporter;
27 import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
28 import org.apache.giraph.combiner.MessageCombiner;
29 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
30 import org.apache.giraph.conf.TypesHolder;
31 import org.apache.giraph.reducers.ReduceOperation;
32 import org.apache.giraph.utils.ReflectionUtils;
33 import org.apache.hadoop.io.Writable;
34
35
36
37
38
39 @SuppressWarnings({"unchecked", "rawtypes"})
40 public abstract class MigrationMasterCompute
41 extends DefaultImmutableClassesGiraphConfigurable implements Writable {
42 private BlockMasterApi api;
43
44 final void init(BlockMasterApi masterApi) {
45 this.api = masterApi;
46 setConf(masterApi.getConf());
47 }
48
49 @Override
50 public void readFields(DataInput in) throws IOException {
51 }
52
53 @Override
54 public void write(DataOutput out) throws IOException {
55 }
56
57 public void compute() {
58 }
59
60 public void initialize() throws InstantiationException,
61 IllegalAccessException {
62 }
63
64 @SuppressWarnings("deprecation")
65 public long getTotalNumVertices() {
66 return api.getTotalNumVertices();
67 }
68
69 @SuppressWarnings("deprecation")
70 public long getTotalNumEdges() {
71 return api.getTotalNumEdges();
72 }
73
74 public final <S, R extends Writable> void registerReducer(
75 String name, ReduceOperation<S, R> reduceOp) {
76 api.registerReducer(name, reduceOp);
77 }
78
79 public final <S, R extends Writable> void registerReducer(
80 String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
81 api.registerReducer(
82 name, reduceOp, globalInitialValue);
83 }
84
85 public final <T extends Writable> T getReduced(String name) {
86 return api.getReduced(name);
87 }
88
89 public final void broadcast(String name, Writable object) {
90 api.broadcast(name, object);
91 }
92
93 public final <A extends Writable> boolean registerAggregator(
94 String name, Class<? extends Aggregator<A>> aggregatorClass)
95 throws InstantiationException, IllegalAccessException {
96 return api.registerAggregator(
97 name, aggregatorClass);
98 }
99
100 @SuppressWarnings("deprecation")
101 public final <A extends Writable> boolean registerPersistentAggregator(
102 String name,
103 Class<? extends Aggregator<A>> aggregatorClass) throws
104 InstantiationException, IllegalAccessException {
105 return api.registerPersistentAggregator(name, aggregatorClass);
106 }
107
108 public final <A extends Writable> A getAggregatedValue(String name) {
109 return api.<A>getAggregatedValue(name);
110 }
111
112 public final <A extends Writable> void setAggregatedValue(
113 String name, A value) {
114 api.setAggregatedValue(name, value);
115 }
116
117 public final void logToCommandLine(String line) {
118 api.logToCommandLine(line);
119 }
120
121 public final StatusReporter getContext() {
122 return api;
123 }
124
125
126
127
128
129 public static class MigrationFullMasterCompute
130 extends MigrationMasterCompute {
131 private long superstep;
132 private boolean halt;
133 private Class<? extends MigrationAbstractComputation> computationClass;
134 private Class<? extends MigrationAbstractComputation> newComputationClass;
135 private Class<? extends Writable> originalMessage;
136 private Class<? extends Writable> newMessage;
137 private Class<? extends MessageCombiner> originalMessageCombiner;
138 private Class<? extends MessageCombiner> newMessageCombiner;
139
140 final void init(
141 long superstep,
142 Class<? extends MigrationAbstractComputation> computationClass,
143 Class<? extends Writable> message,
144 Class<? extends MessageCombiner> messageCombiner) {
145 this.superstep = superstep;
146 this.halt = false;
147 this.computationClass = computationClass;
148 this.newComputationClass = null;
149 this.originalMessage = message;
150 this.newMessage = null;
151 this.originalMessageCombiner = messageCombiner;
152 this.newMessageCombiner = null;
153 }
154
155 public final long getSuperstep() {
156 return superstep;
157 }
158
159 @Override
160 public final long getTotalNumVertices() {
161 if (superstep == 0) {
162 throw new RuntimeException(
163 "getTotalNumVertices not available in superstep=0");
164 }
165 return super.getTotalNumVertices();
166 }
167
168 @Override
169 public final long getTotalNumEdges() {
170 if (superstep == 0) {
171 throw new RuntimeException(
172 "getTotalNumEdges not available in superstep=0");
173 }
174 return super.getTotalNumEdges();
175 }
176
177
178 public final void haltComputation() {
179 halt = true;
180 }
181
182 public final boolean isHalted() {
183 return halt;
184 }
185
186 public final void setComputation(
187 Class<? extends MigrationFullAbstractComputation> computation) {
188 if (computation != null) {
189 newComputationClass = computation;
190 } else {
191
192 this.computationClass = null;
193 }
194 }
195
196 public final
197 Class<? extends MigrationAbstractComputation> getComputation() {
198 if (newComputationClass != null) {
199 return newComputationClass;
200 }
201 if (computationClass != null) {
202 return computationClass;
203 }
204 return null;
205 }
206
207 public final void setMessageCombiner(
208 Class<? extends MessageCombiner> combinerClass) {
209 this.newMessageCombiner = combinerClass;
210 }
211
212 public final Class<? extends MessageCombiner> getMessageCombiner() {
213 return newMessageCombiner != null ?
214 newMessageCombiner : originalMessageCombiner;
215 }
216
217 public final void setIncomingMessage(
218 Class<? extends Writable> incomingMessageClass) {
219 if (!originalMessage.equals(incomingMessageClass)) {
220 throw new IllegalArgumentException(
221 originalMessage + " and " + incomingMessageClass + " must be same");
222 }
223 }
224
225 public final void setOutgoingMessage(
226 Class<? extends Writable> outgoingMessageClass) {
227 newMessage = outgoingMessageClass;
228 }
229
230 final Class<? extends Writable> getOutgoingMessage() {
231 if (newMessage != null) {
232 return newMessage;
233 }
234
235 if (newComputationClass == null) {
236 return originalMessage;
237 }
238 Class[] computationTypes = ReflectionUtils.getTypeArguments(
239 TypesHolder.class, newComputationClass);
240 return computationTypes[4];
241 }
242
243 final Class<? extends MigrationAbstractComputation> getComputationClass() {
244 return newComputationClass != null ?
245 newComputationClass : computationClass;
246 }
247
248 final
249 Class<? extends MigrationAbstractComputation> getNewComputationClass() {
250 return newComputationClass;
251 }
252
253 final Class<? extends Writable> getNewMessage() {
254 return newMessage;
255 }
256
257 final Class<? extends MessageCombiner> getNewMessageCombiner() {
258 return newMessageCombiner;
259 }
260 }
261 }