1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.library;
1920import java.util.Iterator;
2122import org.apache.giraph.block_app.framework.api.BlockMasterApi;
23import org.apache.giraph.block_app.framework.block.Block;
24import org.apache.giraph.block_app.framework.block.SequenceBlock;
25import org.apache.giraph.combiner.MessageCombiner;
26import org.apache.giraph.function.Consumer;
27import org.apache.giraph.function.Function;
28import org.apache.giraph.function.ObjectTransfer;
29import org.apache.giraph.function.PairConsumer;
30import org.apache.giraph.function.vertex.ConsumerWithVertex;
31import org.apache.giraph.function.vertex.FunctionWithVertex;
32import org.apache.giraph.function.vertex.SupplierFromVertex;
33import org.apache.giraph.graph.Vertex;
34import org.apache.giraph.reducers.ReduceOperation;
35import org.apache.hadoop.io.Writable;
36import org.apache.hadoop.io.WritableComparable;
3738/**39 * Utility class for creating sequences of sending replies to received40 * messages. Current instance of this object represents partial chain,41 * where we have specified which messages will be send at the lastly defined42 * link in the chain thus far, but we haven't specified yet what to do when43 * vertices receive them.44 *45 * Contains set of:46 * - static startX methods, used to create the chain47 * - thenX methods, used to add one more Piece to the chain, can be48 * "chained" arbitrary number of times.49 * - endX methods, used to finish the chain, returning50 * the Block representing the whole chain51 *52 * If messageSupplier or targetsSupplier returns null, current vertex53 * is not going to send any messages.54 *55 * @param <I> Vertex id type56 * @param <V> Vertex value type57 * @param <E> Edge value type58 * @param <P> Previous value59 */60publicclass SendMessageChain<I extends WritableComparable, V extends Writable,
61 E extends Writable, P> {
62/**63 * Represent current partial chain. Given a way to consume messages64 * received in lastly defined link in this chain, it will produce block65 * representing a chain created thus far.66 */67privatefinal Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator;
6869privateSendMessageChain(
70 Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator) {
71this.blockCreator = blockCreator;
72 }
7374/**75 * Start chain with sending message provided by messageSupplier to all76 * targets provided by targetsSupplier.77 */78publicstatic <I extends WritableComparable, V extends Writable,
79 E extends Writable, M extends Writable>
80 SendMessageChain<I, V, E, Iterable<M>> startSend(
81final String name,
82final Class<M> messageClass,
83final SupplierFromVertex<I, V, E, M> messageSupplier,
84final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
85returnnew SendMessageChain<>(
86new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
87 @Override
88publicBlock apply(
89 ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
90return Pieces.sendMessage(
91 name, messageClass, messageSupplier,
92 targetsSupplier, messagesConsumer);
93 }
94 });
95 }
9697/**98 * Start chain with sending message provided by messageSupplier to all99 * targets provided by targetsSupplier, and use given messageCombiner to100 * combine messages together.101 */102publicstatic <I extends WritableComparable, V extends Writable,
103 E extends Writable, M extends Writable>
104 SendMessageChain<I, V, E, M> startSend(
105final String name,
106final MessageCombiner<? super I, M> messageCombiner,
107final SupplierFromVertex<I, V, E, M> messageSupplier,
108final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
109returnnew SendMessageChain<>(
110new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
111 @Override
112publicBlock apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
113return Pieces.sendMessage(
114 name, messageCombiner, messageSupplier,
115 targetsSupplier, messagesConsumer);
116 }
117 });
118 }
119120/**121 * Start chain with sending message provided by messageSupplier to all122 * neighbors of a current vertex.123 */124publicstatic <I extends WritableComparable, V extends Writable,
125 E extends Writable, M extends Writable>
126 SendMessageChain<I, V, E, Iterable<M>> startSendToNeighbors(
127final String name,
128final Class<M> messageClass,
129final SupplierFromVertex<I, V, E, M> messageSupplier) {
130return startSend(name, messageClass, messageSupplier,
131 VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
132 }
133134/**135 * Start chain with sending message provided by messageSupplier to all136 * neighbors of a current vertex, and use given messageCombiner to137 * combine messages together.138 */139publicstatic <I extends WritableComparable, V extends Writable,
140 E extends Writable, M extends Writable>
141 SendMessageChain<I, V, E, M> startSendToNeighbors(
142final String name,
143final MessageCombiner<? super I, M> messageCombiner,
144final SupplierFromVertex<I, V, E, M> messageSupplier) {
145return startSend(name, messageCombiner, messageSupplier,
146 VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
147 }
148149/**150 * Start chain by providing a function that will produce Block representing151 * beginning of the chain, given a consumer of messages send152 * by the last link in the created block.153 */154publicstatic <I extends WritableComparable, V extends Writable,
155 E extends Writable, P extends Writable>
156 SendMessageChain<I, V, E, P> startCustom(
157 Function<ConsumerWithVertex<I, V, E, P>, Block> createStartingBlock) {
158returnnew SendMessageChain<>(createStartingBlock);
159 }
160161/**162 * Give previously received message(s) to messageSupplier, and send message163 * it returns to all targets provided by targetsSupplier.164 */165public <M extends Writable>
166 SendMessageChain<I, V, E, Iterable<M>> thenSend(
167final String name,
168final Class<M> messageClass,
169final FunctionWithVertex<I, V, E, P, M> messageSupplier,
170final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
171final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
172173returnnew SendMessageChain<>(
174new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
175 @Override
176publicBlock apply(
177 ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
178returnnewSequenceBlock(
179 blockCreator.apply(
180 prevMessagesTransfer.<I, V, E>castToConsumer()),
181 Pieces.sendMessage(
182 name, messageClass,
183new SupplierFromVertex<I, V, E, M>() {
184 @Override
185public M get(Vertex<I, V, E> vertex) {
186return messageSupplier.apply(
187 vertex, prevMessagesTransfer.get());
188 }
189 },
190 targetsSupplier, messagesConsumer));
191 }
192 });
193 }
194195/**196 * Give previously received message(s) to messageSupplier, and send message197 * it returns to all neighbors of current vertex.198 */199public <M extends Writable>
200 SendMessageChain<I, V, E, Iterable<M>> thenSendToNeighbors(
201final String name,
202final Class<M> messageClass,
203final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
204return thenSend(name, messageClass, messageSupplier,
205 VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
206 }
207208/**209 * Give previously received message(s) to messageSupplier, and send message210 * it returns to all targets provided by targetsSupplier, and use given211 * messageCombiner to combine messages together.212 */213public <M extends Writable>
214 SendMessageChain<I, V, E, M> thenSend(
215final String name,
216final MessageCombiner<? super I, M> messageCombiner,
217final FunctionWithVertex<I, V, E, P, M> messageSupplier,
218final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
219final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
220221returnnew SendMessageChain<>(
222new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
223 @Override
224publicBlock apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
225returnnewSequenceBlock(
226 blockCreator.apply(
227 prevMessagesTransfer.<I, V, E>castToConsumer()),
228 Pieces.sendMessage(
229 name, messageCombiner,
230new SupplierFromVertex<I, V, E, M>() {
231 @Override
232public M get(Vertex<I, V, E> vertex) {
233return messageSupplier.apply(
234 vertex, prevMessagesTransfer.get());
235 }
236 },
237 targetsSupplier, messagesConsumer));
238 }
239 });
240 }
241242/**243 * Give previously received message(s) to messageSupplier, and send message244 * it returns to all neighbors of current vertex, and use given245 * messageCombiner to combine messages together.246 */247public <M extends Writable>
248 SendMessageChain<I, V, E, M> thenSendToNeighbors(
249final String name,
250final MessageCombiner<? super I, M> messageCombiner,
251final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
252return thenSend(name, messageCombiner, messageSupplier,
253 VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
254 }
255256/**257 * End chain by giving received messages to valueSupplier,258 * to produce value that should be reduced, and consumed on master259 * by reducedValueConsumer.260 */261public <S, R extends Writable>
262Block endReduce(final String name, final ReduceOperation<S, R> reduceOp,
263final FunctionWithVertex<I, V, E, P, S> valueSupplier,
264final Consumer<R> reducedValueConsumer) {
265return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
266 @Override
267publicBlock apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
268return Pieces.reduce(
269 name,
270 reduceOp,
271new SupplierFromVertex<I, V, E, S>() {
272 @Override
273public S get(Vertex<I, V, E> vertex) {
274return valueSupplier.apply(vertex, prevMessages.get(vertex));
275 }
276 },
277 reducedValueConsumer);
278 }
279 });
280 }
281282/**283 * End chain by giving received messages to valueSupplier,284 * to produce value that should be reduced, and consumed on master285 * by reducedValueConsumer.286 */287public <S, R extends Writable>
288Block endReduceWithMaster(
289final String name, final ReduceOperation<S, R> reduceOp,
290final FunctionWithVertex<I, V, E, P, S> valueSupplier,
291final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
292return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
293 @Override
294publicBlock apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
295return Pieces.reduceWithMaster(
296 name,
297 reduceOp,
298new SupplierFromVertex<I, V, E, S>() {
299 @Override
300public S get(Vertex<I, V, E> vertex) {
301return valueSupplier.apply(vertex, prevMessages.get(vertex));
302 }
303 },
304 reducedValueConsumer);
305 }
306 });
307 }
308309/**310 * End chain by processing messages received within the last link311 * in the chain.312 */313publicBlock endConsume(ConsumerWithVertex<I, V, E, P> messagesConsumer) {
314return blockCreator.apply(messagesConsumer);
315 }
316317/**318 * End chain by providing a function that will produce Block to be attached319 * to the end of current chain, given a supplier of messages received320 * within the last link in the chain.321 */322publicBlock endCustom(
323 Function<SupplierFromVertex<I, V, E, P>, Block> createBlockToAttach) {
324final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
325returnnewSequenceBlock(
326 blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()),
327 createBlockToAttach.apply(
328 prevMessagesTransfer.<I, V, E>castToSupplier()));
329 }
330 }