/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.block_app.reducers.collect;
1920import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21import org.apache.giraph.block_app.framework.api.CreateReducersApi;
22import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
23import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
24import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
25import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
26import org.apache.giraph.function.Supplier;
27import org.apache.giraph.master.MasterGlobalCommUsage;
28import org.apache.giraph.reducers.ReduceOperation;
29import org.apache.giraph.worker.WorkerBroadcastUsage;
30import org.apache.giraph.writable.kryo.KryoWritableWrapper;
31import org.apache.giraph.writable.kryo.TransientRandom;
3233/**34 * Reducing values into a list of reducers, randomly,35 * and getting the results of all reducers together36 *37 * @param <S> Single value type38 * @param <R> Reduced value type39 */40publicabstractclass ShardedReducerHandle<S, R>
41implements ReducerHandle<S, R> {
42// Use a prime number for number of reducers, large enough to make sure43// request sizes are within expected size (0.5MB)44protectedstaticfinalint REDUCER_COUNT = 39989;
4546protectedfinalTransientRandom random = newTransientRandom();
4748protected ArrayOfHandles.ArrayOfReducers<S, KryoWritableWrapper<R>> reducers;
4950publicfinalvoid register(finalCreateReducersApi reduceApi) {
51 reducers = new ArrayOfHandles.ArrayOfReducers<>(REDUCER_COUNT,
52new Supplier<ReducerHandle<S, KryoWritableWrapper<R>>>() {
53 @Override
54public ReducerHandle<S, KryoWritableWrapper<R>> get() {
55return reduceApi.createLocalReducer(createReduceOperation());
56 }
57 });
58 }
5960 @Override
61publicfinalvoid reduce(S value) {
62 reducers.get(random.nextInt(REDUCER_COUNT)).reduce(value);
63 }
6465 @Override
66publicfinal R getReducedValue(MasterGlobalCommUsage master) {
67 KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
68 createReduceResult(master));
69 ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
70 createReduceOperation();
71for (int i = 0; i < REDUCER_COUNT; i++) {
72 reduceOperation.reduceMerge(ret,
73 reducers.get(i).getReducedValue(master));
74 }
75return ret.get();
76 }
7778publicabstract ReduceOperation<S, KryoWritableWrapper<R>>
79 createReduceOperation();
8081public R createReduceResult(MasterGlobalCommUsage master) {
82return createReduceOperation().createInitialValue().get();
83 }
8485public BroadcastHandle<R> createBroadcastHandle(
86 BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
87returnnewShardedBroadcastHandle(broadcasts);
88 }
8990 @Override
91publicfinal BroadcastHandle<R> broadcastValue(BlockMasterApi masterApi) {
92return createBroadcastHandle(reducers.broadcastValue(masterApi));
93 }
9495/**96 * Broadcast for ShardedReducerHandle97 */98publicclassShardedBroadcastHandleimplements BroadcastHandle<R> {
99protectedfinal BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts;
100101publicShardedBroadcastHandle(
102 BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
103this.broadcasts = broadcasts;
104 }
105106public R createBroadcastResult(WorkerBroadcastUsage worker) {
107return createReduceOperation().createInitialValue().get();
108 }
109110 @Override
111publicfinal R getBroadcast(WorkerBroadcastUsage worker) {
112 KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
113 createBroadcastResult(worker));
114 ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
115 createReduceOperation();
116for (int i = 0; i < REDUCER_COUNT; i++) {
117 reduceOperation.reduceMerge(ret,
118 broadcasts.get(i).getBroadcast(worker));
119 }
120return ret.get();
121 }
122 }
123 }