This project has retired. For details please refer to its
Attic page.
ArrayReduce xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.array;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.lang.reflect.Array;
24
25 import org.apache.commons.lang3.tuple.MutablePair;
26 import org.apache.commons.lang3.tuple.Pair;
27 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
28 import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
29 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
30 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
31 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
32 import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
33 import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
34 import org.apache.giraph.master.MasterGlobalCommUsage;
35 import org.apache.giraph.reducers.ReduceOperation;
36 import org.apache.giraph.utils.ArrayWritable;
37 import org.apache.giraph.utils.WritableUtils;
38 import org.apache.giraph.worker.WorkerBroadcastUsage;
39 import org.apache.hadoop.io.Writable;
40
41
42
43
44
45
46
47
48
49 public class ArrayReduce<S, R extends Writable>
50 implements ReduceOperation<Pair<IntRef, S>, ArrayWritable<R>> {
51 private int fixedSize;
52 private ReduceOperation<S, R> elementReduceOp;
53 private Class<R> elementClass;
54
55 public ArrayReduce() {
56 }
57
58
59
60
61
62
63
64
65 public ArrayReduce(int fixedSize, ReduceOperation<S, R> elementReduceOp) {
66 this.fixedSize = fixedSize;
67 this.elementReduceOp = elementReduceOp;
68 init();
69 }
70
71
72
73
74
75
76
77
78
79
80
81
82
83 public static <S, T extends Writable>
84 ReducerArrayHandle<S, T> createArrayHandles(
85 final int fixedSize, ReduceOperation<S, T> elementReduceOp,
86 CreateReducerFunctionApi createFunction) {
87 final ReducerHandle<Pair<IntRef, S>, ArrayWritable<T>> reduceHandle =
88 createFunction.createReducer(
89 new ArrayReduce<>(fixedSize, elementReduceOp));
90
91 final IntRef curIndex = new IntRef(0);
92 final MutablePair<IntRef, S> reusablePair =
93 MutablePair.of(new IntRef(0), null);
94 final ReducerHandle<S, T> elementReduceHandle = new ReducerHandle<S, T>() {
95 @Override
96 public T getReducedValue(MasterGlobalCommUsage master) {
97 ArrayWritable<T> result = reduceHandle.getReducedValue(master);
98 return result.get()[curIndex.value];
99 }
100
101 @Override
102 public void reduce(S valueToReduce) {
103 reusablePair.getLeft().value = curIndex.value;
104 reusablePair.setRight(valueToReduce);
105 reduceHandle.reduce(reusablePair);
106 }
107
108 @Override
109 public BroadcastHandle<T> broadcastValue(BlockMasterApi master) {
110 throw new UnsupportedOperationException();
111 }
112 };
113
114 return new ReducerArrayHandle<S, T>() {
115 @Override
116 public ReducerHandle<S, T> get(int index) {
117 curIndex.value = index;
118 return elementReduceHandle;
119 }
120
121 @Override
122 public int getStaticSize() {
123 return fixedSize;
124 }
125
126 @Override
127 public int getReducedSize(BlockMasterApi master) {
128 return getStaticSize();
129 }
130
131 @Override
132 public BroadcastArrayHandle<T> broadcastValue(BlockMasterApi master) {
133 final BroadcastHandle<ArrayWritable<T>> broadcastHandle =
134 reduceHandle.broadcastValue(master);
135 final IntRef curIndex = new IntRef(0);
136 final BroadcastHandle<T>
137 elementBroadcastHandle = new BroadcastHandle<T>() {
138 @Override
139 public T getBroadcast(WorkerBroadcastUsage worker) {
140 ArrayWritable<T> result = broadcastHandle.getBroadcast(worker);
141 return result.get()[curIndex.value];
142 }
143 };
144 return new BroadcastArrayHandle<T>() {
145 @Override
146 public BroadcastHandle<T> get(int index) {
147 curIndex.value = index;
148 return elementBroadcastHandle;
149 }
150
151 @Override
152 public int getStaticSize() {
153 return fixedSize;
154 }
155
156 @Override
157 public int getBroadcastedSize(WorkerBroadcastUsage worker) {
158 return getStaticSize();
159 }
160 };
161 }
162 };
163 }
164
165 private void init() {
166 elementClass = (Class<R>) elementReduceOp.createInitialValue().getClass();
167 }
168
169 @Override
170 public ArrayWritable<R> createInitialValue() {
171 R[] values = (R[]) Array.newInstance(elementClass, fixedSize);
172 for (int i = 0; i < fixedSize; i++) {
173 values[i] = elementReduceOp.createInitialValue();
174 }
175 return new ArrayWritable<>(elementClass, values);
176 }
177
178 @Override
179 public ArrayWritable<R> reduce(
180 ArrayWritable<R> curValue, Pair<IntRef, S> valueToReduce) {
181 int index = valueToReduce.getLeft().value;
182 curValue.get()[index] =
183 elementReduceOp.reduce(curValue.get()[index], valueToReduce.getRight());
184 return curValue;
185 }
186
187 @Override
188 public ArrayWritable<R> reduceMerge(
189 ArrayWritable<R> curValue, ArrayWritable<R> valueToReduce) {
190 for (int i = 0; i < fixedSize; i++) {
191 curValue.get()[i] =
192 elementReduceOp.reduceMerge(
193 curValue.get()[i], valueToReduce.get()[i]);
194 }
195 return curValue;
196 }
197
198 @Override
199 public void write(DataOutput out) throws IOException {
200 out.writeInt(fixedSize);
201 WritableUtils.writeWritableObject(elementReduceOp, out);
202 }
203
204 @Override
205 public void readFields(DataInput in) throws IOException {
206 fixedSize = in.readInt();
207 elementReduceOp = WritableUtils.readWritableObject(in, null);
208 init();
209 }
210
211 }