This project has retired. For details please refer to its
Attic page.
DelegatePiece xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.piece.delegate;
19
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.List;
23
24 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
25 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
26 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
27 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
28 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
29 import org.apache.giraph.block_app.framework.block.PieceCount;
30 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
31 import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
32 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
33 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34 import org.apache.giraph.conf.MessageClasses;
35 import org.apache.giraph.function.Consumer;
36 import org.apache.giraph.graph.Vertex;
37 import org.apache.giraph.types.NoMessage;
38 import org.apache.hadoop.io.Writable;
39 import org.apache.hadoop.io.WritableComparable;
40
41 import com.google.common.base.Preconditions;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 @SuppressWarnings("rawtypes")
69 public class DelegatePiece<I extends WritableComparable, V extends Writable,
70 E extends Writable, M extends Writable, WV, WM extends Writable, S>
71 extends AbstractPiece<I, V, E, M, WV, WM, S> {
72
73 private final List<AbstractPiece<I, V, E, M, WV, WM, S>> innerPieces;
74
75 @SafeVarargs
76 @SuppressWarnings("unchecked")
77 public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
78 ? super M, ? super WV, ? super WM, ? super S>... innerPieces) {
79
80
81 this.innerPieces = new ArrayList(Arrays.asList(innerPieces));
82 }
83
84 @SuppressWarnings("unchecked")
85 public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
86 ? super M, ? super WV, ? super WM, ? super S> innerPiece) {
87
88
89 this.innerPieces = new ArrayList(Arrays.asList(innerPiece));
90 }
91
92 protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
93 ArrayList<InnerVertexSender> workerSendFunctions,
94 BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
95 return new DelegateWorkerSendFunctions(workerSendFunctions);
96 }
97
98 protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
99 ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
100 BlockWorkerReceiveApi<I> workerApi, S executionStage) {
101 return new DelegateWorkerReceiveFunctions(workerReceiveFunctions);
102 }
103
104 @Override
105 public InnerVertexSender getWrappedVertexSender(
106 BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
107 ArrayList<InnerVertexSender> workerSendFunctions =
108 new ArrayList<>(innerPieces.size());
109 for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
110 workerSendFunctions.add(
111 innerPiece.getWrappedVertexSender(workerApi, executionStage));
112 }
113 return delegateWorkerSendFunctions(
114 workerSendFunctions, workerApi, executionStage);
115 }
116
117 @Override
118 public InnerVertexReceiver getVertexReceiver(
119 BlockWorkerReceiveApi<I> workerApi, S executionStage) {
120 ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions =
121 new ArrayList<>(innerPieces.size());
122 for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
123 workerReceiveFunctions.add(
124 innerPiece.getVertexReceiver(workerApi, executionStage));
125 }
126 return delegateWorkerReceiveFunctions(
127 workerReceiveFunctions, workerApi, executionStage);
128 }
129
130
131 protected class DelegateWorkerSendFunctions extends InnerVertexSender {
132 private final ArrayList<InnerVertexSender> workerSendFunctions;
133
134 public DelegateWorkerSendFunctions(
135 ArrayList<InnerVertexSender> workerSendFunctions) {
136 this.workerSendFunctions = workerSendFunctions;
137 }
138
139 @Override
140 public void vertexSend(Vertex<I, V, E> vertex) {
141 for (InnerVertexSender functions : workerSendFunctions) {
142 if (functions != null) {
143 functions.vertexSend(vertex);
144 }
145 }
146 }
147
148 @Override
149 public void postprocess() {
150 for (InnerVertexSender functions : workerSendFunctions) {
151 if (functions != null) {
152 functions.postprocess();
153 }
154 }
155 }
156 }
157
158
159 protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver {
160 private final ArrayList<VertexReceiver<I, V, E, M>>
161 workerReceiveFunctions;
162
163 public DelegateWorkerReceiveFunctions(
164 ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions) {
165 this.workerReceiveFunctions = workerReceiveFunctions;
166 }
167
168 @Override
169 public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
170 for (VertexReceiver<I, V, E, M> functions :
171 workerReceiveFunctions) {
172 if (functions != null) {
173 functions.vertexReceive(vertex, messages);
174 }
175 }
176 }
177
178 @Override
179 public void postprocess() {
180 for (VertexReceiver<I, V, E, M> functions :
181 workerReceiveFunctions) {
182 if (functions instanceof VertexPostprocessor) {
183 ((VertexPostprocessor) functions).postprocess();
184 }
185 }
186 }
187 }
188
189 @Override
190 public void masterCompute(BlockMasterApi api, S executionStage) {
191 for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
192 piece.masterCompute(api, executionStage);
193 }
194 }
195
196 @Override
197 public void workerContextSend(
198 BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
199 WV workerValue) {
200 for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
201 piece.workerContextSend(workerContextApi, executionStage, workerValue);
202 }
203 }
204
205 @Override
206 public void workerContextReceive(
207 BlockWorkerContextReceiveApi workerContextApi, S executionStage,
208 WV workerValue, List<WM> workerMessages) {
209 for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
210 piece.workerContextReceive(
211 workerContextApi, executionStage, workerValue, workerMessages);
212 }
213 }
214
215 @Override
216 public S nextExecutionStage(S executionStage) {
217 for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
218 executionStage = innerPiece.nextExecutionStage(executionStage);
219 }
220 return executionStage;
221 }
222
223 @Override
224 public MessageClasses<I, M> getMessageClasses(
225 ImmutableClassesGiraphConfiguration conf) {
226 MessageClasses<I, M> messageClasses = null;
227 MessageClasses<I, M> firstMessageClasses = null;
228 for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
229 MessageClasses<I, M> cur = innerPiece.getMessageClasses(conf);
230 Preconditions.checkState(cur != null);
231 if (!cur.getMessageClass().equals(NoMessage.class)) {
232 if (messageClasses != null) {
233 throw new RuntimeException(
234 "Only one piece combined through delegate (" +
235 toString() + ") can send messages");
236 }
237 messageClasses = cur;
238 }
239 if (firstMessageClasses == null) {
240 firstMessageClasses = cur;
241 }
242 }
243 return messageClasses != null ? messageClasses : firstMessageClasses;
244 }
245
246 @Override
247 public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
248 for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
249 innerPiece.forAllPossiblePieces(consumer);
250 }
251 }
252
253 @Override
254 public PieceCount getPieceCount() {
255 return new PieceCount(1);
256 }
257
258 @SuppressWarnings("deprecation")
259 @Override
260 public void registerAggregators(BlockMasterApi master)
261 throws InstantiationException, IllegalAccessException {
262 for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
263 innerPiece.registerAggregators(master);
264 }
265 }
266
267 @Override
268 public void wrappedRegisterReducers(
269 BlockMasterApi masterApi, S executionStage) {
270 for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
271 innerPiece.wrappedRegisterReducers(masterApi, executionStage);
272 }
273 }
274
275 protected String delegationName() {
276 return "Delegate";
277 }
278
279 @Override
280 public String toString() {
281 return delegationName() + innerPieces.toString();
282 }
283 }