This project has retired. For details please refer to its
Attic page.
BigDataInput xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils.io;
20
21 import org.apache.giraph.utils.ExtendedDataInput;
22 import org.apache.giraph.utils.ExtendedDataOutput;
23 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.List;
28
29
30
31
32
33
34
35
36 public class BigDataInput implements ExtendedDataInput {
37
38 private static final ExtendedDataInput EMPTY_INPUT =
39 new UnsafeByteArrayInputStream(new byte[0]);
40
41
42 private ExtendedDataInput currentInput;
43
44 private final List<ExtendedDataInput> dataInputs;
45
46 private int currentPositionInInputs;
47
48
49
50
51
52
53 public BigDataInput(BigDataOutput bigDataOutput) {
54 dataInputs = new ArrayList<ExtendedDataInput>(
55 bigDataOutput.getNumberOfDataOutputs());
56 for (ExtendedDataOutput dataOutput : bigDataOutput.getDataOutputs()) {
57 dataInputs.add(bigDataOutput.getConf().createExtendedDataInput(
58 dataOutput.getByteArray(), 0, dataOutput.getPos()));
59 }
60 currentPositionInInputs = -1;
61 moveToNextDataInput();
62 }
63
64
65 private void moveToNextDataInput() {
66 currentPositionInInputs++;
67 if (currentPositionInInputs < dataInputs.size()) {
68 currentInput = dataInputs.get(currentPositionInInputs);
69 } else {
70 currentInput = EMPTY_INPUT;
71 }
72 }
73
74
75
76
77
78 private void checkIfShouldMoveToNextDataInput() {
79 if (currentInput.endOfInput()) {
80 moveToNextDataInput();
81 }
82 }
83
84 @Override
85 public void readFully(byte[] b) throws IOException {
86 readFully(b, 0, b.length);
87 }
88
89 @Override
90 public void readFully(byte[] b, int off, int len) throws IOException {
91 checkIfShouldMoveToNextDataInput();
92 int available = currentInput.available();
93 if (len <= available) {
94 currentInput.readFully(b, off, len);
95 } else {
96
97
98 currentInput.readFully(b, off, available);
99 readFully(b, off + available, len - available);
100 }
101 }
102
103 @Override
104 public boolean readBoolean() throws IOException {
105 checkIfShouldMoveToNextDataInput();
106 return currentInput.readBoolean();
107 }
108
109 @Override
110 public byte readByte() throws IOException {
111 checkIfShouldMoveToNextDataInput();
112 return currentInput.readByte();
113 }
114
115 @Override
116 public int readUnsignedByte() throws IOException {
117 checkIfShouldMoveToNextDataInput();
118 return currentInput.readUnsignedByte();
119 }
120
121 @Override
122 public short readShort() throws IOException {
123 checkIfShouldMoveToNextDataInput();
124 return currentInput.readShort();
125 }
126
127 @Override
128 public int readUnsignedShort() throws IOException {
129 checkIfShouldMoveToNextDataInput();
130 return currentInput.readUnsignedShort();
131 }
132
133 @Override
134 public char readChar() throws IOException {
135 checkIfShouldMoveToNextDataInput();
136 return currentInput.readChar();
137 }
138
139 @Override
140 public int readInt() throws IOException {
141 checkIfShouldMoveToNextDataInput();
142 return currentInput.readInt();
143 }
144
145 @Override
146 public long readLong() throws IOException {
147 checkIfShouldMoveToNextDataInput();
148 return currentInput.readLong();
149 }
150
151 @Override
152 public float readFloat() throws IOException {
153 checkIfShouldMoveToNextDataInput();
154 return currentInput.readFloat();
155 }
156
157 @Override
158 public double readDouble() throws IOException {
159 checkIfShouldMoveToNextDataInput();
160 return currentInput.readDouble();
161 }
162
163 @Override
164 public String readLine() throws IOException {
165 checkIfShouldMoveToNextDataInput();
166 return currentInput.readLine();
167 }
168
169 @Override
170 public String readUTF() throws IOException {
171 checkIfShouldMoveToNextDataInput();
172 return currentInput.readUTF();
173 }
174
175 @Override
176 public int skipBytes(int n) throws IOException {
177 int bytesLeftToSkip = n;
178 while (bytesLeftToSkip > 0) {
179 int bytesSkipped = currentInput.skipBytes(bytesLeftToSkip);
180 bytesLeftToSkip -= bytesSkipped;
181 if (bytesLeftToSkip > 0) {
182 moveToNextDataInput();
183 if (endOfInput()) {
184 break;
185 }
186 }
187 }
188 return n - bytesLeftToSkip;
189 }
190
191 @Override
192 public int getPos() {
193 int pos = 0;
194 for (int i = 0; i <= currentPositionInInputs; i++) {
195 pos += dataInputs.get(i).getPos();
196 }
197 return pos;
198 }
199
200 @Override
201 public int available() {
202 throw new UnsupportedOperationException("available: " +
203 "Not supported with BigDataIO because overflow can happen");
204 }
205
206 @Override
207 public boolean endOfInput() {
208 return currentInput == EMPTY_INPUT ||
209 (dataInputs.get(currentPositionInInputs).endOfInput() &&
210 currentPositionInInputs == dataInputs.size() - 1);
211 }
212 }