This project has retired. For details please refer to its
Attic page.
HugeArrayUtils xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.array;
19
20 import java.util.ArrayList;
21
22 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
23 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
24 import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
25 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
26 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
27 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
28 import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
29 import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfBroadcasts;
30 import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfReducers;
31 import org.apache.giraph.conf.IntConfOption;
32 import org.apache.giraph.function.ObjectHolder;
33 import org.apache.giraph.function.Supplier;
34 import org.apache.giraph.function.primitive.Int2ObjFunction;
35 import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
36 import org.apache.giraph.reducers.ReduceOperation;
37 import org.apache.giraph.types.ops.PrimitiveTypeOps;
38 import org.apache.giraph.types.ops.TypeOpsUtils;
39 import org.apache.giraph.types.ops.collections.array.WArrayList;
40 import org.apache.giraph.utils.ArrayWritable;
41 import org.apache.giraph.worker.WorkerBroadcastUsage;
42 import org.apache.hadoop.io.Writable;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class HugeArrayUtils {
67
68
69 private static final IntConfOption NUM_STRIPES = new IntConfOption(
70 "giraph.reducers.HugeArrayUtils.num_stripes", 500000,
71 "Number of distict reducers to create. If array is smaller then this" +
72 "number, each element will be it's own reducer");
73
/** Hidden constructor: this class only exposes static helpers. */
private HugeArrayUtils() {
  // intentionally empty
}
75
76
77
78
79
80
81
82
83
84
85 public static <S, R extends Writable>
86 ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
87 final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
88 final CreateReducersApi reduceApi) {
89 return createGlobalReducerArrayHandle(
90 fixedSize, elementReduceOp, reduceApi,
91 NUM_STRIPES.get(reduceApi.getConf()));
92 }
93
94
95
96
97
98
99
100
101
102
103
104 public static <S, R extends Writable>
105 ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
106 final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
107 final CreateReducersApi reduceApi, int maxNumStripes) {
108 PrimitiveTypeOps<R> typeOps = TypeOpsUtils.getPrimitiveTypeOpsOrNull(
109 (Class<R>) elementReduceOp.createInitialValue().getClass());
110
111 final CreateReducerFunctionApi
112 createReducer = new CreateReducerFunctionApi() {
113 @Override
114 public <S, R extends Writable> ReducerHandle<S, R> createReducer(
115 ReduceOperation<S, R> reduceOp) {
116 return reduceApi.createGlobalReducer(reduceOp);
117 }
118 };
119
120 if (fixedSize < maxNumStripes) {
121 return new ArrayOfReducers<>(
122 fixedSize,
123 new Supplier<ReducerHandle<S, R>>() {
124 @Override
125 public ReducerHandle<S, R> get() {
126 return createReducer.createReducer(elementReduceOp);
127 }
128 });
129 } else {
130 final ObjectStriping striping =
131 new ObjectStriping(fixedSize, maxNumStripes);
132
133 final ArrayList<ReducerArrayHandle<S, R>> handles =
134 new ArrayList<>(striping.getSplits());
135 for (int i = 0; i < striping.getSplits(); i++) {
136 if (typeOps != null) {
137 handles.add(BasicArrayReduce.createArrayHandles(
138 striping.getSplitSize(i), typeOps,
139 elementReduceOp, createReducer));
140 } else {
141 handles.add(ArrayReduce.createArrayHandles(
142 striping.getSplitSize(i), elementReduceOp, createReducer));
143 }
144 }
145
146 return new ReducerArrayHandle<S, R>() {
147 @Override
148 public ReducerHandle<S, R> get(int index) {
149 if ((index >= fixedSize) || (index < 0)) {
150 throw new RuntimeException(
151 "Reducer Access out of bounds: requested : " +
152 index + " from array of size : " + fixedSize);
153 }
154 int reducerIndex = striping.getSplitIndex(index);
155 int insideIndex = striping.getInsideIndex(index);
156 return handles.get(reducerIndex).get(insideIndex);
157 }
158
159 @Override
160 public int getStaticSize() {
161 return fixedSize;
162 }
163
164 @Override
165 public int getReducedSize(BlockMasterApi master) {
166 return getStaticSize();
167 }
168
169 @Override
170 public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
171 throw new UnsupportedOperationException("for now not supported");
172 }
173 };
174 }
175 }
176
177
178
179
180
181
182
183
184
185 public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
186 final int count,
187 final Int2ObjFunction<V> valueSupplier,
188 final BlockMasterApi master) {
189 return broadcast(count, valueSupplier, null, master);
190 }
191
192
193
194
195
196
197
198
199
200
201
202 public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
203 final int count,
204 final Int2ObjFunction<V> valueSupplier,
205 final PrimitiveTypeOps<V> typeOps,
206 final BlockMasterApi master) {
207 int numStripes = NUM_STRIPES.get(master.getConf());
208 if (count < numStripes) {
209 return new ArrayOfBroadcasts<>(
210 count,
211 new Int2ObjFunction<BroadcastHandle<V>>() {
212 @Override
213 public BroadcastHandle<V> apply(int i) {
214
215
216
217 return master.broadcast(
218 typeOps != null ?
219 typeOps.createCopy(valueSupplier.apply(i)) :
220 valueSupplier.apply(i));
221 }
222 });
223 } else {
224 ObjectStriping striping = new ObjectStriping(count, numStripes);
225 final Int2ObjFunction<BroadcastHandle<V>> handleSupplier;
226
227 if (typeOps != null) {
228 handleSupplier = getPrimitiveBroadcastHandleSupplier(
229 valueSupplier, typeOps, master, striping);
230 } else {
231 handleSupplier = getObjectBroadcastHandleSupplier(
232 valueSupplier, master, striping);
233 }
234 return new BroadcastArrayHandle<V>() {
235 @Override
236 public BroadcastHandle<V> get(int index) {
237 if (index >= count || index < 0) {
238 throw new RuntimeException(
239 "Broadcast Access out of bounds: requested: " +
240 index + " from array of size : " + count);
241 }
242 return handleSupplier.apply(index);
243 }
244
245 @Override
246 public int getBroadcastedSize(WorkerBroadcastUsage worker) {
247 return count;
248 }
249
250 @Override
251 public int getStaticSize() {
252 return count;
253 }
254 };
255 }
256 }
257
258 private static <V extends Writable>
259 Int2ObjFunction<BroadcastHandle<V>> getObjectBroadcastHandleSupplier(
260 final Int2ObjFunction<V> valueSupplier,
261 final BlockMasterApi master, final ObjectStriping striping) {
262 final ObjectHolder<Class<V>> elementClass = new ObjectHolder<>();
263 final ArrayOfHandles<BroadcastHandle<ArrayWritable<V>>> arrayOfBroadcasts =
264 new ArrayOfHandles<>(
265 striping.getSplits(),
266 new Int2ObjFunction<BroadcastHandle<ArrayWritable<V>>>() {
267 @Override
268 public BroadcastHandle<ArrayWritable<V>> apply(int value) {
269 int size = striping.getSplitSize(value);
270 int start = striping.getSplitStart(value);
271 V[] array = (V[]) new Writable[size];
272 for (int i = 0; i < size; i++) {
273 array[i] = valueSupplier.apply(start + i);
274 if (elementClass.get() == null) {
275 elementClass.apply((Class<V>) array[i].getClass());
276 }
277 }
278 return master.broadcast(
279 new ArrayWritable<>(elementClass.get(), array));
280 }
281 });
282
283 final IntRef insideIndex = new IntRef(-1);
284 final ObjectHolder<BroadcastHandle<ArrayWritable<V>>> handleHolder =
285 new ObjectHolder<>();
286
287 final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
288 @Override
289 public V getBroadcast(WorkerBroadcastUsage worker) {
290 return handleHolder.get().getBroadcast(worker).get()[insideIndex.value];
291 }
292 };
293
294 return createBroadcastHandleSupplier(
295 striping, arrayOfBroadcasts, insideIndex, handleHolder,
296 reusableHandle);
297 }
298
299 private static <V extends Writable>
300 Int2ObjFunction<BroadcastHandle<V>> getPrimitiveBroadcastHandleSupplier(
301 final Int2ObjFunction<V> valueSupplier, final PrimitiveTypeOps<V> typeOps,
302 final BlockMasterApi master, final ObjectStriping striping) {
303 final ArrayOfHandles<BroadcastHandle<WArrayList<V>>> arrayOfBroadcasts =
304 new ArrayOfHandles<>(
305 striping.getSplits(),
306 new Int2ObjFunction<BroadcastHandle<WArrayList<V>>>() {
307 @Override
308 public BroadcastHandle<WArrayList<V>> apply(int value) {
309 int size = striping.getSplitSize(value);
310 int start = striping.getSplitStart(value);
311 WArrayList<V> array = typeOps.createArrayList(size);
312 for (int i = 0; i < size; i++) {
313 array.addW(valueSupplier.apply(start + i));
314 }
315 return master.broadcast(array);
316 }
317 });
318
319 final IntRef insideIndex = new IntRef(-1);
320 final ObjectHolder<BroadcastHandle<WArrayList<V>>> handleHolder =
321 new ObjectHolder<>();
322 final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
323 private final V reusable = typeOps.create();
324 @Override
325 public V getBroadcast(WorkerBroadcastUsage worker) {
326 handleHolder.get().getBroadcast(worker).getIntoW(
327 insideIndex.value, reusable);
328 return reusable;
329 }
330 };
331
332 return createBroadcastHandleSupplier(
333 striping, arrayOfBroadcasts, insideIndex, handleHolder,
334 reusableHandle);
335 }
336
337 private static <V extends Writable, A>
338 Int2ObjFunction<BroadcastHandle<V>> createBroadcastHandleSupplier(
339 final ObjectStriping striping,
340 final ArrayOfHandles<BroadcastHandle<A>> arrayOfBroadcasts,
341 final IntRef insideIndex,
342 final ObjectHolder<BroadcastHandle<A>> handleHolder,
343 final BroadcastHandle<V> reusableHandle) {
344 final Int2ObjFunction<BroadcastHandle<V>> handleProvider =
345 new Int2ObjFunction<BroadcastHandle<V>>() {
346 @Override
347 public BroadcastHandle<V> apply(int index) {
348 int broadcastIndex = striping.getSplitIndex(index);
349 insideIndex.value = striping.getInsideIndex(index);
350 handleHolder.apply(arrayOfBroadcasts.get(broadcastIndex));
351 return reusableHandle;
352 }
353 };
354 return handleProvider;
355 }
356
357
358
359
360
361 static class ObjectStriping {
362 private final int splits;
363 private final int indicesPerObject;
364 private final int overflowNum;
365 private final int beforeOverflow;
366
367 public ObjectStriping(int size, int splits) {
368 this.splits = splits;
369 this.indicesPerObject = size / splits;
370 this.overflowNum = size % splits;
371 this.beforeOverflow = overflowNum * (indicesPerObject + 1);
372 }
373
374 public int getSplits() {
375 return splits;
376 }
377
378 public int getSplitSize(int splitIndex) {
379 return indicesPerObject + (splitIndex < overflowNum ? 1 : 0);
380 }
381
382 public int getSplitStart(int splitIndex) {
383 if (splitIndex < overflowNum) {
384 return splitIndex * (indicesPerObject + 1);
385 } else {
386 return beforeOverflow + (splitIndex - overflowNum) * indicesPerObject;
387 }
388 }
389
390 public int getSplitIndex(int objectIndex) {
391 if (objectIndex < beforeOverflow) {
392 return objectIndex / (indicesPerObject + 1);
393 } else {
394 return (objectIndex - beforeOverflow) / indicesPerObject + overflowNum;
395 }
396 }
397
398 public int getInsideIndex(int objectIndex) {
399 if (objectIndex < beforeOverflow) {
400 return objectIndex % (indicesPerObject + 1);
401 } else {
402 return (objectIndex - beforeOverflow) % indicesPerObject;
403 }
404 }
405 }
406 }