/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.aggregators;
2021import java.util.Collections;
22import java.util.List;
23import java.util.Map;
24import java.util.Map.Entry;
25import java.util.Set;
26import java.util.concurrent.ConcurrentMap;
2728import org.apache.giraph.comm.GlobalCommType;
29import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30import org.apache.giraph.master.MasterInfo;
31import org.apache.giraph.reducers.ReduceOperation;
32import org.apache.giraph.reducers.Reducer;
33import org.apache.giraph.utils.TaskIdsPermitsBarrier;
34import org.apache.hadoop.io.Writable;
35import org.apache.hadoop.util.Progressable;
36import org.apache.log4j.Logger;
3738import com.google.common.base.Preconditions;
39import com.google.common.collect.Lists;
40import com.google.common.collect.Maps;
4142/**43 * Accepts aggregators and their values from previous superstep from master44 * and workers which own aggregators. Keeps data received from master so it45 * could be distributed later. Also counts the requests so we would know46 * when we are done receiving requests.47 *48 * Only restriction is that we need to call registerAggregatorClass before49 * calling createAggregatorInitialValue, other than that methods of this class50 * are thread-safe.51 */52publicclassAllAggregatorServerData {
53/** Class logger */54privatestaticfinal Logger LOG =
55 Logger.getLogger(AllAggregatorServerData.class);
56/** Map of broadcasted values from master */57privatefinal ConcurrentMap<String, Writable>
58 broadcastedMap = Maps.newConcurrentMap();
59/** Map of registered reducers for current superstep */60privatefinal ConcurrentMap<String, ReduceOperation<Object, Writable>>
61 reduceOpMap = Maps.newConcurrentMap();
62/**63 * Counts the requests with final aggregators from master.64 * It uses values from special aggregators65 * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)66 * to know how many requests it has to receive.67 */68privatefinalTaskIdsPermitsBarrier masterBarrier;
69/**70 * Aggregator data which this worker received from master and which it is71 * going to distribute before starting next superstep. Thread-safe.72 */73privatefinal List<byte[]> masterData =
74 Collections.synchronizedList(Lists.<byte[]>newArrayList());
75/**76 * Counts the requests with final aggregators from other workers.77 * It uses values from special aggregators78 * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)79 * to know how many requests it has to receive.80 */81privatefinalTaskIdsPermitsBarrier workersBarrier;
82/** Progressable used to report progress */83privatefinal Progressable progressable;
84/** Configuration */85privatefinalImmutableClassesGiraphConfiguration conf;
8687/**88 * Constructor89 *90 * @param progressable Progressable used to report progress91 * @param conf Configuration92 */93publicAllAggregatorServerData(Progressable progressable,
94ImmutableClassesGiraphConfiguration conf) {
95this.progressable = progressable;
96this.conf = conf;
97 workersBarrier = newTaskIdsPermitsBarrier(progressable);
98 masterBarrier = newTaskIdsPermitsBarrier(progressable);
99 }
100101/**102 * Received value through global communication from master.103 * @param name Name104 * @param type Global communication type105 * @param value Object value106 */107publicvoid receiveValueFromMaster(
108 String name, GlobalCommType type, Writable value) {
109switch (type) {
110case BROADCAST:
111 broadcastedMap.put(name, value);
112break;
113114case REDUCE_OPERATIONS:
115 reduceOpMap.put(name, (ReduceOperation<Object, Writable>) value);
116break;
117118default:
119thrownew IllegalArgumentException("Unkown request type " + type);
120 }
121 progressable.progress();
122 }
123124/**125 * Notify this object that an aggregator request from master has been126 * received.127 *128 * @param data Byte request with data received from master129 */130publicvoid receivedRequestFromMaster(byte[] data) {
131 masterData.add(data);
132 masterBarrier.releaseOnePermit();
133 }
134135/**136 * Notify this object about the total number of requests which should137 * arrive from master.138 *139 * @param requestCount Number of requests which should arrive140 * @param taskId Task id of master141 */142publicvoid receivedRequestCountFromMaster(long requestCount, int taskId) {
143 masterBarrier.requirePermits(requestCount, taskId);
144 }
145146/**147 * Notify this object that an aggregator request from some worker has been148 * received.149 */150publicvoid receivedRequestFromWorker() {
151 workersBarrier.releaseOnePermit();
152 }
153154/**155 * Notify this object about the total number of requests which should156 * arrive from one of the workers.157 *158 * @param requestCount Number of requests which should arrive159 * @param taskId Task id of that worker160 */161publicvoid receivedRequestCountFromWorker(long requestCount, int taskId) {
162 workersBarrier.requirePermits(requestCount, taskId);
163 }
164165/**166 * This function will wait until all aggregator requests from master have167 * arrived, and return that data afterwards.168 *169 * @param masterInfo Master info170 * @return Iterable through data received from master171 */172public Iterable<byte[]> getDataFromMasterWhenReady(MasterInfo masterInfo) {
173 masterBarrier.waitForRequiredPermits(
174 Collections.singleton(masterInfo.getTaskId()));
175if (LOG.isDebugEnabled()) {
176 LOG.debug("getDataFromMasterWhenReady: " +
177"Aggregator data for distribution ready");
178 }
179return masterData;
180 }
181182/**183 * This function will wait until all aggregator requests from workers have184 * arrived, and fill the maps for next superstep when ready.185 *186 * @param workerIds All workers in the job apart from the current one187 * @param broadcastedMapToFill Broadcast map to fill out188 * @param reducerMapToFill Registered reducer map to fill out.189 */190publicvoid fillNextSuperstepMapsWhenReady(
191 Set<Integer> workerIds,
192 Map<String, Writable> broadcastedMapToFill,
193 Map<String, Reducer<Object, Writable>> reducerMapToFill) {
194 workersBarrier.waitForRequiredPermits(workerIds);
195if (LOG.isDebugEnabled()) {
196 LOG.debug("fillNextSuperstepMapsWhenReady: Global data ready");
197 }
198199 Preconditions.checkArgument(broadcastedMapToFill.isEmpty(),
200"broadcastedMap needs to be empty for filling");
201 Preconditions.checkArgument(reducerMapToFill.isEmpty(),
202"reducerMap needs to be empty for filling");
203204 broadcastedMapToFill.putAll(broadcastedMap);
205206for (Entry<String, ReduceOperation<Object, Writable>> entry :
207 reduceOpMap.entrySet()) {
208 reducerMapToFill.put(entry.getKey(), new Reducer<>(entry.getValue()));
209 }
210211 broadcastedMap.clear();
212 reduceOpMap.clear();
213 masterData.clear();
214if (LOG.isDebugEnabled()) {
215 LOG.debug("reset: Ready for next superstep");
216 }
217 }
218 }
219