1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.framework;
1920importstatic org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
2122import java.lang.reflect.Field;
23import java.lang.reflect.Modifier;
2425import org.apache.giraph.block_app.framework.api.giraph.BlockComputation;
26import org.apache.giraph.block_app.framework.api.giraph.BlockMasterCompute;
27import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext;
28import org.apache.giraph.block_app.framework.block.Block;
29import org.apache.giraph.block_app.framework.block.PieceCount;
30import org.apache.giraph.block_app.framework.piece.AbstractPiece;
31import org.apache.giraph.block_app.framework.piece.Piece;
32import org.apache.giraph.conf.BooleanConfOption;
33import org.apache.giraph.conf.ClassConfOption;
34import org.apache.giraph.conf.GiraphConfiguration;
35import org.apache.giraph.conf.GiraphConstants;
36import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
37import org.apache.giraph.conf.MessageClasses;
38import org.apache.giraph.function.Consumer;
39import org.apache.giraph.types.NoMessage;
40import org.apache.giraph.utils.ReflectionUtils;
41import org.apache.hadoop.conf.Configuration;
42import org.apache.log4j.Logger;
4344import com.google.common.base.Preconditions;
4546/**47 * Utility functions for block applications48 */49 @SuppressWarnings({ "rawtypes", "unchecked" })
50publicclassBlockUtils {
51/** Property describing BlockFactory to use for current application run */52publicstaticfinal ClassConfOption<BlockFactory> BLOCK_FACTORY_CLASS =
53 ClassConfOption.create("digraph.block_factory", null, BlockFactory.class,
54"block factory describing giraph job");
5556/** Property describing block worker context value class to use */57publicstaticfinal ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS =
58 ClassConfOption.create(
59"digraph.block_worker_context_value_class",
60 Object.class, Object.class,
61"block worker context value class");
6263/** Property describing whether to log execution status as application runs */64publicstaticfinal65BooleanConfOption LOG_EXECUTION_STATUS = newBooleanConfOption(
66"giraph.block_utils.log_execution_status", true,
67"Log execution status (of which pieces are being executed, etc)");
6869privatestaticfinal Logger LOG = Logger.getLogger(BlockUtils.class);
7071/** Dissallow constructor */72privateBlockUtils() { }
7374/**75 * Create new BlockFactory that is specified in the configuration.76 */77publicstatic <S> BlockFactory<S> createBlockFactory(Configuration conf) {
78return ReflectionUtils.newInstance(BLOCK_FACTORY_CLASS.get(conf));
79 }
8081/**82 * Set which BlockFactory class to be used for the application.83 * (generally useful within tests only)84 */85publicstaticvoid setBlockFactoryClass(Configuration conf,
86 Class<? extends BlockFactory<?>> clazz) {
87 BLOCK_FACTORY_CLASS.set(conf, clazz);
88 }
8990/**91 * Set block factory, and initialize configs with it.92 * Should be used only if there are no configuration options set after93 * this method call.94 */95publicstaticvoid setAndInitBlockFactoryClass(GiraphConfiguration conf,
96 Class<? extends BlockFactory<?>> clazz) {
97 BLOCK_FACTORY_CLASS.set(conf, clazz);
98 initAndCheckConfig(conf);
99 }
100101/**102 * Initializes configuration, such that running it executes block application.103 *104 * Additionally, checks types of all pieces with a block application.105 */106publicstaticvoid initAndCheckConfig(GiraphConfiguration conf) {
107 conf.setMasterComputeClass(BlockMasterCompute.class);
108 conf.setComputationClass(BlockComputation.class);
109 conf.setWorkerContextClass(BlockWorkerContext.class);
110111 Preconditions.checkState(
112 GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
113"Message types should only be specified in Pieces, " +
114"but outgoing was specified globally");
115 Preconditions.checkState(
116 GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
117 .isDefaultValue(conf),
118"Message types should only be specified in Pieces, " +
119"but factory was specified globally");
120 Preconditions.checkState(
121 GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
122"Message combiner should only be specified in Pieces, " +
123"but was specified globally");
124125 BlockFactory<?> blockFactory = createBlockFactory(conf);
126 blockFactory.initConfig(conf);
127128 Preconditions.checkState(
129 GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
130"Outgoing message type was specified in blockFactory.initConfig");
131 Preconditions.checkState(
132 GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
133 .isDefaultValue(conf),
134"Outgoing message factory type was specified in " +
135"blockFactory.initConfig");
136 Preconditions.checkState(
137 GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
138"Message combiner type was specified in blockFactory.initConfig");
139140 GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, NoMessage.class);
141142finalImmutableClassesGiraphConfiguration immConf =
143new ImmutableClassesGiraphConfiguration<>(conf);
144145// Create blocks to detect issues before creating a Giraph job146// They will not be used here147Block executionBlock = blockFactory.createBlock(immConf);
148 checkBlockTypes(
149 executionBlock, blockFactory.createExecutionStage(immConf), immConf);
150151PieceCount pieceCount = executionBlock.getPieceCount();
152if (pieceCount.isKnown()) {
153 GiraphConstants.SUPERSTEP_COUNT.set(conf, pieceCount.getCount() + 1);
154 }
155156// check for non 'static final' fields in BlockFactories157 Class<?> bfClass = blockFactory.getClass();
158while (!bfClass.equals(Object.class)) {
159for (Field field : bfClass.getDeclaredFields()) {
160if (!Modifier.isStatic(field.getModifiers()) ||
161 !Modifier.isFinal(field.getModifiers())) {
162thrownew IllegalStateException("BlockFactory (" + bfClass +
163") cannot have any mutable (non 'static final') fields as a " +
164"safety measure, as createBlock function is called from a " +
165"different context then all other functions, use conf argument " +
166"instead, or make it 'static final'. Field present: " + field);
167 }
168 }
169 bfClass = bfClass.getSuperclass();
170 }
171172// Register outputs173 blockFactory.registerOutputs(conf);
174 }
175176publicstaticvoid checkBlockTypes(
177Block executionBlock, Object executionStage,
178finalImmutableClassesGiraphConfiguration conf) {
179 LOG.info("Executing application - " + executionBlock);
180181final Class<?> vertexIdClass = conf.getVertexIdClass();
182final Class<?> vertexValueClass = conf.getVertexValueClass();
183final Class<?> edgeValueClass = conf.getEdgeValueClass();
184final Class<?> workerContextValueClass =
185 BLOCK_WORKER_CONTEXT_VALUE_CLASS.get(conf);
186final Class<?> executionStageClass = executionStage.getClass();
187188// Check for type inconsistencies189 executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
190 @Override
191publicvoid apply(AbstractPiece piece) {
192if (!piece.getClass().equals(Piece.class)) {
193 Class<?>[] classList = getTypeArguments(
194 AbstractPiece.class, piece.getClass());
195 Preconditions.checkArgument(classList.length == 7);
196197 ReflectionUtils.verifyTypes(
198 vertexIdClass, classList[0], "vertexId", piece.getClass());
199 ReflectionUtils.verifyTypes(
200 vertexValueClass, classList[1], "vertexValue", piece.getClass());
201 ReflectionUtils.verifyTypes(
202 edgeValueClass, classList[2], "edgeValue", piece.getClass());
203204MessageClasses classes = piece.getMessageClasses(conf);
205 Class<?> messageType = classes.getMessageClass();
206if (messageType == null) {
207 messageType = NoMessage.class;
208 }
209 ReflectionUtils.verifyTypes(
210 messageType, classList[3], "message", piece.getClass());
211212 ReflectionUtils.verifyTypes(
213 workerContextValueClass, classList[4],
214"workerContextValue", piece.getClass());
215// No need to check worker context message class at all216217 ReflectionUtils.verifyTypes(
218 executionStageClass, classList[6],
219"executionStage", piece.getClass());
220 }
221 }
222 });
223 }
224 }