/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.aggregators;

import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.reducers.Reducer;
import org.apache.giraph.utils.TaskIdsPermitsBarrier;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;

/**
 * Class for holding aggregators which current worker owns,
 * and aggregating partial aggregator values from workers.
 *
 * Protocol:
 * 1. Before the beginning of superstep, worker receives its aggregators
 * from master, and these aggregators will be registered to this class.
 * Multiple registrations can be called concurrently.
 * 2. During the superstep, whenever a worker finishes computation,
 * it will send partial aggregated values to worker owner. This class is used
 * to help deserialize the arriving aggregator values, and aggregate the values
 * at the destination owner worker; these can happen concurrently.
 * (we know step 1. is finished before anything from step 2. happens because
 * other workers can't start computation before they receive aggregators
 * which this worker owns)
 * 3. This class also tracks the number of partial aggregator requests which
 * worker received. At the end of superstep, getMyReducedValuesWhenReady
 * will be called to ensure everything was received and get the values which
 * need to be sent to master.
 * Because of this counting, in step 2. even if worker owns no aggregators,
 * it will still send a message without aggregator data.
 * 4. In the end we reset to prepare for the next superstep.
 */
public class OwnerAggregatorServerData {
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(OwnerAggregatorServerData.class);
  /** Map of aggregators which current worker owns */
  private final ConcurrentMap<String, Reducer<Object, Writable>>
  myReducerMap = Maps.newConcurrentMap();
  /**
   * Counts the requests with partial aggregated values from other workers.
   * It uses GlobalCommType.SPECIAL_COUNT to know how many requests it
   * has to receive.
   */
  private final TaskIdsPermitsBarrier workersBarrier;
  /** Progressable used to report progress */
  private final Progressable progressable;

  /**
   * Constructor
   *
   * @param progressable Progressable used to report progress
   */
  public OwnerAggregatorServerData(Progressable progressable) {
    this.progressable = progressable;
    workersBarrier = new TaskIdsPermitsBarrier(progressable);
  }

  /**
   * Register a reducer which current worker owns. Thread-safe.
   *
   * If a reducer is already registered under the same name, the existing
   * one is kept and this call only reports progress.
   *
   * @param name Name of aggregator
   * @param reduceOp Reduce operation
   */
  public void registerReducer(String name,
      ReduceOperation<Object, Writable> reduceOp) {
    if (LOG.isDebugEnabled() && myReducerMap.isEmpty()) {
      LOG.debug("registerAggregator: The first registration after a reset()");
    }
    myReducerMap.putIfAbsent(name, new Reducer<>(reduceOp));
    progressable.progress();
  }

  /**
   * Reduce partial value of one of current worker's reducers.
   *
   * Thread-safe. Call only after reducers have been registered.
   *
   * @param name Name of the reducer
   * @param value Value to reduce to it
   * @throws IllegalStateException if no reducer is registered under
   *                               the given name
   */
  public void reduce(String name, Writable value) {
    Reducer<Object, Writable> reducer = getRegisteredReducer(name);
    // Merges into the same reducer are serialized by locking on it;
    // merges into different reducers can proceed concurrently.
    synchronized (reducer) {
      reducer.reduceMerge(value);
    }
    progressable.progress();
  }

  /**
   * Create initial value for a reducer. Used so requests
   * would be able to deserialize data.
   *
   * Thread-safe. Call only after reducer has been registered.
   *
   * @param name Name of the reducer
   * @return Empty value
   * @throws IllegalStateException if no reducer is registered under
   *                               the given name
   */
  public Writable createInitialValue(String name) {
    Reducer<Object, Writable> reducer = getRegisteredReducer(name);
    synchronized (reducer) {
      return reducer.createInitialValue();
    }
  }

  /**
   * Notify this object that a partial aggregated values request from some
   * worker has been received. Thread-safe.
   */
  public void receivedRequestFromWorker() {
    workersBarrier.releaseOnePermit();
  }

  /**
   * Notify this object about the total number of requests which should
   * arrive from one of the workers. Thread-safe.
   *
   * @param requestCount Number of requests which should arrive
   * @param taskId Task id of that worker
   */
  public void receivedRequestCountFromWorker(long requestCount, int taskId) {
    workersBarrier.requirePermits(requestCount, taskId);
  }

  /**
   * This function will wait until all partial aggregated values from all
   * workers are ready and aggregated, and return final aggregated values
   * afterwards.
   *
   * The returned iterable is a lazy view over the live reducer map: entries
   * are read when it is iterated, so it has to be consumed before
   * {@link #reset()} clears the map.
   *
   * @param workerIds All workers in the job apart from the current one
   * @return Iterable through final aggregated values which this worker owns
   */
  public Iterable<Map.Entry<String, Writable>>
  getMyReducedValuesWhenReady(Set<Integer> workerIds) {
    workersBarrier.waitForRequiredPermits(workerIds);
    if (LOG.isDebugEnabled()) {
      LOG.debug("getMyAggregatorValuesWhenReady: Values ready");
    }
    return Iterables.transform(myReducerMap.entrySet(),
        new Function<Map.Entry<String, Reducer<Object, Writable>>,
            Map.Entry<String, Writable>>() {
          @Override
          public Map.Entry<String, Writable> apply(
              Map.Entry<String, Reducer<Object, Writable>> aggregator) {
            return new AbstractMap.SimpleEntry<String, Writable>(
                aggregator.getKey(),
                aggregator.getValue().getCurrentValue());
          }
        });
  }

  /**
   * Prepare for next superstep
   */
  public void reset() {
    myReducerMap.clear();
    if (LOG.isDebugEnabled()) {
      LOG.debug("reset: Ready for next superstep");
    }
  }

  /**
   * Look up a reducer owned by this worker, failing with a descriptive
   * error when none is registered under the given name. Without this check
   * the callers would synchronize on null and throw a NullPointerException
   * that does not say which reducer was missing.
   *
   * @param name Name of the reducer
   * @return Reducer registered under the given name, never null
   * @throws IllegalStateException if no reducer is registered under
   *                               the given name
   */
  private Reducer<Object, Writable> getRegisteredReducer(String name) {
    Reducer<Object, Writable> reducer = myReducerMap.get(name);
    if (reducer == null) {
      throw new IllegalStateException(
          "getRegisteredReducer: Reducer '" + name + "' is not registered " +
          "on this worker, registered reducers are " +
          myReducerMap.keySet());
    }
    return reducer;
  }

}