1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.utils;
2021import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
2223import java.util.concurrent.atomic.AtomicInteger;
2425import org.apache.giraph.conf.FloatConfOption;
26import org.apache.giraph.conf.GiraphConstants;
27import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28import org.apache.giraph.conf.IntConfOption;
2930/**31 * Wraps a list of byte array outputs and provides convenient32 * utilities on top of it33 */34publicclassExtendedByteArrayOutputBuffer {
35/**36 * This option sets the capacity of an37 * {@link org.apache.giraph.utils.ExtendedDataOutput} instance created in38 * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}39 */40publicstaticfinalIntConfOption CAPACITY_OF_DATAOUT_IN_BUFFER =
41newIntConfOption("giraph.capacityOfDataOutInBuffer",
42 1024 * GiraphConstants.ONE_KB,
43"Set the capacity of dataoutputs in dataout buffer");
4445/**46 * This option sets the maximum fraction of a47 * {@link org.apache.giraph.utils.ExtendedDataOutput} instance (stored in48 * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer})49 * that can be filled50 */51publicstaticfinalFloatConfOption FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER =
52newFloatConfOption("giraph.fillingThresholdOfDataoutInBuffer", 0.98f,
53"Set the maximum fraction of dataoutput capacity allowed to fill");
5455/** Maximum size allowed for one byte array output */56privatefinalint maxBufSize;
57/** Stop writing to buffer after threshold has been reached */58privatefinalint threshold;
59/** Giraph configuration */60privatefinal ImmutableClassesGiraphConfiguration<?, ? , ?> config;
6162/** Map of index => byte array outputs */63privatefinal Int2ObjectOpenHashMap<ExtendedDataOutput>
64 bytearrayOutputs = new Int2ObjectOpenHashMap<>();
65/** Size of byte array outputs map */66privatefinal AtomicInteger mapSize = new AtomicInteger(0);
67/** Thread local variable to get hold of a byte array output stream */68privatefinal ThreadLocal<IndexAndDataOut> threadLocal =
69new ThreadLocal<IndexAndDataOut>() {
70 @Override
71protectedIndexAndDataOut initialValue() {
72return newIndexAndDataOutput();
73 }
74 };
7576/**77 * Constructor78 *79 * @param config configuration80 */81publicExtendedByteArrayOutputBuffer(
82 ImmutableClassesGiraphConfiguration<?, ?, ?> config) {
83this.config = config;
8485 maxBufSize = CAPACITY_OF_DATAOUT_IN_BUFFER.get(config);
86 threshold = (int) (FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER.get(config) *
87 maxBufSize);
88 }
8990/**91 * Return threadLocal indexAndDataOutput instance92 *93 * @return threadLocal indexAndDataOutput instance94 */95publicIndexAndDataOut getIndexAndDataOut() {
96IndexAndDataOut indexAndDataOut = threadLocal.get();
97if (indexAndDataOut.dataOutput.getPos() >= threshold) {
98 indexAndDataOut = newIndexAndDataOutput();
99 threadLocal.set(indexAndDataOut);
100 }
101return indexAndDataOut;
102 }
103104/**105 * Get dataoutput from bytearrayOutputs106 *107 * @param index index in bytearrayOutputs108 * @return extendeddataoutput at given index109 */110publicExtendedDataOutput getDataOutput(int index) {
111return bytearrayOutputs.get(index);
112 }
113114/**115 * Holder for index & DataOutput objects116 */117publicstaticclassIndexAndDataOut {
118/** Index */119privatefinalint index;
120/** Dataouput instance */121privatefinalExtendedDataOutput dataOutput;
122123/**124 * Constructor125 *126 * @param index index in bytearrayOutputs127 * @param dataOutput dataoutput128 */129publicIndexAndDataOut(int index, ExtendedDataOutput dataOutput) {
130this.index = index;
131this.dataOutput = dataOutput;
132 }
133134publicint getIndex() {
135return index;
136 }
137138publicExtendedDataOutput getDataOutput() {
139return dataOutput;
140 }
141 }
142143/**144 * Create a new IndexAndDataOutput instance145 * @return new IndexAndDataOutput instance146 */147privateIndexAndDataOut newIndexAndDataOutput() {
148int index = mapSize.getAndIncrement();
149ExtendedDataOutput output = config.createExtendedDataOutput(
150 maxBufSize);
151synchronized (bytearrayOutputs) {
152 bytearrayOutputs.put(index, output);
153 }
154returnnewIndexAndDataOut(index, output);
155 }
156 }