This project has retired. For details please refer to its
Attic page.
InternalAggregators xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.api.local;
19
20 import java.util.Map;
21 import java.util.Map.Entry;
22
23 import org.apache.giraph.master.MasterGlobalCommUsage;
24 import org.apache.giraph.reducers.ReduceOperation;
25 import org.apache.giraph.reducers.Reducer;
26 import org.apache.giraph.utils.WritableUtils;
27 import org.apache.giraph.worker.WorkerGlobalCommUsage;
28 import org.apache.hadoop.io.Writable;
29
30 import com.google.common.collect.Maps;
31
32
33
34
35 @SuppressWarnings("unchecked")
36 class InternalAggregators
37 implements MasterGlobalCommUsage, WorkerGlobalCommUsage {
38 private final boolean runAllChecks;
39
40
41 private final Map<String, Reducer<Object, Writable>> reducerMap =
42 Maps.newHashMap();
43
44 private final Map<String, Writable> broadcastMap =
45 Maps.newHashMap();
46
47 private final Map<String, Writable> reducedMap =
48 Maps.newHashMap();
49
50 public InternalAggregators(boolean runAllChecks) {
51 this.runAllChecks = runAllChecks;
52 }
53
54 private static <T> T getOrThrow(
55 Map<String, T> map, String mapName, String key) {
56 T value = map.get(key);
57 if (value == null) {
58 throw new IllegalArgumentException(
59 key + " not present in " + mapName);
60 }
61 return value;
62 }
63
64 @Override
65 public void broadcast(String name, Writable value) {
66 broadcastMap.put(name, value);
67 }
68
69 @Override
70 public <B extends Writable> B getBroadcast(String name) {
71 return (B) getOrThrow(broadcastMap, "broadcastMap", name);
72 }
73
74 @Override
75 public <S, R extends Writable> void registerReducer(
76 String name, ReduceOperation<S, R> reduceOp) {
77 registerReducer(name, reduceOp, reduceOp.createInitialValue());
78 }
79
80 @Override
81 public <S, R extends Writable> void registerReducer(
82 String name, ReduceOperation<S, R> reduceOp,
83 R globalInitialValue) {
84 if (reducerMap.containsKey(name)) {
85 throw new IllegalArgumentException(
86 "Reducer with name " + name + " was already registered, " +
87 " and is " + reducerMap.get(name).getReduceOp() +
88 ", and we are trying to " + " register " + reduceOp);
89 }
90 if (reduceOp == null) {
91 throw new IllegalArgumentException(
92 "null reducer cannot be registered, with name " + name);
93 }
94 if (globalInitialValue == null) {
95 throw new IllegalArgumentException(
96 "global initial value for reducer cannot be null, but is for " +
97 reduceOp + " with naem" + name);
98 }
99
100 Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
101 reducerMap.put(name, (Reducer<Object, Writable>) reducer);
102 }
103
104 @Override
105 public void reduce(String name, Object value) {
106 Reducer<Object, Writable> reducer =
107 getOrThrow(reducerMap, "reducerMap", name);
108 synchronized (reducer) {
109 reducer.reduce(value);
110 }
111 }
112
113 @Override
114 public void reduceMerge(String name, Writable value) {
115 Reducer<Object, Writable> reducer =
116 getOrThrow(reducerMap, "reducerMap", name);
117 synchronized (reducer) {
118 reducer.reduceMerge(value);
119 }
120 }
121
122 @Override
123 public <R extends Writable> R getReduced(String name) {
124 return (R) getOrThrow(reducedMap, "reducedMap", name);
125 }
126
127 public synchronized void afterWorkerBeforeMaster() {
128 broadcastMap.clear();
129 reducedMap.clear();
130 for (Entry<String, Reducer<Object, Writable>> entry :
131 reducerMap.entrySet()) {
132 Writable value = entry.getValue().getCurrentValue();
133 if (runAllChecks) {
134 Writable newValue = entry.getValue().createInitialValue();
135 WritableUtils.copyInto(value, newValue);
136 value = newValue;
137 }
138 reducedMap.put(entry.getKey(), value);
139 }
140 reducerMap.clear();
141 }
142 }