1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.giraph.comm.aggregators; 20 21 import java.io.IOException; 22 23 import org.apache.giraph.comm.GlobalCommType; 24 import org.apache.giraph.utils.WritableUtils; 25 import org.apache.hadoop.io.Writable; 26 27 /** 28 * Implementation of {@link CountingOutputStream} which allows writing of 29 * reduced values in the form of pair (name, type, value) 30 * 31 * There are two modes: 32 * - when class of the value is written into the stream. 33 * - when it isn't, and reader needs to know Class of the value in order 34 * to read it. 35 */ 36 public class GlobalCommValueOutputStream extends CountingOutputStream { 37 /** whether to write Class object for values into the stream */ 38 private final boolean writeClass; 39 40 /** 41 * Constructor 42 * 43 * @param writeClass boolean whether to write Class object for values 44 */ 45 public GlobalCommValueOutputStream(boolean writeClass) { 46 this.writeClass = writeClass; 47 } 48 49 /** 50 * Write global communication object to the stream 51 * and increment internal counter 52 * 53 * @param name Name 54 * @param type Global communication type 55 * @param value Object value 56 * @return Number of bytes occupied by the stream 57 * @throws IOException 58 */ 59 public int addValue(String name, GlobalCommType type, 60 Writable value) throws IOException { 61 incrementCounter(); 62 dataOutput.writeUTF(name); 63 dataOutput.writeByte(type.ordinal()); 64 if (writeClass) { 65 WritableUtils.writeWritableObject(value, dataOutput); 66 } else { 67 value.write(dataOutput); 68 } 69 return getSize(); 70 } 71 }