This project has retired. For details please refer to its
Attic page.
OneMessagePerVertexStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.util.Collections;
25 import java.util.concurrent.ConcurrentMap;
26
27 import org.apache.giraph.bsp.CentralizedServiceWorker;
28 import org.apache.giraph.combiner.MessageCombiner;
29 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30 import org.apache.giraph.conf.MessageClasses;
31 import org.apache.giraph.factories.MessageValueFactory;
32 import org.apache.giraph.utils.VertexIdMessageIterator;
33 import org.apache.giraph.utils.VertexIdMessages;
34 import org.apache.giraph.utils.WritableUtils;
35 import org.apache.hadoop.io.Writable;
36 import org.apache.hadoop.io.WritableComparable;
37
38
39
40
41
42
43
44
45
46 public class OneMessagePerVertexStore<I extends WritableComparable,
47 M extends Writable> extends SimpleMessageStore<I, M, M> {
48
49 private final MessageCombiner<? super I, M> messageCombiner;
50
51
52
53
54
55
56
57 public OneMessagePerVertexStore(
58 MessageValueFactory<M> messageValueFactory,
59 PartitionSplitInfo<I> partitionInfo,
60 MessageCombiner<? super I, M> messageCombiner,
61 ImmutableClassesGiraphConfiguration<I, ?, ?> config
62 ) {
63 super(messageValueFactory, partitionInfo, config);
64 this.messageCombiner =
65 messageCombiner;
66 }
67
68 @Override
69 public boolean isPointerListEncoding() {
70 return false;
71 }
72
73 @Override
74 public void addPartitionMessages(
75 int partitionId,
76 VertexIdMessages<I, M> messages) {
77 ConcurrentMap<I, M> partitionMap =
78 getOrCreatePartitionMap(partitionId);
79 VertexIdMessageIterator<I, M> vertexIdMessageIterator =
80 messages.getVertexIdMessageIterator();
81
82
83 while (vertexIdMessageIterator.hasNext()) {
84 vertexIdMessageIterator.next();
85 I vertexId = vertexIdMessageIterator.getCurrentVertexId();
86 M currentMessage =
87 partitionMap.get(vertexIdMessageIterator.getCurrentVertexId());
88 if (currentMessage == null) {
89 M newMessage = messageCombiner.createInitialMessage();
90 currentMessage = partitionMap.putIfAbsent(
91 vertexIdMessageIterator.releaseCurrentVertexId(), newMessage);
92 if (currentMessage == null) {
93 currentMessage = newMessage;
94 }
95 }
96 synchronized (currentMessage) {
97 messageCombiner.combine(vertexId, currentMessage,
98 vertexIdMessageIterator.getCurrentMessage());
99 }
100 }
101 }
102
103 @Override
104 public void addMessage(I vertexId, M message) throws IOException {
105 ConcurrentMap<I, M> partitionMap =
106 getOrCreatePartitionMap(getPartitionId(vertexId));
107 M currentMessage = partitionMap.get(vertexId);
108 if (currentMessage == null) {
109 M newMessage = messageCombiner.createInitialMessage();
110
111 I copyId = WritableUtils.createCopy(vertexId);
112 currentMessage = partitionMap.putIfAbsent(copyId, newMessage);
113 if (currentMessage == null) {
114 currentMessage = newMessage;
115 }
116 }
117 synchronized (currentMessage) {
118 messageCombiner.combine(vertexId, currentMessage, message);
119 }
120 }
121
122 @Override
123 protected Iterable<M> getMessagesAsIterable(M message) {
124 return Collections.singleton(message);
125 }
126
127 @Override
128 protected int getNumberOfMessagesIn(ConcurrentMap<I, M> partitionMap) {
129 return partitionMap.size();
130 }
131
132 @Override
133 protected void writeMessages(M messages, DataOutput out) throws IOException {
134 messages.write(out);
135 }
136
137 @Override
138 protected M readFieldsForMessages(DataInput in) throws IOException {
139 M message = messageValueFactory.newInstance();
140 message.readFields(in);
141 return message;
142 }
143
144
145
146
147
148
149
150
151
152
153
154 public static <I extends WritableComparable, M extends Writable>
155 MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
156 CentralizedServiceWorker<I, ?, ?> service,
157 ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
158 return new Factory<I, M>(service, config);
159 }
160
161
162
163
164
165
166
167 private static class Factory<I extends WritableComparable,
168 M extends Writable>
169 implements MessageStoreFactory<I, M, MessageStore<I, M>> {
170
171 private PartitionSplitInfo<I> partitionInfo;
172
173 private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
174
175
176
177
178
179 public Factory(
180 PartitionSplitInfo<I> partitionInfo,
181 ImmutableClassesGiraphConfiguration<I, ?, ?> config
182 ) {
183 this.partitionInfo = partitionInfo;
184 this.config = config;
185 }
186
187 @Override
188 public MessageStore<I, M> newStore(
189 MessageClasses<I, M> messageClasses) {
190 return new OneMessagePerVertexStore<I, M>(
191 messageClasses.createMessageValueFactory(config), partitionInfo,
192 messageClasses.createMessageCombiner(config), config);
193 }
194
195 @Override
196 public void initialize(PartitionSplitInfo<I> partitionInfo,
197 ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
198 this.partitionInfo = partitionInfo;
199 this.config = conf;
200 }
201 }
202 }