1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm.aggregators;
20
21 import java.io.IOException;
22 import org.apache.giraph.utils.ExtendedDataOutput;
23 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
24
25 /**
26 * Wrapper for output stream which keeps the place in the beginning for the
27 * count of objects which were written to it
28 */
29 public abstract class CountingOutputStream {
30 /** DataOutput to which subclasses will be writing data */
31 protected ExtendedDataOutput dataOutput;
32 /** Counter for objects which were written to the stream */
33 private int counter;
34
35 /**
36 * Default constructor
37 */
38 public CountingOutputStream() {
39 dataOutput = new UnsafeByteArrayOutputStream();
40 reset();
41 }
42
43 /**
44 * Subclasses should call this method when an object is written
45 */
46 protected void incrementCounter() {
47 counter++;
48 }
49
50 /**
51 * Get the number of bytes in the stream
52 *
53 * @return Number of bytes
54 */
55 protected int getSize() {
56 return dataOutput.getPos();
57 }
58
59 /**
60 * Returns all the data from the stream and clears it.
61 *
62 * @return Number of objects followed by the data written to the stream
63 */
64 public byte[] flush() {
65 dataOutput.writeInt(0, counter);
66 // Actual flush not required, this is a byte array
67 byte[] ret = dataOutput.toByteArray();
68 reset();
69 return ret;
70 }
71
72 /**
73 * Reset the stream
74 */
75 private void reset() {
76 dataOutput.reset();
77 // reserve the place for count to be written in the end
78 try {
79 dataOutput.writeInt(0);
80 } catch (IOException e) {
81 throw new IllegalStateException("reset: Got IOException", e);
82 }
83 counter = 0;
84 }
85 }