This project has retired. For details please refer to its
Attic page.
BasicArrayReduce xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.array;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.commons.lang3.tuple.MutablePair;
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
27 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
28 import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
29 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
30 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
31 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
32 import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
33 import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
34 import org.apache.giraph.master.MasterGlobalCommUsage;
35 import org.apache.giraph.reducers.ReduceOperation;
36 import org.apache.giraph.types.ops.PrimitiveTypeOps;
37 import org.apache.giraph.types.ops.TypeOpsUtils;
38 import org.apache.giraph.types.ops.collections.array.WArrayList;
39 import org.apache.giraph.utils.WritableUtils;
40 import org.apache.giraph.worker.WorkerBroadcastUsage;
41 import org.apache.hadoop.io.Writable;
42
43
44
45
46
47
48
49
50
51
52 public class BasicArrayReduce<S, R extends Writable>
53 implements ReduceOperation<Pair<IntRef, S>, WArrayList<R>> {
54 private int fixedSize;
55 private PrimitiveTypeOps<R> typeOps;
56 private ReduceOperation<S, R> elementReduceOp;
57 private R initialElement;
58 private R reusable;
59 private R reusable2;
60
61 public BasicArrayReduce() {
62 }
63
64
65
66
67
68
69
70
71
72
73 public BasicArrayReduce(
74 int fixedSize,
75 PrimitiveTypeOps<R> typeOps,
76 ReduceOperation<S, R> elementReduceOp) {
77 this.fixedSize = fixedSize;
78 this.typeOps = typeOps;
79 this.elementReduceOp = elementReduceOp;
80 init();
81 }
82
83
84
85
86
87
88
89
90
91 public BasicArrayReduce(
92 PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp) {
93 this(-1, typeOps, elementReduceOp);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 public static <S, R extends Writable>
111 ReducerArrayHandle<S, R> createLocalArrayHandles(
112 PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
113 CreateReducersApi reduceApi) {
114 return createLocalArrayHandles(-1, typeOps, elementReduceOp, reduceApi);
115 }
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 public static <S, R extends Writable>
132 ReducerArrayHandle<S, R> createLocalArrayHandles(
133 int fixedSize, PrimitiveTypeOps<R> typeOps,
134 ReduceOperation<S, R> elementReduceOp,
135 final CreateReducersApi reduceApi) {
136 return createArrayHandles(fixedSize, typeOps, elementReduceOp,
137 new CreateReducerFunctionApi() {
138 @Override
139 public <S, R extends Writable> ReducerHandle<S, R> createReducer(
140 ReduceOperation<S, R> reduceOp) {
141 return reduceApi.createLocalReducer(reduceOp);
142 }
143 });
144 }
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159 public static <S, R extends Writable>
160 ReducerArrayHandle<S, R> createArrayHandles(
161 PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
162 CreateReducerFunctionApi createFunction) {
163 return createArrayHandles(-1, typeOps, elementReduceOp, createFunction);
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180 public static <S, R extends Writable>
181 ReducerArrayHandle<S, R> createArrayHandles(
182 final int fixedSize, final PrimitiveTypeOps<R> typeOps,
183 ReduceOperation<S, R> elementReduceOp,
184 CreateReducerFunctionApi createFunction) {
185 final ReducerHandle<Pair<IntRef, S>, WArrayList<R>> reduceHandle =
186 createFunction.createReducer(
187 new BasicArrayReduce<>(fixedSize, typeOps, elementReduceOp));
188 final IntRef curIndex = new IntRef(0);
189 final R reusableValue = typeOps.create();
190 final R initialValue = elementReduceOp.createInitialValue();
191 final MutablePair<IntRef, S> reusablePair =
192 MutablePair.of(new IntRef(0), null);
193 final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
194 @Override
195 public R getReducedValue(MasterGlobalCommUsage master) {
196 WArrayList<R> result = reduceHandle.getReducedValue(master);
197 if (fixedSize == -1 && curIndex.value >= result.size()) {
198 typeOps.set(reusableValue, initialValue);
199 } else {
200 result.getIntoW(curIndex.value, reusableValue);
201 }
202 return reusableValue;
203 }
204
205 @Override
206 public void reduce(S valueToReduce) {
207 reusablePair.getLeft().value = curIndex.value;
208 reusablePair.setRight(valueToReduce);
209 reduceHandle.reduce(reusablePair);
210 }
211
212 @Override
213 public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
214 throw new UnsupportedOperationException();
215 }
216 };
217
218 return new ReducerArrayHandle<S, R>() {
219 @Override
220 public ReducerHandle<S, R> get(int index) {
221 curIndex.value = index;
222 return elementReduceHandle;
223 }
224
225 @Override
226 public int getStaticSize() {
227 if (fixedSize == -1) {
228 throw new UnsupportedOperationException(
229 "Cannot call size, when one is not specified upfront");
230 }
231 return fixedSize;
232 }
233
234 @Override
235 public int getReducedSize(BlockMasterApi master) {
236 return reduceHandle.getReducedValue(master).size();
237 }
238
239 @Override
240 public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
241 final BroadcastHandle<WArrayList<R>> broadcastHandle =
242 reduceHandle.broadcastValue(master);
243 final IntRef curIndex = new IntRef(0);
244 final R reusableValue = typeOps.create();
245 final BroadcastHandle<R>
246 elementBroadcastHandle = new BroadcastHandle<R>() {
247 @Override
248 public R getBroadcast(WorkerBroadcastUsage worker) {
249 WArrayList<R> result = broadcastHandle.getBroadcast(worker);
250 if (fixedSize == -1 && curIndex.value >= result.size()) {
251 typeOps.set(reusableValue, initialValue);
252 } else {
253 result.getIntoW(curIndex.value, reusableValue);
254 }
255 return reusableValue;
256 }
257 };
258 return new BroadcastArrayHandle<R>() {
259 @Override
260 public BroadcastHandle<R> get(int index) {
261 curIndex.value = index;
262 return elementBroadcastHandle;
263 }
264
265 @Override
266 public int getStaticSize() {
267 if (fixedSize == -1) {
268 throw new UnsupportedOperationException(
269 "Cannot call size, when one is not specified upfront");
270 }
271 return fixedSize;
272 }
273
274 @Override
275 public int getBroadcastedSize(WorkerBroadcastUsage worker) {
276 return broadcastHandle.getBroadcast(worker).size();
277 }
278 };
279 }
280 };
281 }
282
283
284 private void init() {
285 initialElement = elementReduceOp.createInitialValue();
286 reusable = typeOps.create();
287 reusable2 = typeOps.create();
288 }
289
290 @Override
291 public WArrayList<R> createInitialValue() {
292 if (fixedSize != -1) {
293 WArrayList<R> list = typeOps.createArrayList(fixedSize);
294 fill(list, fixedSize);
295 return list;
296 } else {
297 return typeOps.createArrayList(1);
298 }
299 }
300
301 private void fill(WArrayList<R> list, int newSize) {
302 if (fixedSize != -1 && newSize > fixedSize) {
303 throw new IllegalArgumentException(newSize + " larger then " + fixedSize);
304 }
305
306 if (list.capacity() < newSize) {
307 list.setCapacity(newSize);
308 }
309 while (list.size() < newSize) {
310 list.addW(initialElement);
311 }
312 }
313
314 @Override
315 public WArrayList<R> reduce(
316 WArrayList<R> curValue, Pair<IntRef, S> valueToReduce) {
317 int index = valueToReduce.getLeft().value;
318 fill(curValue, index + 1);
319 curValue.getIntoW(index, reusable);
320 R result = elementReduceOp.reduce(reusable, valueToReduce.getRight());
321 curValue.setW(index, result);
322 return curValue;
323 }
324
325 @Override
326 public WArrayList<R> reduceMerge(
327 WArrayList<R> curValue, WArrayList<R> valueToReduce) {
328 fill(curValue, valueToReduce.size());
329 for (int i = 0; i < valueToReduce.size(); i++) {
330 valueToReduce.getIntoW(i, reusable2);
331 curValue.getIntoW(i, reusable);
332 R result = elementReduceOp.reduceMerge(reusable, reusable2);
333 curValue.setW(i, result);
334 }
335
336 return curValue;
337 }
338
339 @Override
340 public void write(DataOutput out) throws IOException {
341 out.writeInt(fixedSize);
342 TypeOpsUtils.writeTypeOps(typeOps, out);
343 WritableUtils.writeWritableObject(elementReduceOp, out);
344 }
345
346 @Override
347 public void readFields(DataInput in) throws IOException {
348 fixedSize = in.readInt();
349 typeOps = TypeOpsUtils.readTypeOps(in);
350 elementReduceOp = WritableUtils.readWritableObject(in, null);
351 init();
352 }
353 }