Type parameters:
- I - Vertex id
- V - Vertex data
- E - Edge data

public class DiskBackedPartitionStore<I extends org.apache.hadoop.io.WritableComparable,V extends org.apache.hadoop.io.Writable,E extends org.apache.hadoop.io.Writable>
extends DiskBackedDataStore<ExtendedDataOutput>
implements PartitionStore<I,V,E>
Fields inherited from class DiskBackedDataStore: hasPartitionDataOnFile, MINIMUM_BUFFER_SIZE_TO_FLUSH, oocEngine
| Constructor | Description |
|---|---|
| DiskBackedPartitionStore(PartitionStore<I,V,E> partitionStore, ImmutableClassesGiraphConfiguration<I,V,E> conf, org.apache.hadoop.mapreduce.Mapper.Context context, OutOfCoreEngine oocEngine) | Constructor. |
| Modifier and Type | Method | Description |
|---|---|---|
| protected void | addEntryToInMemoryPartitionData(int partitionId, ExtendedDataOutput vertices) | Adds a single entry for a given partition to the in-memory data store. |
| boolean | addPartition(Partition<I,V,E> partition) | Add a *new* partition to the store. |
| void | addPartitionVertices(Integer partitionId, ExtendedDataOutput extendedDataOutput) | Add vertices to a given partition from a given DataOutput instance. |
| protected int | entrySerializedSize(ExtendedDataOutput vertices) | Gets the size of a given entry in bytes. |
| Partition<I,V,E> | getNextPartition() | Return the next partition in iteration for the current superstep. |
| int | getNumPartitions() | Return the number of stored partitions. |
| long | getPartitionEdgeCount(Integer partitionId) | Return the number of edges in a partition. |
| Iterable<Integer> | getPartitionIds() | Return the ids of all the stored partitions as an Iterable. |
| long | getPartitionVertexCount(Integer partitionId) | Return the number of vertices in a partition. |
| boolean | hasPartition(Integer partitionId) | Whether a specific partition is present in the store. |
| void | initialize() | Called at the beginning of the computation. |
| boolean | isEmpty() | Whether the partition store is empty. |
| protected long | loadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) | Loads data of a partition into the data store. |
| long | loadPartitionData(int partitionId) | Loads and assembles all data for a given partition, and puts it into the data store. |
| long | offloadBuffers(int partitionId) | Offloads raw data buffers of a given partition to disk, and returns the number of bytes offloaded from memory to disk. |
| protected long | offloadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) | Offloads data of a partition in the data store to disk. |
| long | offloadPartitionData(int partitionId) | Offloads partition data of a given partition in the data store to disk, and returns the number of bytes offloaded from memory to disk. |
| void | putPartition(Partition<I,V,E> partition) | Put a partition back to the store. |
| protected ExtendedDataOutput | readNextEntry(DataInput in) | Reads the next available raw entry from a given input stream. |
| Partition<I,V,E> | removePartition(Integer partitionId) | Remove a partition and return it. |
| void | shutdown() | Called at the end of the computation. |
| void | startIteration() | Start the iteration cycle to iterate over partitions. |
| protected void | writeEntry(ExtendedDataOutput vertices, DataOutput out) | Writes a single raw entry to a given output stream. |
Methods inherited from class DiskBackedDataStore: addEntry, getCandidateBuffersToOffload, loadPartitionDataProxy, offloadBuffersProxy, offloadPartitionDataProxy
public DiskBackedPartitionStore(PartitionStore<I,V,E> partitionStore, ImmutableClassesGiraphConfiguration<I,V,E> conf, org.apache.hadoop.mapreduce.Mapper.Context context, OutOfCoreEngine oocEngine)
Constructor.
Parameters:
- partitionStore - In-memory partition store that this out-of-core partition store wraps
- conf - Configuration
- context - Job context
- oocEngine - Out-of-core engine
public boolean addPartition(Partition<I,V,E> partition)
Description copied from interface: PartitionStore
Add a *new* partition to the store.
Specified by: addPartition in interface PartitionStore<I,V,E>
Parameters:
- partition - Partition to add
public Partition<I,V,E> removePartition(Integer partitionId)
Description copied from interface: PartitionStore
Remove a partition and return it.
Specified by: removePartition in interface PartitionStore<I,V,E>
Parameters:
- partitionId - Partition id
public boolean hasPartition(Integer partitionId)
Description copied from interface: PartitionStore
Whether a specific partition is present in the store.
Specified by: hasPartition in interface PartitionStore<I,V,E>
Parameters:
- partitionId - Partition id
public Iterable<Integer> getPartitionIds()
Description copied from interface: PartitionStore
Return the ids of all the stored partitions as an Iterable.
Specified by: getPartitionIds in interface PartitionStore<I,V,E>
public int getNumPartitions()
Description copied from interface: PartitionStore
Return the number of stored partitions.
Specified by: getNumPartitions in interface PartitionStore<I,V,E>
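The constructor describes DiskBackedPartitionStore as a wrapper around an in-memory PartitionStore, so the bookkeeping methods above have to account for partitions that currently live on disk as well as those held by the wrapped store. The Java sketch below illustrates that idea with two hypothetical classes, InMemoryStore and WrappingStore. "Disk" is simulated by a second map, and the rule that addPartition returns false for an id that is already present is an assumption of the sketch, not documented behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, heavily simplified stand-in for a wrapped in-memory store. */
class InMemoryStore {
  final Map<Integer, String> partitions = new HashMap<>();
}

/**
 * Hypothetical wrapper: each partition lives either in the wrapped
 * in-memory store or "on disk" (simulated here by a second map).
 */
class WrappingStore {
  private final InMemoryStore inMemory;
  private final Map<Integer, String> onDisk = new HashMap<>();

  WrappingStore(InMemoryStore inMemory) {
    this.inMemory = inMemory;
  }

  /** Accepts only new partition ids (an assumption of this sketch). */
  boolean addPartition(int id, String data) {
    if (hasPartition(id)) {
      return false;
    }
    inMemory.partitions.put(id, data);
    return true;
  }

  /** A partition is present whether it is resident or offloaded. */
  boolean hasPartition(int id) {
    return inMemory.partitions.containsKey(id) || onDisk.containsKey(id);
  }

  /** Removes and returns a partition from wherever it currently lives. */
  String removePartition(int id) {
    String data = onDisk.remove(id);
    return data != null ? data : inMemory.partitions.remove(id);
  }

  /** Simulates offloading: moves a partition out of the wrapped store. */
  void offload(int id) {
    String data = inMemory.partitions.remove(id);
    if (data != null) {
      onDisk.put(id, data);
    }
  }

  /** Ids of resident and offloaded partitions, in ascending order. */
  Iterable<Integer> getPartitionIds() {
    Set<Integer> ids = new TreeSet<>(inMemory.partitions.keySet());
    ids.addAll(onDisk.keySet());
    return ids;
  }

  int getNumPartitions() {
    return inMemory.partitions.size() + onDisk.size();
  }
}
```

Keeping the set of offloaded partitions separate from the wrapped store is what lets hasPartition, getPartitionIds and getNumPartitions answer without reading any partition back into memory.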
public long getPartitionVertexCount(Integer partitionId)
Description copied from interface: PartitionStore
Return the number of vertices in a partition.
Specified by: getPartitionVertexCount in interface PartitionStore<I,V,E>
Parameters:
- partitionId - Partition id
public long getPartitionEdgeCount(Integer partitionId)
Description copied from interface: PartitionStore
Return the number of edges in a partition.
Specified by: getPartitionEdgeCount in interface PartitionStore<I,V,E>
Parameters:
- partitionId - Partition id
public boolean isEmpty()
Description copied from interface: PartitionStore
Whether the partition store is empty.
Specified by: isEmpty in interface PartitionStore<I,V,E>
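Answering getPartitionVertexCount or getPartitionEdgeCount for an offloaded partition should not require loading it back from disk. One plausible approach, assumed here rather than taken from this page, is to record the counts whenever a partition enters the store. PartitionCounts is a hypothetical helper, and returning 0 for an unknown partition id is a choice made only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: vertex and edge counts are recorded when a
 * partition is added, so they stay available while the partition is on disk.
 */
class PartitionCounts {
  private final Map<Integer, Long> vertexCount = new HashMap<>();
  private final Map<Integer, Long> edgeCount = new HashMap<>();

  /** Records counts for a partition entering the store. */
  void record(int partitionId, long vertices, long edges) {
    vertexCount.put(partitionId, vertices);
    edgeCount.put(partitionId, edges);
  }

  /** Drops counts for a partition leaving the store. */
  void forget(int partitionId) {
    vertexCount.remove(partitionId);
    edgeCount.remove(partitionId);
  }

  long getPartitionVertexCount(int partitionId) {
    return vertexCount.getOrDefault(partitionId, 0L); // 0 for unknown ids (sketch choice)
  }

  long getPartitionEdgeCount(int partitionId) {
    return edgeCount.getOrDefault(partitionId, 0L);
  }

  /** Empty when no partition has been recorded. */
  boolean isEmpty() {
    return vertexCount.isEmpty();
  }
}
```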
public void startIteration()
Description copied from interface: PartitionStore
Start the iteration cycle; the partitions are then iterated over by calling PartitionStore.getNextPartition(). Each time PartitionStore.getNextPartition() is called, an unprocessed partition in the current iteration is returned. After processing of the partition is done, the partition should be put back in the store using PartitionStore.putPartition(Partition). Here is an example of the entire workflow:

In the main thread:

    partitionStore.startIteration();

In multiple threads iterating over the partitions:

    Partition partition = partitionStore.getNextPartition();
    // ... do stuff with partition ...
    partitionStore.putPartition(partition);

This method is called from a single thread.
Specified by: startIteration in interface PartitionStore<I,V,E>
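The workflow above can be exercised end to end with plain java.util.concurrent. In this sketch, Part, IterableStore and IterationDemo are hypothetical stand-ins for Partition and PartitionStore, and getNextPartition returning null once every partition has been handed out is an assumption of the sketch; the page does not say how the end of an iteration is signalled.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical partition: an id plus a counter of how often it was processed. */
class Part {
  final int id;
  final AtomicInteger timesProcessed = new AtomicInteger();

  Part(int id) {
    this.id = id;
  }
}

/** Hypothetical store following the startIteration/getNext/put protocol. */
class IterableStore {
  private final Map<Integer, Part> partitions = new ConcurrentHashMap<>();
  private final Queue<Integer> unprocessed = new ConcurrentLinkedQueue<>();

  void add(Part p) {
    partitions.put(p.id, p);
  }

  /** Called from a single thread before the workers start. */
  void startIteration() {
    unprocessed.clear();
    unprocessed.addAll(partitions.keySet());
  }

  /** Returns an unprocessed partition, or null when none are left (assumed). */
  Part getNextPartition() {
    Integer id = unprocessed.poll(); // atomic: each id goes to one caller
    return id == null ? null : partitions.get(id);
  }

  /** Puts a processed partition back in the store. */
  void putPartition(Part p) {
    partitions.put(p.id, p);
  }
}

class IterationDemo {
  static void run(IterableStore store, int threads) {
    store.startIteration(); // main thread
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        Part p;
        while ((p = store.getNextPartition()) != null) {
          p.timesProcessed.incrementAndGet(); // "do stuff with partition"
          store.putPartition(p);
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("iteration did not finish");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

The main thread calls startIteration exactly once before the workers start, matching the single-thread note above; the concurrent queue then hands each partition to exactly one worker per iteration.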
public Partition<I,V,E> getNextPartition()
Description copied from interface: PartitionStore
Return the next partition in iteration for the current superstep. The partition must be put back in the store with PartitionStore.putPartition(Partition) after use. Look at comments on PartitionStore.startIteration() for more detail.
Specified by: getNextPartition in interface PartitionStore<I,V,E>
public void putPartition(Partition<I,V,E> partition)
Description copied from interface: PartitionStore
Put a partition back to the store after it has been obtained through PartitionStore.getNextPartition(). Look at comments on PartitionStore.startIteration() for more detail.
Specified by: putPartition in interface PartitionStore<I,V,E>
Parameters:
- partition - Partition
public void addPartitionVertices(Integer partitionId, ExtendedDataOutput extendedDataOutput)
Description copied from interface: PartitionStore
Add vertices to a given partition from a given DataOutput instance.
Specified by: addPartitionVertices in interface PartitionStore<I,V,E>
Parameters:
- partitionId - Partition id
- extendedDataOutput - Output containing serialized vertex data
public void shutdown()
Description copied from interface: PartitionStore
Called at the end of the computation.
Specified by: shutdown in interface PartitionStore<I,V,E>
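In an out-of-core setting, addPartitionVertices may receive vertices for a partition that is not currently in memory. The sketch below shows one assumed way to handle that: append the serialized bytes directly when the partition is resident, and otherwise keep them as a raw buffer to be merged or flushed later. VertexRouter and its fields are hypothetical, and the (long id, double value) encoding merely stands in for a filled ExtendedDataOutput.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical router for batches of serialized vertices. */
class VertexRouter {
  final Set<Integer> inMemory = new HashSet<>();
  final Map<Integer, ByteArrayOutputStream> residentData = new HashMap<>();
  final Map<Integer, List<byte[]>> pendingBuffers = new HashMap<>();

  /** Routes one batch of serialized vertices to its partition. */
  void addPartitionVertices(int partitionId, byte[] serializedVertices) {
    if (inMemory.contains(partitionId)) {
      // Partition is resident: append the bytes directly.
      residentData.computeIfAbsent(partitionId, k -> new ByteArrayOutputStream())
          .write(serializedVertices, 0, serializedVertices.length);
    } else {
      // Partition is on disk: keep the raw batch until it is flushed or loaded.
      pendingBuffers.computeIfAbsent(partitionId, k -> new ArrayList<>())
          .add(serializedVertices);
    }
  }

  /** Serializes (id, value) pairs as a stand-in for a filled ExtendedDataOutput. */
  static byte[] serialize(long[] ids, double[] values) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (int i = 0; i < ids.length; i++) {
        out.writeLong(ids[i]);
        out.writeDouble(values[i]);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e); // not expected for an in-memory stream
    }
    return bytes.toByteArray();
  }
}
```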
public void initialize()
Description copied from interface: PartitionStore
Called at the beginning of the computation.
Specified by: initialize in interface PartitionStore<I,V,E>
protected long loadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) throws IOException
Description copied from class: DiskBackedDataStore
Loads data of a partition into the data store.
Specified by: loadInMemoryPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
- partitionId - id of the partition whose data is loaded
- ioThreadId - id of the IO thread performing the load
- index - data index chain for the data to load
Throws:
- IOException
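protected ExtendedDataOutput readNextEntry(DataInput in) throws IOException
Description copied from class: DiskBackedDataStore
Reads the next available raw entry from a given input stream.
Specified by: readNextEntry in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
- in - input stream to read the entry from
Throws:
- IOException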
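readNextEntry is the counterpart of writeEntry. Assuming a simple length-prefixed layout (a 4-byte entry size followed by that many bytes; the actual on-disk format is not described on this page), reading one entry can be sketched as follows. EntryReader is a hypothetical class, and a plain byte[] stands in for ExtendedDataOutput.

```java
import java.io.DataInput;
import java.io.IOException;

/** Hypothetical reader for an assumed length-prefixed entry layout. */
class EntryReader {
  /** Reads one entry: a 4-byte length followed by that many bytes. */
  static byte[] readNextEntry(DataInput in) throws IOException {
    int length = in.readInt();
    if (length < 0) {
      throw new IOException("corrupt entry length: " + length);
    }
    byte[] entry = new byte[length];
    in.readFully(entry); // blocks until all bytes are read, or throws EOFException
    return entry;
  }
}
```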
protected void addEntryToInMemoryPartitionData(int partitionId, ExtendedDataOutput vertices)
Description copied from class: DiskBackedDataStore
Adds a single entry for a given partition to the in-memory data store.
Specified by: addEntryToInMemoryPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
- partitionId - id of the partition to add the data to
- vertices - input entry to add to the data store
public long loadPartitionData(int partitionId) throws IOException
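Description copied from class: DiskBackedDataStore
Loads and assembles all data for a given partition, and puts it into the data store.
Specified by: loadPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
- partitionId - id of the partition to load and assemble all data for
Throws:
- IOException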
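Loading and assembling a partition amounts to reading its raw entries back one at a time and handing each to addEntryToInMemoryPartitionData. The sketch below assumes a length-prefixed layout (4-byte size, then the bytes) and treats end of stream before a length prefix as "no more entries". PartitionLoader is hypothetical, and returning the number of bytes read is an assumption; this page does not document the return value of loadPartitionData.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical loader that reassembles a partition from raw entries. */
class PartitionLoader {
  final Map<Integer, ByteArrayOutputStream> inMemory = new HashMap<>();

  /** Appends one raw entry to the partition's in-memory data. */
  void addEntryToInMemoryPartitionData(int partitionId, byte[] entry) {
    inMemory.computeIfAbsent(partitionId, k -> new ByteArrayOutputStream())
        .write(entry, 0, entry.length);
  }

  /** Reads every entry in the stream; returns the number of bytes consumed. */
  long loadPartitionData(int partitionId, DataInputStream in) throws IOException {
    long bytesLoaded = 0;
    while (true) {
      int length;
      try {
        length = in.readInt();
      } catch (EOFException endOfStream) {
        break; // no further length prefix: all entries have been read
      }
      if (length < 0) {
        throw new IOException("corrupt entry length: " + length);
      }
      byte[] entry = new byte[length];
      in.readFully(entry);
      addEntryToInMemoryPartitionData(partitionId, entry);
      bytesLoaded += Integer.BYTES + length;
    }
    return bytesLoaded;
  }
}
```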
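public long offloadPartitionData(int partitionId) throws IOException
Description copied from class: DiskBackedDataStore
Offloads partition data of a given partition in the data store to disk, and returns the number of bytes offloaded from memory to disk.
Specified by: offloadPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
- partitionId - id of the partition whose data is offloaded
Throws:
- IOException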
protected long offloadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) throws IOException
Description copied from class: DiskBackedDataStore
Offloads data of a partition in the data store to disk.
Specified by: offloadInMemoryPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
- partitionId - id of the partition to offload to disk
- ioThreadId - id of the IO thread performing the offload
- index - data index chain for the data to offload
Throws:
- IOException
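offloadPartitionData reports how many bytes were moved from memory to disk. The following is a minimal sketch with java.nio.file, assuming one file per partition named partition-<id> (a hypothetical naming scheme; the ioThreadId and DataIndex parameters of offloadInMemoryPartitionData are left out) and a length-prefixed entry layout. PartitionOffloader is a hypothetical class, not the Giraph implementation.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical offloader writing resident partitions to per-partition files. */
class PartitionOffloader {
  final Map<Integer, List<byte[]>> inMemory = new HashMap<>();
  private final Path directory;

  PartitionOffloader(Path directory) {
    this.directory = directory;
  }

  /** Hypothetical file naming: one file per partition id. */
  Path fileFor(int partitionId) {
    return directory.resolve("partition-" + partitionId);
  }

  /** Returns the number of bytes moved from memory to disk (0 if not resident). */
  long offloadPartitionData(int partitionId) throws IOException {
    List<byte[]> entries = inMemory.get(partitionId);
    if (entries == null) {
      return 0;
    }
    long written;
    try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        Files.newOutputStream(fileFor(partitionId),
            StandardOpenOption.CREATE, StandardOpenOption.APPEND)))) {
      for (byte[] entry : entries) {
        out.writeInt(entry.length); // length prefix
        out.write(entry);
      }
      written = out.size(); // bytes written through this stream
    }
    inMemory.remove(partitionId); // free memory only after a successful write
    return written;
  }
}
```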
protected void writeEntry(ExtendedDataOutput vertices, DataOutput out) throws IOException
Description copied from class: DiskBackedDataStore
Writes a single raw entry to a given output stream.
Specified by: writeEntry in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
- vertices - entry to write to output
- out - output stream to write the entry to
Throws:
- IOException
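Assuming ExtendedDataOutput behaves like a growable buffer whose backing array may be longer than the bytes actually written, writeEntry should emit only the written prefix. The sketch below assumes a length-prefixed layout and models the buffer as a byte[] plus a length; EntryWriter is a hypothetical name.

```java
import java.io.DataOutput;
import java.io.IOException;

/** Hypothetical writer for an assumed length-prefixed entry layout. */
class EntryWriter {
  /**
   * Writes one entry as a 4-byte length followed by the first
   * {@code length} bytes of the backing array.
   */
  static void writeEntry(byte[] backing, int length, DataOutput out) throws IOException {
    out.writeInt(length);
    out.write(backing, 0, length);
  }
}
```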
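public long offloadBuffers(int partitionId) throws IOException
Description copied from class: DiskBackedDataStore
Offloads raw data buffers of a given partition to disk, and returns the number of bytes offloaded from memory to disk.
Specified by: offloadBuffers in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
- partitionId - id of the partition whose raw data buffers are offloaded
Throws:
- IOException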
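offloadBuffers works on raw buffers rather than assembled partitions, and the inherited MINIMUM_BUFFER_SIZE_TO_FLUSH and getCandidateBuffersToOffload suggest that only buffers above some size are worth writing out. The sketch below models that with a hypothetical RawBuffers class: addEntry accumulates entries and their sizes (measured the way entrySerializedSize would), getCandidateBuffersToOffload picks partitions at or above a hypothetical minBufferSizeToFlush threshold, and offloadBuffers writes one partition's buffers and returns the bytes freed. The method names mirror the documented ones, but their signatures, the >= comparison and the length-prefixed output are assumptions.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical per-partition raw buffers with a flush threshold. */
class RawBuffers {
  private final int minBufferSizeToFlush; // hypothetical analog of MINIMUM_BUFFER_SIZE_TO_FLUSH
  private final Map<Integer, List<byte[]>> buffers = new HashMap<>();
  private final Map<Integer, Integer> bufferedBytes = new HashMap<>();

  RawBuffers(int minBufferSizeToFlush) {
    this.minBufferSizeToFlush = minBufferSizeToFlush;
  }

  /** Size of one entry in bytes (the role entrySerializedSize plays). */
  static int entrySerializedSize(byte[] entry) {
    return entry.length;
  }

  /** Buffers a raw entry for a partition that is not in memory. */
  void addEntry(int partitionId, byte[] entry) {
    buffers.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(entry);
    bufferedBytes.merge(partitionId, entrySerializedSize(entry), Integer::sum);
  }

  /** Partitions whose buffered data reached the threshold, in ascending order. */
  Set<Integer> getCandidateBuffersToOffload() {
    Set<Integer> candidates = new TreeSet<>();
    bufferedBytes.forEach((id, size) -> {
      if (size >= minBufferSizeToFlush) {
        candidates.add(id);
      }
    });
    return candidates;
  }

  /** Writes a partition's buffers to the sink; returns the bytes freed from memory. */
  long offloadBuffers(int partitionId, OutputStream sink) throws IOException {
    List<byte[]> entries = buffers.get(partitionId);
    if (entries == null) {
      return 0;
    }
    DataOutputStream out = new DataOutputStream(sink);
    for (byte[] entry : entries) {
      out.writeInt(entry.length); // length prefix
      out.write(entry);
    }
    out.flush();
    buffers.remove(partitionId); // drop buffers only after a successful write
    return bufferedBytes.remove(partitionId);
  }
}
```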
protected int entrySerializedSize(ExtendedDataOutput vertices)
Description copied from class: DiskBackedDataStore
Gets the size of a given entry in bytes.
Specified by: entrySerializedSize in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
- vertices - input entry whose size is computed

Copyright © 2011-2020 The Apache Software Foundation. All Rights Reserved.