1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.utils.io;
2021import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22import org.apache.giraph.utils.ExtendedDataOutput;
23import org.apache.hadoop.io.Writable;
2425import com.google.common.collect.Iterables;
26import com.google.common.collect.Lists;
2728import java.io.DataInput;
29import java.io.DataOutput;
30import java.io.IOException;
31import java.util.ArrayList;
32import java.util.List;
3334/**35 * Implementations of {@link ExtendedDataOutput} are limited because they can36 * only handle up to 1GB of data. This {@link DataOutput} overcomes that37 * limitation, with almost no additional cost when data is not huge.38 *39 * Goes in pair with {@link BigDataInput}40 */41publicclassBigDataOutputimplements DataOutput, Writable {
42/** Default initial size of the stream */43privatestaticfinalint DEFAULT_INITIAL_SIZE = 16;
44/** Max allowed size of the stream */45privatestaticfinalint MAX_SIZE = 1 << 25;
46/**47 * Create a new stream when we have less then this number of bytes left in48 * the stream. Should be larger than the largest serialized primitive.49 */50privatestaticfinalint SIZE_DELTA = 100;
5152/** Data output which we are currently writing to */53protectedExtendedDataOutput currentDataOutput;
54/** List of filled outputs, will be null until we get a lot of data */55protected List<ExtendedDataOutput> dataOutputs;
56/** Configuration */57protectedfinalImmutableClassesGiraphConfiguration conf;
5859/**60 * Constructor61 *62 * @param conf Configuration63 */64publicBigDataOutput(ImmutableClassesGiraphConfiguration conf) {
65this(DEFAULT_INITIAL_SIZE, conf);
66 }
6768/**69 * Constructor70 *71 * @param initialSize Initial size of data output72 * @param conf Configuration73 */74publicBigDataOutput(int initialSize,
75ImmutableClassesGiraphConfiguration conf) {
76this.conf = conf;
77 dataOutputs = null;
78 currentDataOutput = createOutput(initialSize);
79 }
8081/**82 * Get max size for single data output83 *84 * @return Max size for single data output85 */86protectedint getMaxSize() {
87return MAX_SIZE;
88 }
8990/**91 * Create next data output92 *93 * @param size Size of data output to create94 * @return Created data output95 */96protectedExtendedDataOutput createOutput(int size) {
97return conf.createExtendedDataOutput(size);
98 }
99100/**101 * Get DataOutput which data should be written to. If current DataOutput is102 * full it will create a new one.103 *104 * @return DataOutput which data should be written to105 */106privateExtendedDataOutput getDataOutputToWriteTo() {
107return getDataOutputToWriteTo(SIZE_DELTA);
108 }
109110/**111 * Get DataOutput which data should be written to. If current DataOutput is112 * full it will create a new one.113 *114 * @param additionalSize How many additional bytes we need space for115 * @return DataOutput which data should be written to116 */117privateExtendedDataOutput getDataOutputToWriteTo(int additionalSize) {
118if (currentDataOutput.getPos() + additionalSize > getMaxSize()) {
119if (dataOutputs == null) {
120 dataOutputs = new ArrayList<>(1);
121 }
122 dataOutputs.add(currentDataOutput);
123 currentDataOutput = createOutput(getMaxSize());
124 }
125return currentDataOutput;
126 }
127128/**129 * Get number of DataOutputs which contain written data.130 *131 * @return Number of DataOutputs which contain written data132 */133publicint getNumberOfDataOutputs() {
134return (dataOutputs == null) ? 1 : dataOutputs.size() + 1;
135 }
136137/**138 * Get DataOutputs which contain written data.139 *140 * @return DataOutputs which contain written data141 */142public Iterable<ExtendedDataOutput> getDataOutputs() {
143 ArrayList<ExtendedDataOutput> currentList =
144 Lists.newArrayList(currentDataOutput);
145if (dataOutputs == null) {
146return currentList;
147 } else {
148return Iterables.concat(dataOutputs, currentList);
149 }
150 }
151152publicImmutableClassesGiraphConfiguration getConf() {
153return conf;
154 }
155156/**157 * Get number of bytes written to this data output158 *159 * @return Size in bytes160 */161publiclong getSize() {
162long size = currentDataOutput.getPos();
163if (dataOutputs != null) {
164for (ExtendedDataOutput dataOutput : dataOutputs) {
165 size += dataOutput.getPos();
166 }
167 }
168return size;
169 }
170171 @Override
172publicvoid write(int b) throws IOException {
173 getDataOutputToWriteTo().write(b);
174 }
175176 @Override
177publicvoid write(byte[] b) throws IOException {
178 write(b, 0, b.length);
179 }
180181 @Override
182publicvoid write(byte[] b, int off, int len) throws IOException {
183if (len <= getMaxSize()) {
184 getDataOutputToWriteTo(len).write(b, off, len);
185 } else {
186// When we try to write more bytes than the biggest size of single data187// output, we need to split up the byte array into multiple chunks188while (len > 0) {
189int toWrite = Math.min(getMaxSize(), len);
190 write(b, off, toWrite);
191 len -= toWrite;
192 off += toWrite;
193 }
194 }
195 }
196197 @Override
198publicvoid writeBoolean(boolean v) throws IOException {
199 getDataOutputToWriteTo().writeBoolean(v);
200 }
201202 @Override
203publicvoid writeByte(int v) throws IOException {
204 getDataOutputToWriteTo().writeByte(v);
205 }
206207 @Override
208publicvoid writeShort(int v) throws IOException {
209 getDataOutputToWriteTo().writeShort(v);
210 }
211212 @Override
213publicvoid writeChar(int v) throws IOException {
214 getDataOutputToWriteTo().writeChar(v);
215 }
216217 @Override
218publicvoid writeInt(int v) throws IOException {
219 getDataOutputToWriteTo().writeInt(v);
220 }
221222 @Override
223publicvoid writeLong(long v) throws IOException {
224 getDataOutputToWriteTo().writeLong(v);
225 }
226227 @Override
228publicvoid writeFloat(float v) throws IOException {
229 getDataOutputToWriteTo().writeFloat(v);
230 }
231232 @Override
233publicvoid writeDouble(double v) throws IOException {
234 getDataOutputToWriteTo().writeDouble(v);
235 }
236237 @Override
238publicvoid writeBytes(String s) throws IOException {
239 getDataOutputToWriteTo().writeBytes(s);
240 }
241242 @Override
243publicvoid writeChars(String s) throws IOException {
244 getDataOutputToWriteTo().writeChars(s);
245 }
246247 @Override
248publicvoid writeUTF(String s) throws IOException {
249 getDataOutputToWriteTo().writeUTF(s);
250 }
251252/**253 * Write one of data outputs to another data output254 *255 * @param dataOutput Data output to write256 * @param out Data output to write to257 */258privatevoid writeExtendedDataOutput(ExtendedDataOutput dataOutput,
259 DataOutput out) throws IOException {
260 out.writeInt(dataOutput.getPos());
261 out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
262 }
263264/**265 * Read data output from data input266 *267 * @param in Data input to read from268 * @return Data output read269 */270privateExtendedDataOutput readExtendedDataOutput(
271 DataInput in) throws IOException {
272int length = in.readInt();
273 byte[] data = new byte[length];
274 in.readFully(data);
275return conf.createExtendedDataOutput(data, data.length);
276 }
277278 @Override
279publicvoid write(DataOutput out) throws IOException {
280if (dataOutputs == null) {
281 out.writeInt(0);
282 } else {
283 out.writeInt(dataOutputs.size());
284for (ExtendedDataOutput stream : dataOutputs) {
285 writeExtendedDataOutput(stream, out);
286 }
287 }
288 writeExtendedDataOutput(currentDataOutput, out);
289 }
290291 @Override
292publicvoid readFields(DataInput in) throws IOException {
293int size = in.readInt();
294if (size == 0) {
295 dataOutputs = null;
296 } else {
297 dataOutputs = new ArrayList<ExtendedDataOutput>(size);
298while (size-- > 0) {
299 dataOutputs.add(readExtendedDataOutput(in));
300 }
301 }
302 currentDataOutput = readExtendedDataOutput(in);
303 }
304 }