1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.reducers.collect;
1920import org.apache.giraph.block_app.framework.api.CreateReducersApi;
21import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
22import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
23import org.apache.giraph.master.MasterGlobalCommUsage;
24import org.apache.giraph.reducers.ReduceOperation;
25import org.apache.giraph.types.ops.PrimitiveTypeOps;
26import org.apache.giraph.types.ops.TypeOpsUtils;
27import org.apache.giraph.types.ops.collections.array.WArrayList;
28import org.apache.giraph.worker.WorkerBroadcastUsage;
29import org.apache.giraph.writable.kryo.KryoWritableWrapper;
3031/**32 * ShardedReducerHandle where we keep a list of reduced values,33 * when primitives are used34 *35 * @param <S> Single value type36 */37publicclass CollectShardedPrimitiveReducerHandle<S>
38extends ShardedReducerHandle<S, WArrayList<S>> {
39/**40 * Type ops if available, or null41 */42privatefinal PrimitiveTypeOps<S> typeOps;
4344publicCollectShardedPrimitiveReducerHandle(finalCreateReducersApi reduceApi,
45 Class<S> valueClass) {
46 typeOps = TypeOpsUtils.getPrimitiveTypeOps(valueClass);
47 register(reduceApi);
48 }
4950 @Override
51public ReduceOperation<S, KryoWritableWrapper<WArrayList<S>>>
52 createReduceOperation() {
53returnnew CollectPrimitiveReduceOperation<>(typeOps);
54 }
5556 @Override
57public WArrayList<S> createReduceResult(MasterGlobalCommUsage master) {
58int size = 0;
59for (int i = 0; i < REDUCER_COUNT; i++) {
60 size += reducers.get(i).getReducedValue(master).get().size();
61 }
62return createList(size);
63 }
6465public WArrayList<S> createList(int size) {
66return typeOps.createArrayList(size);
67 }
6869 @Override
70public BroadcastHandle<WArrayList<S>> createBroadcastHandle(
71 BroadcastArrayHandle<KryoWritableWrapper<WArrayList<S>>> broadcasts) {
72returnnewCollectShardedPrimitiveBroadcastHandle(broadcasts);
73 }
7475/**76 * Broadcast handle for CollectShardedPrimitiveReducerHandle77 */78publicclassCollectShardedPrimitiveBroadcastHandle79extendsShardedBroadcastHandle {
80publicCollectShardedPrimitiveBroadcastHandle(
81 BroadcastArrayHandle<KryoWritableWrapper<WArrayList<S>>>
82 broadcasts) {
83super(broadcasts);
84 }
8586 @Override
87public WArrayList<S> createBroadcastResult(
88WorkerBroadcastUsage worker) {
89int size = 0;
90for (int i = 0; i < REDUCER_COUNT; i++) {
91 size += broadcasts.get(i).getBroadcast(worker).get().size();
92 }
93return createList(size);
94 }
95 }
96 }