This project has retired. For details please refer to its
Attic page.
DefaultMessageClasses xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.conf;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.giraph.combiner.MessageCombiner;
25 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
26 import org.apache.giraph.factories.DefaultMessageValueFactory;
27 import org.apache.giraph.factories.MessageValueFactory;
28 import org.apache.giraph.utils.ReflectionUtils;
29 import org.apache.giraph.utils.WritableUtils;
30 import org.apache.hadoop.io.Writable;
31 import org.apache.hadoop.io.WritableComparable;
32 import com.google.common.base.Preconditions;
33
34
35
36
37
38
39
40 public class DefaultMessageClasses
41 <I extends WritableComparable, M extends Writable>
42 implements MessageClasses<I, M> {
43
44 private Class<M> messageClass;
45
46 private Class<? extends MessageValueFactory<M>>
47 messageValueFactoryClass;
48
49 private Class<? extends MessageCombiner<? super I, M>> messageCombinerClass;
50
51 private boolean messageClassModified;
52
53 private MessageEncodeAndStoreType messageEncodeAndStoreType;
54
55
56 public DefaultMessageClasses() {
57 }
58
59
60
61
62
63
64
65
66 public DefaultMessageClasses(
67 Class<M> messageClass,
68 Class<? extends MessageValueFactory<M>> messageValueFactoryClass,
69 Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
70 MessageEncodeAndStoreType messageEncodeAndStoreType) {
71 this.messageClass = messageClass;
72 this.messageValueFactoryClass = messageValueFactoryClass;
73 this.messageCombinerClass = messageCombinerClass;
74 this.messageClassModified = false;
75 this.messageEncodeAndStoreType = messageEncodeAndStoreType;
76 }
77
78 @Override
79 public Class<M> getMessageClass() {
80 return messageClass;
81 }
82
83 @Override
84 public MessageValueFactory<M> createMessageValueFactory(
85 ImmutableClassesGiraphConfiguration conf) {
86 if (messageValueFactoryClass.equals(DefaultMessageValueFactory.class)) {
87 return new DefaultMessageValueFactory<>(messageClass, conf);
88 }
89
90 MessageValueFactory factory = ReflectionUtils.newInstance(
91 messageValueFactoryClass, conf);
92 if (!factory.newInstance().getClass().equals(messageClass)) {
93 throw new IllegalStateException("Message factory " +
94 messageValueFactoryClass + " creates " +
95 factory.newInstance().getClass().getName() + ", but message type is " +
96 messageClass.getName());
97 }
98 return factory;
99 }
100
101 @Override
102 public MessageCombiner<? super I, M> createMessageCombiner(
103 ImmutableClassesGiraphConfiguration conf) {
104 if (messageCombinerClass == null) {
105 return null;
106 }
107
108 MessageCombiner combiner =
109 ReflectionUtils.newInstance(messageCombinerClass, conf);
110 if (combiner != null) {
111 Preconditions.checkState(
112 combiner.createInitialMessage().getClass().equals(messageClass));
113 }
114 return combiner;
115 }
116
117 @Override
118 public boolean useMessageCombiner() {
119 return messageCombinerClass != null;
120 }
121
122 @Override
123 public boolean ignoreExistingVertices() {
124 return false;
125 }
126
127 @Override
128 public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
129 return messageEncodeAndStoreType;
130 }
131
132 @Override
133 public MessageClasses<I, M> createCopyForNewSuperstep() {
134 return new DefaultMessageClasses<>(messageClass, messageValueFactoryClass,
135 messageCombinerClass, messageEncodeAndStoreType);
136 }
137
138 @Override
139 public void verifyConsistent(
140 ImmutableClassesGiraphConfiguration conf) {
141 Class<?>[] factoryTypes = ReflectionUtils.getTypeArguments(
142 MessageValueFactory.class, messageValueFactoryClass);
143 ReflectionUtils.verifyTypes(messageClass, factoryTypes[0],
144 "Message factory", messageValueFactoryClass);
145
146 if (messageCombinerClass != null) {
147 Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
148 MessageCombiner.class, messageCombinerClass);
149 ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
150 "Vertex id", messageCombinerClass);
151 ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
152 "Outgoing message", messageCombinerClass);
153 }
154 }
155
156
157
158
159
160 public void setMessageClass(Class<M> messageClass) {
161 this.messageClassModified = true;
162 this.messageClass = messageClass;
163 }
164
165
166
167
168
169 public void setIfNotModifiedMessageClass(Class<M> messageClass) {
170 if (!messageClassModified) {
171 this.messageClass = messageClass;
172 }
173 }
174
175 public void setMessageCombinerClass(
176 Class<? extends MessageCombiner<? super I, M>> messageCombinerClass) {
177 this.messageCombinerClass = messageCombinerClass;
178 }
179
180 public void setMessageValueFactoryClass(
181 Class<? extends MessageValueFactory<M>> messageValueFactoryClass) {
182 this.messageValueFactoryClass = messageValueFactoryClass;
183 }
184
185 public void setMessageEncodeAndStoreType(
186 MessageEncodeAndStoreType messageEncodeAndStoreType) {
187 this.messageEncodeAndStoreType = messageEncodeAndStoreType;
188 }
189
190 @Override
191 public void write(DataOutput out) throws IOException {
192 WritableUtils.writeClass(messageClass, out);
193 WritableUtils.writeClass(messageValueFactoryClass, out);
194 WritableUtils.writeClass(messageCombinerClass, out);
195 out.writeBoolean(messageClassModified);
196 out.writeByte(messageEncodeAndStoreType.ordinal());
197 }
198
199 @Override
200 public void readFields(DataInput in) throws IOException {
201 messageClass = WritableUtils.readClass(in);
202 messageValueFactoryClass = WritableUtils.readClass(in);
203 messageCombinerClass = WritableUtils.readClass(in);
204 messageClassModified = in.readBoolean();
205 messageEncodeAndStoreType =
206 messageEncodeAndStoreType.values()[in.readByte()];
207 }
208 }