This project has retired. For details please refer to its
Attic page.
SuperstepClasses xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.master;
20
21 import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
22
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26
27 import org.apache.giraph.combiner.MessageCombiner;
28 import org.apache.giraph.conf.DefaultMessageClasses;
29 import org.apache.giraph.conf.GiraphClasses;
30 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31 import org.apache.giraph.conf.MessageClasses;
32 import org.apache.giraph.conf.TypesHolder;
33 import org.apache.giraph.graph.Computation;
34 import org.apache.giraph.graph.Language;
35 import org.apache.giraph.utils.ReflectionUtils;
36 import org.apache.giraph.utils.WritableUtils;
37 import org.apache.hadoop.io.Writable;
38 import org.apache.hadoop.io.WritableComparable;
39 import org.apache.log4j.Logger;
40 import com.google.common.base.Preconditions;
41
42
43
44
45 public class SuperstepClasses implements Writable {
46
47 private static final Logger LOG = Logger.getLogger(SuperstepClasses.class);
48
49 private final ImmutableClassesGiraphConfiguration conf;
50
51
52 private Class<? extends Computation> computationClass;
53
54 private MessageClasses<? extends WritableComparable, ? extends Writable>
55 incomingMessageClasses;
56
57 private MessageClasses<? extends WritableComparable, ? extends Writable>
58 outgoingMessageClasses;
59
60
61
62
63
64
65
66
67 public SuperstepClasses(
68 ImmutableClassesGiraphConfiguration conf,
69 Class<? extends Computation> computationClass,
70 MessageClasses<? extends WritableComparable, ? extends Writable>
71 incomingMessageClasses,
72 MessageClasses<? extends WritableComparable, ? extends Writable>
73 outgoingMessageClasses) {
74 this.conf = conf;
75 this.computationClass = computationClass;
76 this.incomingMessageClasses = incomingMessageClasses;
77 this.outgoingMessageClasses = outgoingMessageClasses;
78 }
79
80
81
82
83
84
85 public static SuperstepClasses createToRead(
86 ImmutableClassesGiraphConfiguration conf) {
87 return new SuperstepClasses(conf, null, null, null);
88 }
89
90
91
92
93
94
95
96 public static SuperstepClasses createAndExtractTypes(
97 ImmutableClassesGiraphConfiguration conf) {
98 return new SuperstepClasses(
99 conf,
100 conf.getComputationClass(),
101 conf.getOutgoingMessageClasses(),
102 conf.getOutgoingMessageClasses().createCopyForNewSuperstep());
103 }
104
105 public Class<? extends Computation> getComputationClass() {
106 return computationClass;
107 }
108
109 public MessageClasses<? extends WritableComparable, ? extends Writable>
110 getOutgoingMessageClasses() {
111 return outgoingMessageClasses;
112 }
113
114
115
116
117
118
119
120
121 public void setOutgoingMessageClasses(
122 MessageClasses<? extends WritableComparable, ? extends Writable>
123 outgoingMessageClasses) {
124 this.outgoingMessageClasses = outgoingMessageClasses;
125 }
126
127
128
129
130
131 public void setComputationClass(
132 Class<? extends Computation> computationClass) {
133 this.computationClass = computationClass;
134
135 if (computationClass != null) {
136 Class[] computationTypes = ReflectionUtils.getTypeArguments(
137 TypesHolder.class, computationClass);
138 if (computationTypes[4] != null &&
139 outgoingMessageClasses instanceof DefaultMessageClasses) {
140 ((DefaultMessageClasses) outgoingMessageClasses)
141 .setIfNotModifiedMessageClass(computationTypes[4]);
142 }
143 }
144 }
145
146
147
148
149
150
151
152
153
154 public void setMessageCombinerClass(
155 Class<? extends MessageCombiner> messageCombinerClass) {
156 Preconditions.checkState(
157 outgoingMessageClasses instanceof DefaultMessageClasses);
158 ((DefaultMessageClasses) outgoingMessageClasses).
159 setMessageCombinerClass(messageCombinerClass);
160 }
161
162
163
164
165
166 @Deprecated
167 public void setIncomingMessageClass(
168 Class<? extends Writable> incomingMessageClass) {
169 if (!incomingMessageClasses.getMessageClass().
170 equals(incomingMessageClass)) {
171 throw new IllegalArgumentException(
172 "Cannot change incoming message class from " +
173 incomingMessageClasses.getMessageClass() +
174 " previously, to " + incomingMessageClass);
175 }
176 }
177
178
179
180
181
182
183
184
185
186 public void setOutgoingMessageClass(
187 Class<? extends Writable> outgoingMessageClass) {
188 Preconditions.checkState(
189 outgoingMessageClasses instanceof DefaultMessageClasses);
190 ((DefaultMessageClasses) outgoingMessageClasses).
191 setMessageClass(outgoingMessageClass);
192 }
193
194
195
196
197
198 public Class<? extends MessageCombiner> getMessageCombinerClass() {
199 MessageCombiner combiner =
200 outgoingMessageClasses.createMessageCombiner(conf);
201 return combiner != null ? combiner.getClass() : null;
202 }
203
204
205
206
207
208
209
210
211 public void verifyTypesMatch(boolean checkMatchingMesssageTypes) {
212
213
214
215
216 if (COMPUTATION_LANGUAGE.get(conf) == Language.JYTHON) {
217 return;
218 }
219
220 Class<?>[] computationTypes = ReflectionUtils.getTypeArguments(
221 TypesHolder.class, computationClass);
222 ReflectionUtils.verifyTypes(conf.getVertexIdClass(), computationTypes[0],
223 "Vertex id", computationClass);
224 ReflectionUtils.verifyTypes(conf.getVertexValueClass(), computationTypes[1],
225 "Vertex value", computationClass);
226 ReflectionUtils.verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
227 "Edge value", computationClass);
228
229 if (checkMatchingMesssageTypes) {
230 ReflectionUtils.verifyTypes(incomingMessageClasses.getMessageClass(),
231 computationTypes[3], "Incoming message type", computationClass);
232 }
233
234 ReflectionUtils.verifyTypes(outgoingMessageClasses.getMessageClass(),
235 computationTypes[4], "Outgoing message type", computationClass);
236
237 outgoingMessageClasses.verifyConsistent(conf);
238 }
239
240
241
242
243
244 public void updateGiraphClasses(GiraphClasses classes) {
245 classes.setComputationClass(computationClass);
246 classes.setIncomingMessageClasses(incomingMessageClasses);
247 classes.setOutgoingMessageClasses(outgoingMessageClasses);
248 }
249
250 @Override
251 public void write(DataOutput output) throws IOException {
252 WritableUtils.writeClass(computationClass, output);
253 WritableUtils.writeWritableObject(incomingMessageClasses, output);
254 WritableUtils.writeWritableObject(outgoingMessageClasses, output);
255 }
256
257 @Override
258 public void readFields(DataInput input) throws IOException {
259 computationClass = WritableUtils.readClass(input);
260 incomingMessageClasses = WritableUtils.readWritableObject(input, conf);
261 outgoingMessageClasses = WritableUtils.readWritableObject(input, conf);
262 }
263
264 @Override
265 public String toString() {
266 String computationName = computationClass == null ? "_not_set_" :
267 computationClass.getName();
268 return "(computation=" + computationName +
269 ",incoming=" + incomingMessageClasses +
270 ",outgoing=" + outgoingMessageClasses + ")";
271 }
272
273 }