1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.reducers.collect;
1920import java.io.DataInput;
21import java.io.DataOutput;
22import java.io.IOException;
2324import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
25import org.apache.giraph.types.ops.PrimitiveTypeOps;
26import org.apache.giraph.types.ops.TypeOpsUtils;
27import org.apache.giraph.types.ops.collections.ResettableIterator;
28import org.apache.giraph.types.ops.collections.array.WArrayList;
29import org.apache.giraph.utils.WritableUtils;
3031/**32 * Collect primitive values reduce operation33 *34 * @param <S> Primitive Writable type, which has its type ops35 */36publicclass CollectPrimitiveReduceOperation<S>
37extends KryoWrappedReduceOperation<S, WArrayList<S>> {
38/**39 * Type ops if available, or null40 */41private PrimitiveTypeOps<S> typeOps;
4243/** For reflection only */44publicCollectPrimitiveReduceOperation() {
45 }
4647publicCollectPrimitiveReduceOperation(PrimitiveTypeOps<S> typeOps) {
48this.typeOps = typeOps;
49 }
5051 @Override
52public WArrayList<S> createValue() {
53return createList();
54 }
5556 @Override
57publicvoid reduce(WArrayList<S> reduceInto, S value) {
58 reduceInto.addW(value);
59 }
6061 @Override
62publicvoid reduceMerge(final WArrayList<S> reduceInto,
63 WArrayList<S> toReduce) {
64 ResettableIterator<S> iterator = toReduce.fastIteratorW();
65while (iterator.hasNext()) {
66 reduceInto.addW(iterator.next());
67 }
68 }
6970public WArrayList<S> createList() {
71return typeOps.createArrayList();
72 }
7374 @Override
75publicvoid write(DataOutput out) throws IOException {
76 WritableUtils.writeClass(typeOps.getTypeClass(), out);
77 }
7879 @Override
80publicvoid readFields(DataInput in) throws IOException {
81 typeOps = TypeOpsUtils.getPrimitiveTypeOps(
82 WritableUtils.<S>readClass(in));
83 }
84 }