This project has retired. For details please refer to its
Attic page.
DiskBackedMessageStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.data;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24
25 import org.apache.giraph.comm.messages.MessageStore;
26 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27 import org.apache.giraph.factories.MessageValueFactory;
28 import org.apache.giraph.ooc.OutOfCoreEngine;
29 import org.apache.giraph.ooc.persistence.DataIndex;
30 import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
31 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
32 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
33 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
34 import org.apache.giraph.utils.VertexIdMessages;
35 import org.apache.hadoop.io.Writable;
36 import org.apache.hadoop.io.WritableComparable;
37 import org.apache.log4j.Logger;
38
39
40
41
42
43
44
45
46 public class DiskBackedMessageStore<I extends WritableComparable,
47 M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>>
48 implements MessageStore<I, M> {
49
50 private static final Logger LOG =
51 Logger.getLogger(DiskBackedMessageStore.class);
52
53 private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
54
55 private final MessageStore<I, M> messageStore;
56
57 private final boolean useMessageCombiner;
58
59 private final long superstep;
60
61 private final MessageValueFactory<M> messageValueFactory;
62
63
64
65
66
67
68
69
70 private enum SerializedMessageClass {
71
72 BYTE_ARRAY_VERTEX_ID_MESSAGES,
73
74 BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
75 }
76
77
78
79
80
81
82
83
84
85
86
87
88 public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
89 config,
90 OutOfCoreEngine oocEngine,
91 MessageStore<I, M> messageStore,
92 boolean useMessageCombiner, long superstep) {
93 super(config, oocEngine);
94 this.config = config;
95 this.messageStore = messageStore;
96 this.useMessageCombiner = useMessageCombiner;
97 this.superstep = superstep;
98 this.messageValueFactory = config.createOutgoingMessageValueFactory();
99 }
100
101 @Override
102 public boolean isPointerListEncoding() {
103 return messageStore.isPointerListEncoding();
104 }
105
106 @Override
107 public Iterable<M> getVertexMessages(I vertexId) {
108 return messageStore.getVertexMessages(vertexId);
109 }
110
111 @Override
112 public void clearVertexMessages(I vertexId) {
113 messageStore.clearVertexMessages(vertexId);
114 }
115
116 @Override
117 public void clearAll() {
118 messageStore.clearAll();
119 }
120
121 @Override
122 public boolean hasMessagesForVertex(I vertexId) {
123 return messageStore.hasMessagesForVertex(vertexId);
124 }
125
126 @Override
127 public boolean hasMessagesForPartition(int partitionId) {
128 return messageStore.hasMessagesForPartition(partitionId);
129 }
130
131 @Override
132 public void addPartitionMessages(
133 int partitionId, VertexIdMessages<I, M> messages) {
134 if (useMessageCombiner) {
135 messageStore.addPartitionMessages(partitionId, messages);
136 } else {
137 addEntry(partitionId, messages);
138 }
139 }
140
141 @Override
142 public void addMessage(I vertexId, M message) throws IOException {
143 if (useMessageCombiner) {
144 messageStore.addMessage(vertexId, message);
145 } else {
146
147 throw new UnsupportedOperationException();
148 }
149 }
150
151
152
153
154
155
156
157
158 private static String getPath(String basePath, long superstep) {
159 return basePath + "_messages-S" + superstep;
160 }
161
162 @Override
163 public long loadPartitionData(int partitionId)
164 throws IOException {
165 if (!useMessageCombiner) {
166 return loadPartitionDataProxy(partitionId,
167 new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
168 .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
169 } else {
170 return 0;
171 }
172 }
173
174 @Override
175 public long offloadPartitionData(int partitionId)
176 throws IOException {
177 if (!useMessageCombiner) {
178 return offloadPartitionDataProxy(partitionId,
179 new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
180 .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
181 } else {
182 return 0;
183 }
184 }
185
186 @Override
187 public long offloadBuffers(int partitionId)
188 throws IOException {
189 if (!useMessageCombiner) {
190 return offloadBuffersProxy(partitionId,
191 new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
192 .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
193 } else {
194 return 0;
195 }
196 }
197
198 @Override
199 public void finalizeStore() {
200 messageStore.finalizeStore();
201 }
202
203 @Override
204 public Iterable<I> getPartitionDestinationVertices(int partitionId) {
205 return messageStore.getPartitionDestinationVertices(partitionId);
206 }
207
208 @Override
209 public void clearPartition(int partitionId) {
210 messageStore.clearPartition(partitionId);
211 }
212
213 @Override
214 public void writePartition(DataOutput out, int partitionId)
215 throws IOException {
216 messageStore.writePartition(out, partitionId);
217 }
218
219 @Override
220 public void readFieldsForPartition(DataInput in, int partitionId)
221 throws IOException {
222 messageStore.readFieldsForPartition(in, partitionId);
223 }
224
225 @Override
226 protected void writeEntry(VertexIdMessages<I, M> messages, DataOutput out)
227 throws IOException {
228 SerializedMessageClass messageClass;
229 if (messages instanceof ByteArrayVertexIdMessages) {
230 messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
231 } else if (messages instanceof ByteArrayOneMessageToManyIds) {
232 messageClass = SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
233 } else {
234 throw new IllegalStateException("writeEntry: serialized message " +
235 "type is not supported");
236 }
237 out.writeByte(messageClass.ordinal());
238 messages.write(out);
239 }
240
241 @Override
242 protected VertexIdMessages<I, M> readNextEntry(DataInput in)
243 throws IOException {
244 byte messageType = in.readByte();
245 SerializedMessageClass messageClass =
246 SerializedMessageClass.values()[messageType];
247 VertexIdMessages<I, M> vim;
248 switch (messageClass) {
249 case BYTE_ARRAY_VERTEX_ID_MESSAGES:
250 vim = new ByteArrayVertexIdMessages<>(messageValueFactory);
251 vim.setConf(config);
252 break;
253 case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
254 vim = new ByteArrayOneMessageToManyIds<>(messageValueFactory);
255 vim.setConf(config);
256 break;
257 default:
258 throw new IllegalStateException("readNextEntry: unsupported " +
259 "serialized message type!");
260 }
261 vim.readFields(in);
262 return vim;
263 }
264
265 @Override
266 protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
267 DataIndex index) throws IOException {
268 long numBytes = 0;
269 if (hasPartitionDataOnFile.remove(partitionId)) {
270 OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
271 oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
272 messageStore.readFieldsForPartition(inputWrapper.getDataInput(),
273 partitionId);
274 numBytes = inputWrapper.finalizeInput(true);
275 }
276 return numBytes;
277 }
278
279 @Override
280 protected long offloadInMemoryPartitionData(
281 int partitionId, int ioThreadId, DataIndex index) throws IOException {
282 long numBytes = 0;
283 if (messageStore.hasMessagesForPartition(partitionId)) {
284 OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
285 oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
286 false);
287 messageStore.writePartition(outputWrapper.getDataOutput(), partitionId);
288 messageStore.clearPartition(partitionId);
289 numBytes = outputWrapper.finalizeOutput();
290 hasPartitionDataOnFile.add(partitionId);
291 }
292 return numBytes;
293 }
294
295 @Override
296 protected int entrySerializedSize(VertexIdMessages<I, M> messages) {
297 return messages.getSerializedSize();
298 }
299
300 @Override
301 protected void addEntryToInMemoryPartitionData(int partitionId,
302 VertexIdMessages<I, M>
303 messages) {
304 messageStore.addPartitionMessages(partitionId, messages);
305 }
306 }