This project has retired. For details please refer to its
Attic page.
InternalMessageStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.api.local;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.ThreadLocalRandom;
25
26 import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
27 import org.apache.giraph.comm.messages.MessageStore;
28 import org.apache.giraph.comm.messages.PartitionSplitInfo;
29 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30 import org.apache.giraph.conf.MessageClasses;
31 import org.apache.giraph.factories.MessageValueFactory;
32 import org.apache.giraph.utils.WritableUtils;
33 import org.apache.hadoop.io.Writable;
34 import org.apache.hadoop.io.WritableComparable;
35
36 import com.google.common.collect.Iterators;
37
38
39
40
41
42
43
44 @SuppressWarnings("rawtypes")
45 interface InternalMessageStore
46 <I extends WritableComparable, M extends Writable> {
47 Iterator<I> targetVertexIds();
48 boolean hasMessage(I id);
49 Iterable<M> takeMessages(I id);
50 void sendMessage(I id, M message);
51 void sendMessageToMultipleEdges(Iterator<I> idIter, M message);
52 void finalizeStore();
53 Iterable<I> getPartitionDestinationVertices(int partitionId);
54
55
56
57
58
59
60
61
62 class InternalWrappedMessageStore
63 <I extends WritableComparable, M extends Writable>
64 implements InternalMessageStore<I, M> {
65 private final MessageStore<I, M> messageStore;
66 private final PartitionSplitInfo<I> partitionInfo;
67
68 private InternalWrappedMessageStore(
69 ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
70 MessageStore<I, M> messageStore,
71 PartitionSplitInfo<I> partitionInfo
72 ) {
73 this.messageStore = messageStore;
74 this.partitionInfo = partitionInfo;
75 }
76
77 public static <I extends WritableComparable, M extends Writable>
78 InternalMessageStore<I, M> create(
79 ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
80 MessageClasses<I, M> messageClasses,
81 PartitionSplitInfo<I> partitionInfo
82 ) {
83 InMemoryMessageStoreFactory<I, M> factory =
84 new InMemoryMessageStoreFactory<>();
85 factory.initialize(partitionInfo, conf);
86 return new InternalWrappedMessageStore<>(
87 conf,
88 factory.newStore(messageClasses),
89 partitionInfo
90 );
91 }
92
93 @Override
94 public void sendMessage(I id, M message) {
95 try {
96 messageStore.addMessage(id, message);
97 } catch (IOException e) {
98 throw new RuntimeException(e);
99 }
100 }
101
102 @Override
103 public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
104 while (idIter.hasNext()) {
105 sendMessage(idIter.next(), message);
106 }
107 }
108
109 @Override
110 public Iterable<M> takeMessages(I id) {
111 Iterable<M> result = messageStore.getVertexMessages(id);
112 messageStore.clearVertexMessages(id);
113 return result;
114 }
115
116 @Override
117 public Iterable<I> getPartitionDestinationVertices(int partitionId) {
118 return messageStore.getPartitionDestinationVertices(partitionId);
119 }
120
121 @Override
122 public Iterator<I> targetVertexIds() {
123 List<Iterator<I>> iterators = new ArrayList<>();
124 for (int partition : partitionInfo.getPartitionIds()) {
125 Iterable<I> vertices =
126 messageStore.getPartitionDestinationVertices(partition);
127 iterators.add(vertices.iterator());
128 }
129 return Iterators.concat(iterators.iterator());
130 }
131
132 @Override
133 public boolean hasMessage(I id) {
134 return messageStore.hasMessagesForVertex(id);
135 }
136
137 @Override
138 public void finalizeStore() {
139 messageStore.finalizeStore();
140 }
141 }
142
143
144
145
146
147 static class InternalChecksMessageStore
148 <I extends WritableComparable, M extends Writable>
149 implements InternalMessageStore<I, M> {
150 private final InternalMessageStore<I, M> messageStore;
151 private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
152 private final MessageValueFactory<M> messageFactory;
153
154 public InternalChecksMessageStore(
155 InternalMessageStore<I, M> messageStore,
156 ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
157 MessageValueFactory<M> messageFactory
158 ) {
159 this.messageStore = messageStore;
160 this.conf = conf;
161 this.messageFactory = messageFactory;
162 }
163
164
165
166
167 private M maybeMessageCopy(M message) {
168 M messageCopy = WritableUtils.createCopy(
169 message, messageFactory, conf);
170 return ThreadLocalRandom.current().nextBoolean() ? messageCopy : message;
171 }
172
173 private void checkIdCopy(I id) {
174 WritableUtils.createCopy(id, conf.getVertexIdFactory(), conf);
175 }
176
177 @Override
178 public void sendMessage(I id, M message) {
179 checkIdCopy(id);
180 messageStore.sendMessage(id, maybeMessageCopy(message));
181 }
182
183 @Override
184 public void sendMessageToMultipleEdges(
185 final Iterator<I> idIter, M message) {
186 messageStore.sendMessageToMultipleEdges(
187 new Iterator<I>() {
188 @Override
189 public boolean hasNext() {
190 return idIter.hasNext();
191 }
192
193 @Override
194 public I next() {
195 I id = idIter.next();
196 checkIdCopy(id);
197 return id;
198 }
199
200 @Override
201 public void remove() {
202 idIter.remove();
203 }
204 },
205 maybeMessageCopy(message));
206 }
207
208 @Override
209 public Iterable<M> takeMessages(I id) {
210 checkIdCopy(id);
211 return messageStore.takeMessages(id);
212 }
213
214 @Override
215 public boolean hasMessage(I id) {
216 return messageStore.hasMessage(id);
217 }
218
219 @Override
220 public Iterator<I> targetVertexIds() {
221 return messageStore.targetVertexIds();
222 }
223
224 @Override
225 public Iterable<I> getPartitionDestinationVertices(int partitionId) {
226 return messageStore.getPartitionDestinationVertices(partitionId);
227 }
228
229 @Override
230 public void finalizeStore() {
231 messageStore.finalizeStore();
232 }
233 }
234 }