This project has retired. For details please refer to its
Attic page.
BasicMapReduce xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.map;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.util.Iterator;
24
25 import org.apache.commons.lang3.tuple.MutablePair;
26 import org.apache.commons.lang3.tuple.Pair;
27 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
28 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
29 import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
30 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
31 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
32 import org.apache.giraph.block_app.framework.piece.global_comm.map.BroadcastMapHandle;
33 import org.apache.giraph.block_app.framework.piece.global_comm.map.ReducerMapHandle;
34 import org.apache.giraph.master.MasterGlobalCommUsage;
35 import org.apache.giraph.reducers.ReduceOperation;
36 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
37 import org.apache.giraph.types.ops.PrimitiveTypeOps;
38 import org.apache.giraph.types.ops.TypeOpsUtils;
39 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
40 import org.apache.giraph.types.ops.collections.WritableWriter;
41 import org.apache.giraph.utils.WritableUtils;
42 import org.apache.giraph.worker.WorkerBroadcastUsage;
43 import org.apache.hadoop.io.Writable;
44 import org.apache.hadoop.io.WritableComparable;
45
46
47
48
49
50
51
52
53
54
55 public class BasicMapReduce<K extends WritableComparable, S,
56 R extends Writable>
57 implements ReduceOperation<Pair<K, S>, Basic2ObjectMap<K, R>> {
58 private PrimitiveIdTypeOps<K> keyTypeOps;
59 private PrimitiveTypeOps<R> typeOps;
60 private ReduceOperation<S, R> elementReduceOp;
61 private WritableWriter<R> writer;
62
63 public BasicMapReduce() {
64 }
65
66
67
68
69
70
71
72
73
74 public BasicMapReduce(
75 PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
76 ReduceOperation<S, R> elementReduceOp) {
77 this.keyTypeOps = keyTypeOps;
78 this.typeOps = typeOps;
79 this.elementReduceOp = elementReduceOp;
80 init();
81 }
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 public static <K extends WritableComparable, S, R extends Writable>
98 ReducerMapHandle<K, S, R> createLocalMapHandles(
99 PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
100 ReduceOperation<S, R> elementReduceOp,
101 final CreateReducersApi reduceApi) {
102 return createMapHandles(
103 keyTypeOps, typeOps, elementReduceOp,
104 new CreateReducerFunctionApi() {
105 @Override
106 public <S, R extends Writable> ReducerHandle<S, R> createReducer(
107 ReduceOperation<S, R> reduceOp) {
108 return reduceApi.createLocalReducer(reduceOp);
109 }
110 });
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 public static <K extends WritableComparable, S, R extends Writable>
128 ReducerMapHandle<K, S, R> createMapHandles(
129 final PrimitiveIdTypeOps<K> keyTypeOps, final PrimitiveTypeOps<R> typeOps,
130 ReduceOperation<S, R> elementReduceOp,
131 CreateReducerFunctionApi createFunction) {
132 final ReducerHandle<Pair<K, S>, Basic2ObjectMap<K, R>> reduceHandle =
133 createFunction.createReducer(
134 new BasicMapReduce<>(keyTypeOps, typeOps, elementReduceOp));
135 final K curIndex = keyTypeOps.create();
136 final R reusableValue = typeOps.create();
137 final R initialValue = elementReduceOp.createInitialValue();
138 final MutablePair<K, S> reusablePair = MutablePair.of(null, null);
139 final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
140 @Override
141 public R getReducedValue(MasterGlobalCommUsage master) {
142 Basic2ObjectMap<K, R> result = reduceHandle.getReducedValue(master);
143 R value = result.get(curIndex);
144 if (value == null) {
145 typeOps.set(reusableValue, initialValue);
146 } else {
147 typeOps.set(reusableValue, value);
148 }
149 return reusableValue;
150 }
151
152 @Override
153 public void reduce(S valueToReduce) {
154 reusablePair.setLeft(curIndex);
155 reusablePair.setRight(valueToReduce);
156 reduceHandle.reduce(reusablePair);
157 }
158
159 @Override
160 public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
161 throw new UnsupportedOperationException();
162 }
163 };
164
165 return new ReducerMapHandle<K, S, R>() {
166 @Override
167 public ReducerHandle<S, R> get(K key) {
168 keyTypeOps.set(curIndex, key);
169 return elementReduceHandle;
170 }
171
172 @Override
173 public int getReducedSize(BlockMasterApi master) {
174 return reduceHandle.getReducedValue(master).size();
175 }
176
177 @Override
178 public BroadcastMapHandle<K, R> broadcastValue(BlockMasterApi master) {
179 final BroadcastHandle<Basic2ObjectMap<K, R>> broadcastHandle =
180 reduceHandle.broadcastValue(master);
181 final K curIndex = keyTypeOps.create();
182 final R reusableValue = typeOps.create();
183 final BroadcastHandle<R>
184 elementBroadcastHandle = new BroadcastHandle<R>() {
185 @Override
186 public R getBroadcast(WorkerBroadcastUsage worker) {
187 Basic2ObjectMap<K, R> result = broadcastHandle.getBroadcast(worker);
188 R value = result.get(curIndex);
189 if (value == null) {
190 typeOps.set(reusableValue, initialValue);
191 } else {
192 typeOps.set(reusableValue, value);
193 }
194 return reusableValue;
195 }
196 };
197 return new BroadcastMapHandle<K, R>() {
198 @Override
199 public BroadcastHandle<R> get(K key) {
200 keyTypeOps.set(curIndex, key);
201 return elementBroadcastHandle;
202 }
203
204 @Override
205 public int getBroadcastedSize(WorkerBroadcastUsage worker) {
206 return broadcastHandle.getBroadcast(worker).size();
207 }
208 };
209 }
210 };
211 }
212
213 private void init() {
214 writer = new WritableWriter<R>() {
215 @Override
216 public void write(DataOutput out, R value) throws IOException {
217 value.write(out);
218 }
219
220 @Override
221 public R readFields(DataInput in) throws IOException {
222 R result = typeOps.create();
223 result.readFields(in);
224 return result;
225 }
226 };
227 }
228
229 @Override
230 public Basic2ObjectMap<K, R> createInitialValue() {
231 return keyTypeOps.create2ObjectOpenHashMap(writer);
232 }
233
234 @Override
235 public Basic2ObjectMap<K, R> reduce(
236 Basic2ObjectMap<K, R> curValue, Pair<K, S> valueToReduce) {
237 R result = curValue.get(valueToReduce.getLeft());
238 if (result == null) {
239 result = typeOps.create();
240 }
241 result = elementReduceOp.reduce(result, valueToReduce.getRight());
242 curValue.put(valueToReduce.getLeft(), result);
243 return curValue;
244 }
245
246 @Override
247 public Basic2ObjectMap<K, R> reduceMerge(
248 Basic2ObjectMap<K, R> curValue, Basic2ObjectMap<K, R> valueToReduce) {
249 for (Iterator<K> iter = valueToReduce.fastKeyIterator(); iter.hasNext();) {
250 K key = iter.next();
251
252 R result = curValue.get(key);
253 if (result == null) {
254 result = typeOps.create();
255 }
256 result = elementReduceOp.reduceMerge(result, valueToReduce.get(key));
257 curValue.put(key, result);
258 }
259 return curValue;
260 }
261
262 @Override
263 public void write(DataOutput out) throws IOException {
264 TypeOpsUtils.writeTypeOps(keyTypeOps, out);
265 TypeOpsUtils.writeTypeOps(typeOps, out);
266 WritableUtils.writeWritableObject(elementReduceOp, out);
267 }
268
269 @Override
270 public void readFields(DataInput in) throws IOException {
271 keyTypeOps = TypeOpsUtils.readTypeOps(in);
272 typeOps = TypeOpsUtils.readTypeOps(in);
273 elementReduceOp = WritableUtils.readWritableObject(in, null);
274 init();
275 }
276 }