This project has retired. For details please refer to its
Attic page.
DiskBackedPartitionStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.data;
20
21 import com.google.common.collect.Maps;
22 import org.apache.giraph.bsp.BspService;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.edge.OutEdges;
25 import org.apache.giraph.graph.Vertex;
26 import org.apache.giraph.ooc.OutOfCoreEngine;
27 import org.apache.giraph.ooc.persistence.DataIndex;
28 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
29 import org.apache.giraph.partition.Partition;
30 import org.apache.giraph.partition.PartitionStore;
31 import org.apache.giraph.utils.ExtendedDataOutput;
32 import org.apache.giraph.utils.WritableUtils;
33 import org.apache.giraph.worker.BspServiceWorker;
34 import org.apache.hadoop.io.Writable;
35 import org.apache.hadoop.io.WritableComparable;
36 import org.apache.hadoop.mapreduce.Mapper;
37 import org.apache.log4j.Logger;
38
39 import java.io.DataInput;
40 import java.io.DataOutput;
41 import java.io.IOException;
42 import java.util.Map;
43
44 import static com.google.common.base.Preconditions.checkNotNull;
45
46
47
48
49
50
51
52
53
54
55
56 public class DiskBackedPartitionStore<I extends WritableComparable,
57 V extends Writable, E extends Writable>
58 extends DiskBackedDataStore<ExtendedDataOutput>
59 implements PartitionStore<I, V, E> {
60
61 private static final Logger LOG =
62 Logger.getLogger(DiskBackedPartitionStore.class);
63
64 private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
65
66 private final Mapper<?, ?, ?, ?>.Context context;
67
68 private final PartitionStore<I, V, E> partitionStore;
69
70
71
72
73
74 private final Map<Integer, Long> partitionVertexCount =
75 Maps.newConcurrentMap();
76
77
78
79
80
81 private final Map<Integer, Long> partitionEdgeCount =
82 Maps.newConcurrentMap();
83
84
85
86
87
88
89
90
91
92
/**
 * Creates a disk-backed wrapper around an in-memory partition store.
 *
 * @param partitionStore store holding the partitions that are in memory
 * @param conf job configuration
 * @param context mapper context, passed on when partitions are created
 * @param oocEngine out-of-core engine, handed to the superclass
 */
public DiskBackedPartitionStore(
    PartitionStore<I, V, E> partitionStore,
    ImmutableClassesGiraphConfiguration<I, V, E> conf,
    Mapper<?, ?, ?, ?>.Context context,
    OutOfCoreEngine oocEngine) {
  super(conf, oocEngine);
  this.conf = conf;
  this.context = context;
  this.partitionStore = partitionStore;
}
103
104 @Override
105 public boolean addPartition(Partition<I, V, E> partition) {
106 boolean added = partitionStore.addPartition(partition);
107 if (added) {
108 oocEngine.getMetaPartitionManager()
109 .addPartition(partition.getId());
110 }
111 return added;
112 }
113
114 @Override
115 public Partition<I, V, E> removePartition(Integer partitionId) {
116
117
118 oocEngine.getMetaPartitionManager().markPartitionAsInProcess(partitionId);
119 oocEngine.retrievePartition(partitionId);
120 Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
121 checkNotNull(partition, "removePartition: partition " + partitionId +
122 " is not in memory for removal!");
123 oocEngine.getMetaPartitionManager().removePartition(partitionId);
124 return partition;
125 }
126
127 @Override
128 public boolean hasPartition(Integer partitionId) {
129 return oocEngine.getMetaPartitionManager().hasPartition(partitionId);
130 }
131
132 @Override
133 public Iterable<Integer> getPartitionIds() {
134 return oocEngine.getMetaPartitionManager().getPartitionIds();
135 }
136
137 @Override
138 public int getNumPartitions() {
139 return oocEngine.getMetaPartitionManager().getNumPartitions();
140 }
141
142 @Override
143 public long getPartitionVertexCount(Integer partitionId) {
144 if (partitionStore.hasPartition(partitionId)) {
145 return partitionStore.getPartitionVertexCount(partitionId);
146 } else {
147 return partitionVertexCount.get(partitionId);
148 }
149 }
150
151 @Override
152 public long getPartitionEdgeCount(Integer partitionId) {
153 if (partitionStore.hasPartition(partitionId)) {
154 return partitionStore.getPartitionEdgeCount(partitionId);
155 } else {
156 return partitionEdgeCount.get(partitionId);
157 }
158 }
159
160 @Override
161 public boolean isEmpty() {
162 return getNumPartitions() == 0;
163 }
164
165 @Override
166 public void startIteration() {
167 oocEngine.startIteration();
168 }
169
170 @Override
171 public Partition<I, V, E> getNextPartition() {
172 Integer partitionId = oocEngine.getNextPartition();
173 if (partitionId == null) {
174 return null;
175 }
176 Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
177 if (partition == null) {
178 if (LOG.isInfoEnabled()) {
179 LOG.info("getNextPartition: partition " + partitionId + " is not in " +
180 "the partition store. Creating an empty partition for it.");
181 }
182 partition = conf.createPartition(partitionId, context);
183 }
184 partitionStore.addPartition(partition);
185 return partition;
186 }
187
188 @Override
189 public void putPartition(Partition<I, V, E> partition) {
190 oocEngine.doneProcessingPartition(partition.getId());
191 }
192
193 @Override
194 public void addPartitionVertices(Integer partitionId,
195 ExtendedDataOutput extendedDataOutput) {
196 addEntry(partitionId, extendedDataOutput);
197 }
198
199 @Override
200 public void shutdown() {
201 oocEngine.shutdown();
202 }
203
204 @Override
205 public void initialize() {
206 oocEngine.initialize();
207 }
208
209
210
211
212
213
214
215
216 private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
217 throws IOException {
218 I id = conf.createVertexId();
219 id.readFields(in);
220 V value = null;
221 boolean hasNullValue = in.readBoolean();
222 if (!hasNullValue) {
223 value = conf.createVertexValue();
224 value.readFields(in);
225 }
226 OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
227 vertex.initialize(id, value, edges);
228 if (in.readBoolean()) {
229 vertex.voteToHalt();
230 } else {
231 vertex.wakeUp();
232 }
233 }
234
235
236
237
238
239
240
241
242 private void readOutEdges(DataInput in, Partition<I, V, E> partition)
243 throws IOException {
244 I id = conf.createVertexId();
245 id.readFields(in);
246 Vertex<I, V, E> v = partition.getVertex(id);
247 if (v == null) {
248 throw new IllegalStateException("Vertex with ID " + id +
249 " not found in partition " + partition.getId() +
250 " which has " + partition.getVertexCount() + " vertices and " +
251 partition.getEdgeCount() + " edges.");
252 }
253 OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges();
254 edges.readFields(in);
255 partition.saveVertex(v);
256 }
257
258 @Override
259 protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
260 DataIndex index) throws IOException {
261 long numBytes = 0;
262
263 if (hasPartitionDataOnFile.remove(partitionId)) {
264 Partition<I, V, E> partition = conf.createPartition(partitionId, context);
265 OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
266 index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
267 OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
268 dataAccessor.prepareInput(ioThreadId, index.copy());
269 DataInput dataInput = inputWrapper.getDataInput();
270 long numVertices = dataInput.readLong();
271 for (long i = 0; i < numVertices; ++i) {
272 Vertex<I, V, E> vertex = conf.createVertex();
273 readVertexData(dataInput, vertex);
274 partition.putVertex(vertex);
275 }
276 numBytes += inputWrapper.finalizeInput(true);
277
278
279 index.removeLastIndex()
280 .addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
281 inputWrapper = dataAccessor.prepareInput(ioThreadId, index.copy());
282 dataInput = inputWrapper.getDataInput();
283 for (int i = 0; i < numVertices; ++i) {
284 readOutEdges(dataInput, partition);
285 }
286
287
288 boolean shouldDeleteEdges = false;
289 if (!conf.isStaticGraph() ||
290 oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
291 shouldDeleteEdges = true;
292 }
293 numBytes += inputWrapper.finalizeInput(shouldDeleteEdges);
294 index.removeLastIndex();
295 partitionStore.addPartition(partition);
296 }
297 return numBytes;
298 }
299
300 @Override
301 protected ExtendedDataOutput readNextEntry(DataInput in) throws IOException {
302 return WritableUtils.readExtendedDataOutput(in, conf);
303 }
304
305 @Override
306 protected void addEntryToInMemoryPartitionData(int partitionId,
307 ExtendedDataOutput vertices) {
308 if (!partitionStore.hasPartition(partitionId)) {
309 oocEngine.getMetaPartitionManager().addPartition(partitionId);
310 }
311 partitionStore.addPartitionVertices(partitionId, vertices);
312 }
313
314 @Override
315 public long loadPartitionData(int partitionId)
316 throws IOException {
317 return loadPartitionDataProxy(partitionId,
318 new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
319 }
320
321 @Override
322 public long offloadPartitionData(int partitionId)
323 throws IOException {
324 return offloadPartitionDataProxy(partitionId,
325 new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
326 }
327
328
329
330
331
332
333
334
335 private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
336 throws IOException {
337 vertex.getId().write(output);
338 V value = vertex.getValue();
339 if (value != null) {
340 output.writeBoolean(false);
341 value.write(output);
342 } else {
343 output.writeBoolean(true);
344 }
345 output.writeBoolean(vertex.isHalted());
346 }
347
348
349
350
351
352
353
354
355 private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
356 throws IOException {
357 vertex.getId().write(output);
358 OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
359 edges.write(output);
360 }
361
362 @Override
363 protected long offloadInMemoryPartitionData(
364 int partitionId, int ioThreadId, DataIndex index) throws IOException {
365 long numBytes = 0;
366 if (partitionStore.hasPartition(partitionId)) {
367 OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
368 partitionVertexCount.put(partitionId,
369 partitionStore.getPartitionVertexCount(partitionId));
370 partitionEdgeCount.put(partitionId,
371 partitionStore.getPartitionEdgeCount(partitionId));
372 Partition<I, V, E> partition =
373 partitionStore.removePartition(partitionId);
374 LOG.debug(
375 "Offloading partition " + partition + " DataIndex[" + index + "]");
376 index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
377 OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
378 dataAccessor.prepareOutput(ioThreadId, index.copy(), false);
379 DataOutput dataOutput = outputWrapper.getDataOutput();
380 dataOutput.writeLong(partition.getVertexCount());
381 for (Vertex<I, V, E> vertex : partition) {
382 writeVertexData(dataOutput, vertex);
383 }
384 numBytes += outputWrapper.finalizeOutput();
385 index.removeLastIndex();
386
387
388
389
390 index.addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
391 if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
392 !conf.isStaticGraph() ||
393 !dataAccessor.dataExist(ioThreadId, index)) {
394 outputWrapper = dataAccessor.prepareOutput(ioThreadId, index.copy(),
395 false);
396 for (Vertex<I, V, E> vertex : partition) {
397 writeOutEdges(outputWrapper.getDataOutput(), vertex);
398 }
399 numBytes += outputWrapper.finalizeOutput();
400 }
401 index.removeLastIndex();
402 hasPartitionDataOnFile.add(partitionId);
403 }
404 return numBytes;
405 }
406
407 @Override
408 protected void writeEntry(ExtendedDataOutput vertices, DataOutput out)
409 throws IOException {
410 WritableUtils.writeExtendedDataOutput(vertices, out);
411 }
412
413 @Override
414 public long offloadBuffers(int partitionId)
415 throws IOException {
416 return offloadBuffersProxy(partitionId,
417 new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
418 }
419
420 @Override
421 protected int entrySerializedSize(ExtendedDataOutput vertices) {
422 return vertices.getPos();
423 }
424 }