This project has retired. For details please refer to its
Attic page.
SendMessagePiece xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.library.internal;
19
20 import java.util.Iterator;
21
22 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
23 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
24 import org.apache.giraph.block_app.framework.block.Block;
25 import org.apache.giraph.block_app.framework.piece.Piece;
26 import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
27 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
28 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
29 import org.apache.giraph.block_app.library.striping.StripingUtils;
30 import org.apache.giraph.function.Function;
31 import org.apache.giraph.function.Predicate;
32 import org.apache.giraph.function.primitive.Int2ObjFunction;
33 import org.apache.giraph.function.vertex.ConsumerWithVertex;
34 import org.apache.giraph.function.vertex.SupplierFromVertex;
35 import org.apache.giraph.graph.Vertex;
36 import org.apache.hadoop.io.Writable;
37 import org.apache.hadoop.io.WritableComparable;
38
39 import com.google.common.base.Preconditions;
40 import com.google.common.collect.Iterators;
41
42
43
44
45
46
47
48
49
50
51 @SuppressWarnings("rawtypes")
52 public class SendMessagePiece<I extends WritableComparable, V extends Writable,
53 E extends Writable, M extends Writable> extends Piece<I, V, E, M, Object> {
54 private final String name;
55 private final Class<M> messageClass;
56 private final SupplierFromVertex<I, V, E, M> messageSupplier;
57 private final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier;
58 private final ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer;
59
60 public SendMessagePiece(String name,
61 Class<M> messageClass,
62 SupplierFromVertex<I, V, E, M> messageSupplier,
63 SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
64 ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
65 Preconditions.checkNotNull(messageClass);
66 this.name = name;
67 this.messageClass = messageClass;
68 this.messageSupplier = messageSupplier;
69 this.targetsSupplier = targetsSupplier;
70 this.messagesConsumer = messagesConsumer;
71 }
72
73
74
75
76
77
78
79
80
81
82 public Block stripeByReceiver(
83 int stripes,
84 Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
85 return StripingUtils.generateStripedBlock(
86 stripes,
87 new Function<Predicate<I>, Block>() {
88 @Override
89 public Block apply(final Predicate<I> stripePredicate) {
90 return FilteringPiece.createReceiveFiltering(
91 new SupplierFromVertex<I, V, E, Boolean>() {
92 @Override
93 public Boolean get(Vertex<I, V, E> vertex) {
94 return stripePredicate.apply(vertex.getId());
95 }
96 },
97 new SendMessagePiece<>(
98 name,
99 messageClass,
100 messageSupplier,
101 new SupplierFromVertex<I, V, E, Iterator<I>>() {
102 @Override
103 public Iterator<I> get(Vertex<I, V, E> vertex) {
104 return Iterators.filter(
105 targetsSupplier.get(vertex),
106 new com.google.common.base.Predicate<I>() {
107 @Override
108 public boolean apply(I targetId) {
109 return stripePredicate.apply(targetId);
110 }
111 });
112 }
113 },
114 messagesConsumer));
115 }
116 },
117 stripeSupplier);
118 }
119
120
121 @Override
122 public VertexSender<I, V, E> getVertexSender(
123 final BlockWorkerSendApi<I, V, E, M> workerApi,
124 Object executionStage) {
125 return new InnerVertexSender() {
126 @Override
127 public void vertexSend(Vertex<I, V, E> vertex) {
128 Iterator<I> targets = targetsSupplier.get(vertex);
129 M message = messageSupplier.get(vertex);
130 if (message != null && targets != null && targets.hasNext()) {
131 workerApi.sendMessageToMultipleEdges(targets, message);
132 }
133 }
134 };
135 }
136
137 @Override
138 public VertexReceiver<I, V, E, M> getVertexReceiver(
139 BlockWorkerReceiveApi<I> workerApi,
140 Object executionStage) {
141 return new InnerVertexReceiver() {
142 @Override
143 public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
144 messagesConsumer.apply(vertex, messages);
145 }
146 };
147 }
148
149 @Override
150 public Class<M> getMessageClass() {
151 return messageClass;
152 }
153
154 @Override
155 public String toString() {
156 return name;
157 }
158 }