1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.framework.piece;
1920import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
22import org.apache.giraph.block_app.framework.api.CreateReducersApi;
23import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject;
24import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
25import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper;
26import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler;
27import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
28import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
29import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses;
30import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf;
31import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.DefaultMessageFactorySupplierFromConf;
32import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.SupplierFromConfByCopy;
33import org.apache.giraph.combiner.MessageCombiner;
34import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
35import org.apache.giraph.conf.EnumConfOption;
36import org.apache.giraph.conf.GiraphConfigurationSettable;
37import org.apache.giraph.conf.GiraphConstants;
38import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
39import org.apache.giraph.conf.MessageClasses;
40import org.apache.giraph.factories.MessageValueFactory;
41import org.apache.giraph.graph.Vertex;
42import org.apache.giraph.types.NoMessage;
43import org.apache.hadoop.io.DoubleWritable;
44import org.apache.hadoop.io.FloatWritable;
45import org.apache.hadoop.io.IntWritable;
46import org.apache.hadoop.io.LongWritable;
47import org.apache.hadoop.io.Writable;
48import org.apache.hadoop.io.WritableComparable;
4950import com.google.common.base.Preconditions;
5152/**53 * Additional abstract implementations for all pieces to be used.54 * Code here is not in AbstractPiece only to allow for non-standard55 * non-user-defined pieces. <br>56 * Only logic used by the underlying framework directly is in AbstractPiece57 * itself.58 *59 * @param <I> Vertex id type60 * @param <V> Vertex value type61 * @param <E> Edge value type62 * @param <M> Message type63 * @param <WV> Worker value type64 * @param <WM> Worker message type65 * @param <S> Execution stage type66 */67 @SuppressWarnings({ "rawtypes", "unchecked" })
68publicabstractclass DefaultParentPiece<I extends WritableComparable,
69 V extends Writable, E extends Writable, M extends Writable, WV,
70 WM extends Writable, S> extends AbstractPiece<I, V, E, M, WV, WM, S> {
71// TODO move to GiraphConstants72/**73 * This option will tell which message encode & store enum to force,74 * when combining is not enabled.75 *76 * MESSAGE_ENCODE_AND_STORE_TYPE and this property are basically upper77 * and lower bound on message store type, when looking them in order from78 * not doing anything special, to most advanced type:79 * BYTEARRAY_PER_PARTITION,80 * EXTRACT_BYTEARRAY_PER_PARTITION,81 * POINTER_LIST_PER_VERTEX82 * resulting encode type is going to be:83 * pieceEncodingType = piece.allowOneMessageToManyIdsEncoding() ?84 * POINTER_LIST_PER_VERTEX : BYTEARRAY_PER_PARTITION)85 * Math.max(index(minForce), Math.min(index(maxAllowed), index(pieceType);86 *87 * This is useful to force all pieces onto particular message store, even88 * if they do not overrideallowOneMessageToManyIdsEncoding, though that might89 * be rarely needed.90 * This option might be more useful for fully local computation,91 * where overall job behavior is quite different.92 */93publicstaticfinal EnumConfOption<MessageEncodeAndStoreType>
94 MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE =
95 EnumConfOption.create("giraph.messageEncodeAndStoreTypeMinForce",
96 MessageEncodeAndStoreType.class,
97 MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
98"Select the message_encode_and_store_type min force to use");
99100privatefinalReduceUtilsObject reduceUtils = newReduceUtilsObject();
101privateReducersForPieceHandler reducersHandler;
102103// Overridable functions104105/**106 * Override to register any potential reducers used by this piece,107 * through calls to {@code reduceApi}, which will return reducer handles108 * for simple.109 * Tip: Without defining a field, first write here name of the field and what110 * you want to reduce, like:111 * {@code totalSum = reduceApi.createLocalReducer(SumReduce.DOUBLE); }112 * and then use tools your IDE provides to generate field signature itself,113 * which might be slightly complex:114 * {@code ReducerHandle<DoubleWritable, DoubleWritable> totalSum; }115 */116publicvoid registerReducers(CreateReducersApi reduceApi, S executionStage) {
117 }
118119/**120 * Override to do vertex send processing.121 *122 * Creates handler that defines what should be executed on worker123 * during send phase.124 *125 * This logic gets executed first.126 * This function is called once on each worker on each thread, in parallel,127 * on their copy of Piece object to create functions handler.128 *129 * If returned object implements Postprocessor interface, then corresponding130 * postprocess() function is going to be called once, after all vertices131 * corresponding thread needed to process are done.132 */133public VertexSender<I, V, E> getVertexSender(
134 BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
135returnnull;
136 }
137138/**139 * Override to specify type of the message this Piece sends, if it does140 * send messages.141 *142 * If not overwritten, no messages can be sent.143 */144protected Class<M> getMessageClass() {
145returnnull;
146 }
147148/**149 * Override to specify message value factory to be used,150 * which creates objects into which messages will be deserialized.151 *152 * If not overwritten, or null is returned, DefaultMessageValueFactory153 * will be used.154 */155protected MessageValueFactory<M> getMessageFactory(
156ImmutableClassesGiraphConfiguration conf) {
157returnnull;
158 }
159160/**161 * Override to specify message combiner to be used, if any.162 *163 * Message combiner itself should be immutable164 * (i.e. it will be call simultanously from multiple threads)165 */166protected MessageCombiner<? super I, M> getMessageCombiner(
167ImmutableClassesGiraphConfiguration conf) {
168returnnull;
169 }
170171/**172 * Override to specify that this Piece allows one to many ids encoding to be173 * used for messages.174 * You should override this function, if you are sending identical message to175 * all targets, and message itself is not extremely small.176 */177protectedboolean allowOneMessageToManyIdsEncoding() {
178return false;
179 }
180181/**182 * Override to specify that receive of this Piece (and send of next Piece)183 * ignore existing vertices, and just process received messages.184 *185 * Useful when distributed processing on groups that are not vertices is186 * needed. This flag allows you not to worry whether a destination vertex187 * exist, and removes need to clean it up when finished.188 * One example is if each vertex is in a cluster, and we need to process189 * something per cluster.190 *191 * Alternative are reducers, which have distributed reduction, but mostly192 * master still does the processing afterwards, and amount of data needs to193 * fit single machine (master).194 */195protectedboolean receiveIgnoreExistingVertices() {
196return false;
197 }
198199 @Override
200public MessageClasses<I, M> getMessageClasses(
201ImmutableClassesGiraphConfiguration conf) {
202 Class<M> messageClass = null;
203 MessageValueFactory<M> messageFactory = getMessageFactory(conf);
204 MessageCombiner<? super I, M> messageCombiner = getMessageCombiner(conf);
205206if (messageFactory != null) {
207 messageClass = (Class) messageFactory.newInstance().getClass();
208 } elseif (messageCombiner != null) {
209 messageClass = (Class) messageCombiner.createInitialMessage().getClass();
210 }
211212if (messageClass != null) {
213 Preconditions.checkState(getMessageClass() == null,
214"Piece %s defines getMessageFactory or getMessageCombiner, " +
215"so it doesn't need to define getMessageClass.",
216 toString());
217 } else {
218 messageClass = getMessageClass();
219if (messageClass == null) {
220 messageClass = (Class) NoMessage.class;
221 }
222 }
223224 SupplierFromConf<MessageValueFactory<M>> messageFactorySupplier;
225if (messageFactory != null) {
226 messageFactorySupplier =
227new SupplierFromConfByCopy<MessageValueFactory<M>>(messageFactory);
228 } else {
229 messageFactorySupplier =
230new DefaultMessageFactorySupplierFromConf<>(messageClass);
231 }
232233 SupplierFromConf<? extends MessageCombiner<? super I, M>>
234 messageCombinerSupplier;
235if (messageCombiner != null) {
236 messageCombinerSupplier = new SupplierFromConfByCopy<>(messageCombiner);
237 } else {
238 messageCombinerSupplier = null;
239 }
240241int maxAllowed =
242 GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf).ordinal();
243int minForce = MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE.get(conf).ordinal();
244 Preconditions.checkState(maxAllowed >= minForce);
245246int pieceEncodeType = (allowOneMessageToManyIdsEncoding() ?
247 MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX :
248 MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION).ordinal();
249// bound piece type with boundaries:250 pieceEncodeType = Math.max(minForce, Math.min(maxAllowed, pieceEncodeType));
251252MessageEncodeAndStoreType messageEncodeAndStoreType =
253 MessageEncodeAndStoreType.values()[pieceEncodeType];
254255if (messageFactory instanceof GiraphConfigurationSettable) {
256thrownew IllegalStateException(
257 messageFactory.getClass() + " MessageFactory in " + this +
258" Piece implements GiraphConfigurationSettable");
259 }
260if (messageCombiner instanceof GiraphConfigurationSettable) {
261thrownew IllegalStateException(
262 messageCombiner.getClass() + " MessageCombiner in " + this +
263" Piece implements GiraphConfigurationSettable");
264 }
265266returnnew ObjectMessageClasses<>(
267 messageClass, messageFactorySupplier,
268 messageCombinerSupplier, messageEncodeAndStoreType,
269 receiveIgnoreExistingVertices());
270 }
271272// Internal implementation273274 @Override
275publicfinalInnerVertexSender getWrappedVertexSender(
276final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
277 reducersHandler.vertexSenderWorkerPreprocess(workerApi);
278final VertexSender<I, V, E> functions =
279 getVertexSender(workerApi, executionStage);
280returnnewInnerVertexSender() {
281 @Override
282publicvoid vertexSend(Vertex<I, V, E> vertex) {
283if (functions != null) {
284 functions.vertexSend(vertex);
285 }
286 }
287 @Override
288publicvoid postprocess() {
289if (functions instanceof VertexPostprocessor) {
290 ((VertexPostprocessor) functions).postprocess();
291 }
292 reducersHandler.vertexSenderWorkerPostprocess(workerApi);
293 }
294 };
295 }
296297 @Override
298publicfinalvoid wrappedRegisterReducers(
299BlockMasterApi masterApi, S executionStage) {
300 reducersHandler = newReducersForPieceHandler();
301 registerReducers(newCreateReducersApiWrapper(
302 masterApi, reducersHandler), executionStage);
303 }
304305// utility functions:306// TODO Java8 - move these as default functions to VertexSender interface307protectedfinalvoid reduceDouble(
308 ReducerHandle<DoubleWritable, ?> reduceHandle, double value) {
309 reduceUtils.reduceDouble(reduceHandle, value);
310 }
311312protectedfinalvoid reduceFloat(
313 ReducerHandle<FloatWritable, ?> reduceHandle, float value) {
314 reduceUtils.reduceFloat(reduceHandle, value);
315 }
316317protectedfinalvoid reduceLong(
318 ReducerHandle<LongWritable, ?> reduceHandle, long value) {
319 reduceUtils.reduceLong(reduceHandle, value);
320 }
321322protectedfinalvoid reduceInt(
323 ReducerHandle<IntWritable, ?> reduceHandle, int value) {
324 reduceUtils.reduceInt(reduceHandle, value);
325 }
326 }