This project has been retired. For details, please refer to its
Attic page.
WorkerAggregatorHandler xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.worker;
19
20 import java.io.IOException;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Set;
24
25 import org.apache.giraph.bsp.CentralizedServiceWorker;
26 import org.apache.giraph.comm.GlobalCommType;
27 import org.apache.giraph.comm.aggregators.AggregatorUtils;
28 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
29 import org.apache.giraph.comm.aggregators.GlobalCommValueOutputStream;
30 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
31 import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
32 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33 import org.apache.giraph.reducers.ReduceOperation;
34 import org.apache.giraph.reducers.Reducer;
35 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
36 import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
37 import org.apache.giraph.utils.WritableUtils;
38 import org.apache.hadoop.io.Writable;
39 import org.apache.hadoop.util.Progressable;
40 import org.apache.log4j.Logger;
41
42 import com.google.common.collect.Maps;
43 import com.google.common.collect.Sets;
44
45
46 public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
47
/** Logger for this class. */
48 private static final Logger LOG =
49 Logger.getLogger(WorkerAggregatorHandler.class);
50
/**
 * Broadcast values looked up by name in getBroadcast(); cleared and
 * refilled in prepareSuperstep().
 */
51 private final Map<String, Writable> broadcastedMap =
52 Maps.newHashMap();
53
/**
 * Reducers looked up by name in reduce()/reduceMerge(); cleared and
 * refilled in prepareSuperstep(), and read in finishSuperstep().
 */
54 private final Map<String, Reducer<Object, Writable>> reducerMap =
55 Maps.newHashMap();
56
57
/**
 * Service worker; used here to reach server data, master info, the
 * worker client and the list of workers.
 */
58 private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
59
/** Progress reporter; progress() is called while reducing and sending. */
60 private final Progressable progressable;
61
/**
 * Threshold compared against the buffered output size in
 * finishSuperstep(); once exceeded, the buffer is sent to the master.
 * Read from configuration (presumably a byte count, per the key name).
 */
62 private final int maxBytesPerAggregatorRequest;
63
/** Configuration passed to the constructor. */
64 private final ImmutableClassesGiraphConfiguration conf;
65
66
67
68
69
70
71
72
/**
 * Creates the handler and reads the per-request size limit from the
 * configuration.
 *
 * @param serviceWorker Service worker this handler belongs to
 * @param conf          Giraph configuration; also the source of
 *                      {@code MAX_BYTES_PER_AGGREGATOR_REQUEST}
 * @param progressable  Progress reporter
 */
