1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.reducers.collect;
1920import java.io.DataInput;
21import java.io.DataOutput;
22import java.io.IOException;
23import java.util.ArrayList;
24import java.util.List;
2526import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
27import org.apache.giraph.types.ops.PrimitiveTypeOps;
28import org.apache.giraph.types.ops.TypeOpsUtils;
29import org.apache.giraph.types.ops.collections.ResettableIterator;
30import org.apache.giraph.types.ops.collections.array.WArrayList;
31import org.apache.giraph.utils.WritableUtils;
3233/**34 * Collect tuples of primitive values reduce operation35 */36publicclassCollectTuplesOfPrimitivesReduceOperation37extends KryoWrappedReduceOperation<List<Object>, List<WArrayList>> {
38/**39 * Type ops if available, or null40 */41private List<PrimitiveTypeOps> typeOpsList;
4243/** For reflection only */44publicCollectTuplesOfPrimitivesReduceOperation() {
45 }
4647publicCollectTuplesOfPrimitivesReduceOperation(
48 List<PrimitiveTypeOps> typeOpsList) {
49this.typeOpsList = typeOpsList;
50 }
5152 @Override
53public List<WArrayList> createValue() {
54 List<WArrayList> ret = new ArrayList<>(typeOpsList.size());
55for (PrimitiveTypeOps typeOps : typeOpsList) {
56 ret.add(typeOps.createArrayList());
57 }
58return ret;
59 }
6061 @Override
62publicvoid reduce(List<WArrayList> reduceInto, List<Object> value) {
63for (int i = 0; i < reduceInto.size(); i++) {
64 reduceInto.get(i).addW(value.get(i));
65 }
66 }
6768 @Override
69publicvoid reduceMerge(final List<WArrayList> reduceInto,
70 List<WArrayList> toReduce) {
71for (int i = 0; i < reduceInto.size(); i++) {
72ResettableIterator iterator = toReduce.get(i).fastIteratorW();
73while (iterator.hasNext()) {
74 reduceInto.get(i).addW(iterator.next());
75 }
76 }
77 }
7879 @Override
80publicvoid write(DataOutput out) throws IOException {
81 out.writeInt(typeOpsList.size());
82for (PrimitiveTypeOps typeOps : typeOpsList) {
83 WritableUtils.writeClass(typeOps.getTypeClass(), out);
84 }
85 }
8687 @Override
88publicvoid readFields(DataInput in) throws IOException {
89int size = in.readInt();
90 typeOpsList = new ArrayList<>(size);
91for (int i = 0; i < size; i++) {
92 typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(
93 WritableUtils.readClass(in)));
94 }
95 }
96 }