/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.persistence;
2021import org.apache.giraph.conf.GiraphConstants;
22import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23import org.apache.giraph.conf.IntConfOption;
24import org.apache.giraph.utils.ExtendedDataOutput;
25import org.apache.giraph.utils.io.BigDataInput;
26import org.apache.giraph.utils.io.BigDataOutput;
2728import java.io.DataInput;
29import java.io.DataOutput;
30import java.io.IOException;
31import java.util.concurrent.ConcurrentHashMap;
32import java.util.concurrent.LinkedBlockingDeque;
3334/**35 * Implementation of data accessor which keeps all the data serialized but in36 * memory. Useful to keep the number of used objects under control.37 */38publicclassInMemoryDataAccessorimplementsOutOfCoreDataAccessor {
39/** Configuration */40privatefinal ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
41/**Factory for data outputs */42privatefinalPooledBigDataOutputFactory outputFactory;
43/**DataInputOutput for each DataIndex used */44privatefinal ConcurrentHashMap<
45 DataIndex, PooledBigDataOutputFactory.Output> data;
4647/**48 * Constructor49 *50 * @param conf Configuration51 */52publicInMemoryDataAccessor(
53 ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
54this.conf = conf;
55 outputFactory = newPooledBigDataOutputFactory(conf);
56 data = new ConcurrentHashMap<>();
57 }
5859 @Override
60publicvoid initialize() {
61// No-op62 }
6364 @Override
65publicvoid shutdown() {
66// No-op67 }
6869 @Override
70publicint getNumAccessorThreads() {
71return GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf);
72 }
7374 @Override
75publicDataInputWrapper prepareInput(int threadId,
76DataIndex index) throws IOException {
77returnnewInMemoryDataInputWrapper(
78newBigDataInput(data.get(index)), index);
79 }
8081 @Override
82publicDataOutputWrapper prepareOutput(int threadId,
83DataIndex index, boolean shouldAppend) throws IOException {
84// Don't need to worry about synchronization here since only one thread85// can deal with one index86 PooledBigDataOutputFactory.Output output = data.get(index);
87if (output == null || !shouldAppend) {
88 output = outputFactory.createOutput();
89 data.put(index, output);
90 }
91returnnewInMemoryDataOutputWrapper(output);
92 }
9394 @Override
95publicboolean dataExist(int threadId, DataIndex index) {
96return data.containsKey(index);
97 }
9899/**100 * {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}101 */102publicstaticclassInMemoryDataOutputWrapperimplementsDataOutputWrapper {
103/**Output to write data to */104privatefinalBigDataOutput output;
105/** Size of output at the moment it was created */106privatefinallong initialSize;
107108/**109 * Constructor110 *111 * @param output Output to write data to112 */113publicInMemoryDataOutputWrapper(BigDataOutput output) {
114this.output = output;
115 initialSize = output.getSize();
116 }
117118 @Override
119public DataOutput getDataOutput() {
120return output;
121 }
122123 @Override
124publiclong finalizeOutput() {
125return output.getSize() - initialSize;
126 }
127 }
128129/**130 * {@link DataInputWrapper} implementation for {@link InMemoryDataAccessor}131 */132publicclassInMemoryDataInputWrapperimplementsDataInputWrapper {
133/** Input to read data from */134privatefinalBigDataInput input;
135/**DataIndex which this wrapper belongs to */136privatefinalDataIndex index;
137138/**139 * Constructor140 *141 * @param input Input to read data from142 * @param index DataIndex which this wrapper belongs to143 */144publicInMemoryDataInputWrapper(
145BigDataInput input, DataIndex index) {
146this.input = input;
147this.index = index;
148 }
149150 @Override
151public DataInput getDataInput() {
152return input;
153 }
154155 @Override
156publiclong finalizeInput(boolean deleteOnClose) {
157if (deleteOnClose) {
158 data.remove(index).returnData();
159 }
160return input.getPos();
161 }
162 }
163164/**165 * Factory for pooled big data outputs166 */167privatestaticclassPooledBigDataOutputFactory {
168/** How big pool of byte arrays to keep */169publicstaticfinalIntConfOption BYTE_ARRAY_POOL_SIZE =
170newIntConfOption("giraph.inMemoryDataAccessor.poolSize", 1024,
171"How big pool of byte arrays to keep");
172/** How big byte arrays to make */173publicstaticfinalIntConfOption BYTE_ARRAY_SIZE =
174newIntConfOption("giraph.inMemoryDataAccessor.byteArraySize", 1 << 21,
175"How big byte arrays to make");
176177/** Configuration */178privatefinalImmutableClassesGiraphConfiguration conf;
179/** Pool of reusable byte[] */180privatefinal LinkedBlockingDeque<byte[]> byteArrayPool;
181/** How big byte arrays to make */182privatefinalint byteArraySize;
183184/**185 * Constructor186 *187 * @param conf Configuration188 */189publicPooledBigDataOutputFactory(
190ImmutableClassesGiraphConfiguration conf) {
191this.conf = conf;
192 byteArrayPool = new LinkedBlockingDeque<>(BYTE_ARRAY_POOL_SIZE.get(conf));
193 byteArraySize = BYTE_ARRAY_SIZE.get(conf);
194 }
195196/**197 * Create new output to write to198 *199 * @return Output to write to200 */201publicOutput createOutput() {
202returnnewOutput(conf);
203 }
204205/**206 * Implementation of BigDataOutput207 */208privateclassOutputextendsBigDataOutput {
209/**210 * Constructor211 *212 * @param conf Configuration213 */214publicOutput(ImmutableClassesGiraphConfiguration conf) {
215super(conf);
216 }
217218/**219 * Return all data structures related to this data output.220 * Can't use the same instance after this call anymore.221 */222protectedvoid returnData() {
223if (dataOutputs != null) {
224for (ExtendedDataOutput dataOutput : dataOutputs) {
225 byteArrayPool.offer(dataOutput.getByteArray());
226 }
227 }
228 byteArrayPool.offer(currentDataOutput.getByteArray());
229 }
230231 @Override
232protectedExtendedDataOutput createOutput(int size) {
233 byte[] data = byteArrayPool.pollLast();
234return conf.createExtendedDataOutput(
235 data == null ? new byte[byteArraySize] : data, 0);
236 }
237238 @Override
239protectedint getMaxSize() {
240return byteArraySize;
241 }
242 }
243 }
244 }