This project has retired. For details please refer to its
Attic page.
LongPointerListPerVertexStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages.primitives.long_id;
20
21 import it.unimi.dsi.fastutil.longs.LongArrayList;
22
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26
27 import org.apache.giraph.comm.messages.MessageStore;
28 import org.apache.giraph.comm.messages.PartitionSplitInfo;
29 import org.apache.giraph.comm.messages.PointerListMessagesIterable;
30 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31 import org.apache.giraph.factories.MessageValueFactory;
32 import org.apache.giraph.utils.EmptyIterable;
33 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
34 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
35 import org.apache.giraph.utils.ExtendedDataOutput;
36 import org.apache.giraph.utils.VertexIdMessageIterator;
37 import org.apache.giraph.utils.VertexIdMessages;
38 import org.apache.hadoop.io.LongWritable;
39 import org.apache.hadoop.io.Writable;
40
41
42
43
44
45
46
47
48 public class LongPointerListPerVertexStore<M extends Writable>
49 extends LongAbstractListStore<M, LongArrayList>
50 implements MessageStore<LongWritable, M> {
51
52
53 private final ExtendedByteArrayOutputBuffer bytesBuffer;
54
55
56
57
58
59
60
61
62 public LongPointerListPerVertexStore(
63 MessageValueFactory<M> messageValueFactory,
64 PartitionSplitInfo<LongWritable> partitionInfo,
65 ImmutableClassesGiraphConfiguration<LongWritable,
66 Writable, Writable> config) {
67 super(messageValueFactory, partitionInfo, config);
68 bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
69 }
70
71 @Override
72 public boolean isPointerListEncoding() {
73 return true;
74 }
75
76 @Override
77 protected LongArrayList createList() {
78 return new LongArrayList();
79 }
80
81 @Override
82 public void addPartitionMessages(
83 int partitionId,
84 VertexIdMessages<LongWritable, M> messages
85 ) {
86 try {
87 VertexIdMessageIterator<LongWritable, M> iterator =
88 messages.getVertexIdMessageIterator();
89 long pointer = 0;
90 LongArrayList list;
91 while (iterator.hasNext()) {
92 iterator.next();
93 M msg = iterator.getCurrentMessage();
94 list = getList(iterator.getCurrentVertexId());
95
96 if (iterator.isNewMessage()) {
97 IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
98 pointer = indexAndDataOut.getIndex();
99 pointer <<= 32;
100 ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
101 pointer += dataOutput.getPos();
102 msg.write(dataOutput);
103 }
104 synchronized (list) {
105 list.add(pointer);
106 }
107 }
108 } catch (IOException e) {
109 throw new RuntimeException("addPartitionMessages: IOException while" +
110 " adding messages for a partition: " + e);
111 }
112 }
113
114 @Override
115 public void addMessage(LongWritable vertexId, M message) throws IOException {
116 LongArrayList list = getList(vertexId);
117 IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
118 long pointer = indexAndDataOut.getIndex();
119 pointer <<= 32;
120 ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
121 pointer += dataOutput.getPos();
122 message.write(dataOutput);
123
124 synchronized (list) {
125 list.add(pointer);
126 }
127 }
128
129 @Override
130 public Iterable<M> getVertexMessages(LongWritable vertexId) {
131 LongArrayList list = getPartitionMap(vertexId).get(
132 vertexId.get());
133 if (list == null) {
134 return EmptyIterable.get();
135 } else {
136 return new PointerListMessagesIterable<>(messageValueFactory,
137 list, bytesBuffer);
138 }
139 }
140
141
142 @Override
143 public void writePartition(DataOutput out, int partitionId)
144 throws IOException {
145 }
146
147 @Override
148 public void readFieldsForPartition(DataInput in, int partitionId)
149 throws IOException {
150 }
151 }