This project has retired. For details please refer to its
Attic page.
ObjectMessageClasses xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.piece.messages;
19
20 import org.apache.giraph.combiner.MessageCombiner;
21 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.conf.MessageClasses;
24 import org.apache.giraph.factories.MessageValueFactory;
25 import org.apache.giraph.utils.ReflectionUtils;
26 import org.apache.giraph.writable.kryo.KryoWritable;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29
30 import com.google.common.base.Preconditions;
31
32
33
34
35
36
37
38
39 public class ObjectMessageClasses<I extends WritableComparable,
40 M extends Writable> extends KryoWritable implements MessageClasses<I, M> {
41 private final Class<M> messageClass;
42 private final SupplierFromConf<MessageValueFactory<M>>
43 messageValueFactorySupplier;
44 private final SupplierFromConf<? extends MessageCombiner<? super I, M>>
45 messageCombinerSupplier;
46 private final MessageEncodeAndStoreType messageEncodeAndStoreType;
47 private final boolean ignoreExistingVertices;
48
49 public ObjectMessageClasses() {
50 this(null, null, null, null, false);
51 }
52
53 public ObjectMessageClasses(Class<M> messageClass,
54 SupplierFromConf<MessageValueFactory<M>> messageValueFactorySupplier,
55 SupplierFromConf<? extends MessageCombiner<? super I, M>>
56 messageCombinerSupplier,
57 MessageEncodeAndStoreType messageEncodeAndStoreType,
58 boolean ignoreExistingVertices) {
59 this.messageClass = messageClass;
60 this.messageValueFactorySupplier = messageValueFactorySupplier;
61 this.messageCombinerSupplier = messageCombinerSupplier;
62 this.messageEncodeAndStoreType = messageEncodeAndStoreType;
63 this.ignoreExistingVertices = ignoreExistingVertices;
64 }
65
66 @Override
67 public Class<M> getMessageClass() {
68 return messageClass;
69 }
70
71 @Override
72 public MessageValueFactory<M> createMessageValueFactory(
73 ImmutableClassesGiraphConfiguration conf) {
74 return Preconditions.checkNotNull(messageValueFactorySupplier.apply(conf));
75 }
76
77 @Override
78 public MessageCombiner<? super I, M> createMessageCombiner(
79 ImmutableClassesGiraphConfiguration<I, ? extends Writable,
80 ? extends Writable> conf) {
81 return messageCombinerSupplier != null ?
82 Preconditions.checkNotNull(messageCombinerSupplier.apply(conf)) : null;
83 }
84
85 @Override
86 public boolean useMessageCombiner() {
87 return messageCombinerSupplier != null;
88 }
89
90 @Override
91 public boolean ignoreExistingVertices() {
92 return ignoreExistingVertices;
93 }
94
95 @Override
96 public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
97 return messageEncodeAndStoreType;
98 }
99
100 @Override
101 public MessageClasses<I, M> createCopyForNewSuperstep() {
102 return new ObjectMessageClasses<>(
103 messageClass, messageValueFactorySupplier,
104 messageCombinerSupplier, messageEncodeAndStoreType,
105 ignoreExistingVertices);
106 }
107
108 @Override
109 public void verifyConsistent(ImmutableClassesGiraphConfiguration conf) {
110 MessageValueFactory<M> messageValueFactory =
111 messageValueFactorySupplier.apply(conf);
112 Preconditions.checkState(
113 messageValueFactory.newInstance().getClass().equals(messageClass));
114
115 if (messageCombinerSupplier != null) {
116 MessageCombiner<? super I, M> messageCombiner =
117 messageCombinerSupplier.apply(conf);
118 Preconditions.checkState(messageCombiner.createInitialMessage()
119 .getClass().equals(messageClass));
120 Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
121 MessageCombiner.class, messageCombiner.getClass());
122 ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
123 "Vertex id", messageCombiner.getClass());
124 ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
125 "Outgoing message", messageCombiner.getClass());
126 }
127 }
128 }