/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.block_app.reducers.collect;
1920import java.util.ArrayList;
21import java.util.List;
2223import org.apache.giraph.block_app.framework.api.CreateReducersApi;
24import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
25import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
26import org.apache.giraph.master.MasterGlobalCommUsage;
27import org.apache.giraph.reducers.ReduceOperation;
28import org.apache.giraph.worker.WorkerBroadcastUsage;
29import org.apache.giraph.writable.kryo.KryoWritableWrapper;
3031/**32 * ShardedReducerHandle where we keep a list of reduced values33 *34 * @param <S> Single value type35 */36publicclass CollectShardedReducerHandle<S>
37extends ShardedReducerHandle<S, List<S>> {
38publicCollectShardedReducerHandle(CreateReducersApi reduceApi) {
39 register(reduceApi);
40 }
4142 @Override
43public ReduceOperation<S, KryoWritableWrapper<List<S>>>
44 createReduceOperation() {
45returnnew CollectReduceOperation<>();
46 }
4748 @Override
49public List<S> createReduceResult(MasterGlobalCommUsage master) {
50int size = 0;
51for (int i = 0; i < REDUCER_COUNT; i++) {
52 size += reducers.get(i).getReducedValue(master).get().size();
53 }
54return createList(size);
55 }
5657public List<S> createList(int size) {
58returnnew ArrayList<S>(size);
59 }
6061 @Override
62public BroadcastHandle<List<S>> createBroadcastHandle(
63 BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
64returnnewCollectShardedBroadcastHandle(broadcasts);
65 }
6667/**68 * BroadcastHandle for CollectShardedReducerHandle69 */70publicclassCollectShardedBroadcastHandleextendsShardedBroadcastHandle {
71publicCollectShardedBroadcastHandle(
72 BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
73super(broadcasts);
74 }
7576 @Override
77public List<S> createBroadcastResult(WorkerBroadcastUsage worker) {
78int size = 0;
79for (int i = 0; i < REDUCER_COUNT; i++) {
80 size += broadcasts.get(i).getBroadcast(worker).get().size();
81 }
82return createList(size);
83 }
84 }
85 }