This project has retired. For details please refer to its
Attic page.
ByteArrayVertexIdMessages xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24
25 import org.apache.giraph.factories.MessageValueFactory;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28
29
30
31
32
33
34
35 @SuppressWarnings("unchecked")
36 public class ByteArrayVertexIdMessages<I extends WritableComparable,
37 M extends Writable> extends ByteArrayVertexIdData<I, M>
38 implements VertexIdMessages<I, M> {
39
40 private final MessageValueFactory<M> messageValueFactory;
41
42 private boolean useMessageSizeEncoding = false;
43
44
45
46
47
48
49 public ByteArrayVertexIdMessages(
50 MessageValueFactory<M> messageValueFactory) {
51 this.messageValueFactory = messageValueFactory;
52 }
53
54
55
56
57
58
59 private void setUseMessageSizeEncoding() {
60 if (!getConf().useOutgoingMessageCombiner()) {
61 useMessageSizeEncoding = getConf().useMessageSizeEncoding();
62 } else {
63 useMessageSizeEncoding = false;
64 }
65 }
66
67 @Override
68 public M createData() {
69 return messageValueFactory.newInstance();
70 }
71
72 @Override
73 public void writeData(ExtendedDataOutput out, M message) throws IOException {
74 message.write(out);
75 }
76
77 @Override
78 public void readData(ExtendedDataInput in, M message) throws IOException {
79 message.readFields(in);
80 }
81
82 @Override
83 public void initialize() {
84 super.initialize();
85 setUseMessageSizeEncoding();
86 }
87
88 @Override
89 public void initialize(int expectedSize) {
90 super.initialize(expectedSize);
91 setUseMessageSizeEncoding();
92 }
93
94 @Override
95 public ByteStructVertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
96 return new ByteStructVertexIdMessageIterator<>(this);
97 }
98
99 @Override
100 public void add(I vertexId, M message) {
101 if (!useMessageSizeEncoding) {
102 super.add(vertexId, message);
103 } else {
104 try {
105 vertexId.write(extendedDataOutput);
106 writeMessageWithSize(message);
107 } catch (IOException e) {
108 throw new IllegalStateException("add: IOException occurred");
109 }
110 }
111 }
112
113 @Override
114 public void add(byte[] serializedId, int idPos, M message) {
115 if (!useMessageSizeEncoding) {
116 super.add(serializedId, idPos, message);
117 } else {
118 try {
119 extendedDataOutput.write(serializedId, 0, idPos);
120 writeMessageWithSize(message);
121 } catch (IOException e) {
122 throw new IllegalStateException("add: IOException occurred");
123 }
124 }
125 }
126
127
128
129
130
131
132 private void writeMessageWithSize(M message) throws IOException {
133 int pos = extendedDataOutput.getPos();
134 extendedDataOutput.skipBytes(4);
135 writeData(extendedDataOutput, message);
136 extendedDataOutput.writeInt(
137 pos, extendedDataOutput.getPos() - pos - 4);
138 }
139
140 @Override
141 public ByteStructVertexIdMessageBytesIterator<I, M>
142 getVertexIdMessageBytesIterator() {
143 if (!useMessageSizeEncoding) {
144 return null;
145 }
146 return new ByteStructVertexIdMessageBytesIterator<I, M>(this) {
147 @Override
148 public void writeCurrentMessageBytes(DataOutput dataOutput) {
149 try {
150 dataOutput.write(extendedDataOutput.getByteArray(),
151 messageOffset, messageBytes);
152 } catch (NegativeArraySizeException e) {
153 VerboseByteStructMessageWrite.handleNegativeArraySize(vertexId);
154 } catch (IOException e) {
155 throw new IllegalStateException("writeCurrentMessageBytes: Got " +
156 "IOException", e);
157 }
158 }
159 };
160 }
161
162 @Override
163 public void write(DataOutput dataOutput) throws IOException {
164 dataOutput.writeBoolean(useMessageSizeEncoding);
165 super.write(dataOutput);
166 }
167
168 @Override
169 public void readFields(DataInput dataInput) throws IOException {
170 useMessageSizeEncoding = dataInput.readBoolean();
171 super.readFields(dataInput);
172 }
173 }