This project has retired. For details please refer to its
Attic page.
DiskBackedEdgeStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.data;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.edge.EdgeStore;
23 import org.apache.giraph.ooc.OutOfCoreEngine;
24 import org.apache.giraph.ooc.persistence.DataIndex;
25 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
26 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
27 import org.apache.giraph.utils.VertexIdEdges;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30 import org.apache.log4j.Logger;
31
32 import java.io.DataInput;
33 import java.io.DataOutput;
34 import java.io.IOException;
35
36
37
38
39
40
41
42
43 public class DiskBackedEdgeStore<I extends WritableComparable,
44 V extends Writable, E extends Writable>
45 extends DiskBackedDataStore<VertexIdEdges<I, E>>
46 implements EdgeStore<I, V, E> {
47
48 private static final Logger LOG = Logger.getLogger(DiskBackedEdgeStore.class);
49
50 private final EdgeStore<I, V, E> edgeStore;
51
52 private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
53
54
55
56
57
58
59
60
61
62 public DiskBackedEdgeStore(
63 EdgeStore<I, V, E> edgeStore,
64 ImmutableClassesGiraphConfiguration<I, V, E> conf,
65 OutOfCoreEngine oocEngine) {
66 super(conf, oocEngine);
67 this.edgeStore = edgeStore;
68 this.conf = conf;
69 }
70
71 @Override
72 public void addPartitionEdges(int partitionId, VertexIdEdges<I, E> edges) {
73 addEntry(partitionId, edges);
74 }
75
76 @Override
77 public void moveEdgesToVertices() {
78 edgeStore.moveEdgesToVertices();
79 }
80
81 @Override
82 public void writePartitionEdgeStore(int partitionId, DataOutput output)
83 throws IOException {
84
85
86 throw new IllegalStateException("writePartitionEdgeStore: this method " +
87 "should not be called for DiskBackedEdgeStore!");
88 }
89
90 @Override
91 public void readPartitionEdgeStore(int partitionId, DataInput input)
92 throws IOException {
93
94
95 throw new IllegalStateException("readPartitionEdgeStore: this method " +
96 "should not be called for DiskBackedEdgeStore!");
97 }
98
99 @Override
100 public boolean hasEdgesForPartition(int partitionId) {
101
102
103 throw new IllegalStateException("hasEdgesForPartition: this method " +
104 "should not be called for DiskBackedEdgeStore!");
105 }
106
107 @Override
108 public long loadPartitionData(int partitionId)
109 throws IOException {
110 return loadPartitionDataProxy(partitionId,
111 new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
112 }
113
114 @Override
115 public long offloadPartitionData(int partitionId)
116 throws IOException {
117 return offloadPartitionDataProxy(partitionId,
118 new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
119 }
120
121 @Override
122 public long offloadBuffers(int partitionId)
123 throws IOException {
124 return offloadBuffersProxy(partitionId,
125 new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
126 }
127
128 @Override
129 protected void writeEntry(VertexIdEdges<I, E> edges, DataOutput out)
130 throws IOException {
131 edges.write(out);
132 }
133
134 @Override
135 protected VertexIdEdges<I, E> readNextEntry(DataInput in) throws IOException {
136 VertexIdEdges<I, E> vertexIdEdges = new ByteArrayVertexIdEdges<>();
137 vertexIdEdges.setConf(conf);
138 vertexIdEdges.readFields(in);
139 return vertexIdEdges;
140 }
141
142 @Override
143 protected long loadInMemoryPartitionData(
144 int partitionId, int ioThreadId, DataIndex index) throws IOException {
145 long numBytes = 0;
146 if (hasPartitionDataOnFile.remove(partitionId)) {
147 OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
148 oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
149 edgeStore.readPartitionEdgeStore(partitionId,
150 inputWrapper.getDataInput());
151 numBytes = inputWrapper.finalizeInput(true);
152 }
153 return numBytes;
154 }
155
156 @Override
157 protected long offloadInMemoryPartitionData(
158 int partitionId, int ioThreadId, DataIndex index) throws IOException {
159 long numBytes = 0;
160 if (edgeStore.hasEdgesForPartition(partitionId)) {
161 OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
162 oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
163 false);
164 edgeStore.writePartitionEdgeStore(partitionId,
165 outputWrapper.getDataOutput());
166 numBytes = outputWrapper.finalizeOutput();
167 hasPartitionDataOnFile.add(partitionId);
168 }
169 return numBytes;
170 }
171
172 @Override
173 protected int entrySerializedSize(VertexIdEdges<I, E> edges) {
174 return edges.getSerializedSize();
175 }
176
177 @Override
178 protected void addEntryToInMemoryPartitionData(int partitionId,
179 VertexIdEdges<I, E> edges) {
180 oocEngine.getMetaPartitionManager().addPartition(partitionId);
181 edgeStore.addPartitionEdges(partitionId, edges);
182 }
183 }