public WorkerAggregatorHandler(
    CentralizedServiceWorker<?, ?, ?> serviceWorker,
    ImmutableClassesGiraphConfiguration conf,
    Progressable progressable) {
  this.conf = conf;
  this.progressable = progressable;
  this.serviceWorker = serviceWorker;
  // Falls back to the default when the key is not set in the configuration.
  this.maxBytesPerAggregatorRequest = conf.getInt(
      AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
      AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
}
84
85 @Override
86 public <B extends Writable> B getBroadcast(String name) {
87 B value = (B) broadcastedMap.get(name);
88 if (value == null) {
89 LOG.warn("getBroadcast: " +
90 AggregatorUtils.getUnregisteredBroadcastMessage(name,
91 broadcastedMap.size() != 0, conf));
92 }
93 return value;
94 }
95
96 @Override
97 public void reduce(String name, Object value) {
98 Reducer<Object, Writable> reducer = reducerMap.get(name);
99 if (reducer != null) {
100 progressable.progress();
101 synchronized (reducer) {
102 reducer.reduce(value);
103 }
104 } else {
105 throw new IllegalStateException("reduce: " +
106 AggregatorUtils.getUnregisteredReducerMessage(name,
107 reducerMap.size() != 0, conf));
108 }
109 }
110
111
112
113
114
115
116 @Override
117 public void reduceMerge(String name, Writable valueToReduce) {
118 Reducer<Object, Writable> reducer = reducerMap.get(name);
119 if (reducer != null) {
120 progressable.progress();
121 synchronized (reducer) {
122 reducer.reduceMerge(valueToReduce);
123 }
124 } else {
125 throw new IllegalStateException("reduce: " +
126 AggregatorUtils.getUnregisteredReducerMessage(name,
127 reducerMap.size() != 0, conf));
128 }
129 }
130
131
132
133
134
135
136 public void prepareSuperstep(
137 WorkerAggregatorRequestProcessor requestProcessor) {
138 broadcastedMap.clear();
139 reducerMap.clear();
140
141 if (LOG.isDebugEnabled()) {
142 LOG.debug("prepareSuperstep: Start preparing aggregators");
143 }
144 AllAggregatorServerData allGlobalCommData =
145 serviceWorker.getServerData().getAllAggregatorData();
146
147 Iterable<byte[]> dataToDistribute =
148 allGlobalCommData.getDataFromMasterWhenReady(
149 serviceWorker.getMasterInfo());
150 try {
151
152 requestProcessor.distributeReducedValues(dataToDistribute);
153 } catch (IOException e) {
154 throw new IllegalStateException("prepareSuperstep: " +
155 "IOException occurred while trying to distribute aggregators", e);
156 }
157
158 allGlobalCommData.fillNextSuperstepMapsWhenReady(
159 getOtherWorkerIdsSet(), broadcastedMap,
160 reducerMap);
161 if (LOG.isDebugEnabled()) {
162 LOG.debug("prepareSuperstep: Aggregators prepared");
163 }
164 }
165
166
167
168
169
170
171 public void finishSuperstep(
172 WorkerAggregatorRequestProcessor requestProcessor) {
173 if (LOG.isInfoEnabled()) {
174 LOG.info("finishSuperstep: Start gathering aggregators, " +
175 "workers will send their aggregated values " +
176 "once they are done with superstep computation");
177 }
178 OwnerAggregatorServerData ownerGlobalCommData =
179 serviceWorker.getServerData().getOwnerAggregatorData();
180
181
182 for (Map.Entry<String, Reducer<Object, Writable>> entry :
183 reducerMap.entrySet()) {
184 try {
185 boolean sent = requestProcessor.sendReducedValue(entry.getKey(),
186 entry.getValue().getCurrentValue());
187 if (!sent) {
188
189 ownerGlobalCommData.reduce(entry.getKey(),
190 entry.getValue().getCurrentValue());
191 }
192 } catch (IOException e) {
193 throw new IllegalStateException("finishSuperstep: " +
194 "IOException occurred while sending aggregator " +
195 entry.getKey() + " to its owner", e);
196 }
197 progressable.progress();
198 }
199 try {
200
201 requestProcessor.flush();
202 } catch (IOException e) {
203 throw new IllegalStateException("finishSuperstep: " +
204 "IOException occurred while sending aggregators to owners", e);
205 }
206
207
208 Iterable<Map.Entry<String, Writable>> myReducedValues =
209 ownerGlobalCommData.getMyReducedValuesWhenReady(
210 getOtherWorkerIdsSet());
211
212
213 GlobalCommValueOutputStream globalOutput =
214 new GlobalCommValueOutputStream(false);
215 for (Map.Entry<String, Writable> entry : myReducedValues) {
216 try {
217 int currentSize = globalOutput.addValue(entry.getKey(),
218 GlobalCommType.REDUCED_VALUE,
219 entry.getValue());
220 if (currentSize > maxBytesPerAggregatorRequest) {
221 requestProcessor.sendReducedValuesToMaster(
222 globalOutput.flush());
223 }
224 progressable.progress();
225 } catch (IOException e) {
226 throw new IllegalStateException("finishSuperstep: " +
227 "IOException occurred while writing aggregator " +
228 entry.getKey(), e);
229 }
230 }
231 try {
232 requestProcessor.sendReducedValuesToMaster(globalOutput.flush());
233 } catch (IOException e) {
234 throw new IllegalStateException("finishSuperstep: " +
235 "IOException occured while sending aggregators to master", e);
236 }
237
238 serviceWorker.getWorkerClient().waitAllRequests();
239
240 ownerGlobalCommData.reset();
241 if (LOG.isDebugEnabled()) {
242 LOG.debug("finishSuperstep: Aggregators finished");
243 }
244 }
245
246
247
248
249
250
251
252 public WorkerThreadGlobalCommUsage newThreadAggregatorUsage() {
253 if (AggregatorUtils.useThreadLocalAggregators(conf)) {
254 return new ThreadLocalWorkerGlobalCommUsage();
255 } else {
256 return this;
257 }
258 }
259
260 @Override
261 public void finishThreadComputation() {
262
263
264 }
265
266
267
268
269
270
271 public Set<Integer> getOtherWorkerIdsSet() {
272 Set<Integer> otherWorkers = Sets.newHashSetWithExpectedSize(
273 serviceWorker.getWorkerInfoList().size());
274 for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
275 if (workerInfo.getTaskId() != serviceWorker.getWorkerInfo().getTaskId()) {
276 otherWorkers.add(workerInfo.getTaskId());
277 }
278 }
279 return otherWorkers;
280 }
281
282
283
284
285
286
287
288
289 public class ThreadLocalWorkerGlobalCommUsage
290 implements WorkerThreadGlobalCommUsage {
291
292 private final Map<String, Reducer<Object, Writable>> threadReducerMap;
293
294
295
296
297
298
299
300 public ThreadLocalWorkerGlobalCommUsage() {
301 threadReducerMap = Maps.newHashMapWithExpectedSize(
302 WorkerAggregatorHandler.this.reducerMap.size());
303
304 UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
305 UnsafeReusableByteArrayInput in = new UnsafeReusableByteArrayInput();
306
307 for (Entry<String, Reducer<Object, Writable>> entry :
308 reducerMap.entrySet()) {
309 ReduceOperation<Object, Writable> globalReduceOp =
310 entry.getValue().getReduceOp();
311
312 ReduceOperation<Object, Writable> threadLocalCopy =
313 WritableUtils.createCopy(out, in, globalReduceOp, conf);
314
315 threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy));
316 }
317 }
318
319 @Override
320 public void reduce(String name, Object value) {
321 Reducer<Object, Writable> reducer = threadReducerMap.get(name);
322 if (reducer != null) {
323 progressable.progress();
324 reducer.reduce(value);
325 } else {
326 throw new IllegalStateException("reduce: " +
327 AggregatorUtils.getUnregisteredAggregatorMessage(name,
328 threadReducerMap.size() != 0, conf));
329 }
330 }
331
332 @Override
333 public void reduceMerge(String name, Writable value) {
334 Reducer<Object, Writable> reducer = threadReducerMap.get(name);
335 if (reducer != null) {
336 progressable.progress();
337 reducer.reduceMerge(value);
338 } else {
339 throw new IllegalStateException("reduceMerge: " +
340 AggregatorUtils.getUnregisteredAggregatorMessage(name,
341 threadReducerMap.size() != 0, conf));
342 }
343 }
344
345 @Override
346 public <B extends Writable> B getBroadcast(String name) {
347 return WorkerAggregatorHandler.this.getBroadcast(name);
348 }
349
350 @Override
351 public void finishThreadComputation() {
352
353
354 for (Entry<String, Reducer<Object, Writable>> entry :
355 threadReducerMap.entrySet()) {
356 WorkerAggregatorHandler.this.reduceMerge(entry.getKey(),
357 entry.getValue().getCurrentValue());
358 }
359 }
360 }
361
362 }