1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.master;
2021import java.util.List;
2223import org.apache.giraph.aggregators.Aggregator;
24import org.apache.giraph.bsp.CentralizedServiceMaster;
25import org.apache.giraph.combiner.MessageCombiner;
26import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
27import org.apache.giraph.conf.MessageClasses;
28import org.apache.giraph.graph.Computation;
29import org.apache.giraph.graph.GraphState;
30import org.apache.giraph.reducers.ReduceOperation;
31import org.apache.giraph.worker.WorkerInfo;
32import org.apache.hadoop.io.Writable;
33import org.apache.hadoop.io.WritableComparable;
34import org.apache.hadoop.mapreduce.Mapper;
3536/**37 * Interface for defining a master vertex that can perform centralized38 * computation between supersteps. This class will be instantiated on the39 * master node and will run every superstep before the workers do.40 *41 * Communication with the workers should be performed via aggregators. The42 * values of the aggregators are broadcast to the workers before43 * vertex.compute() is called and collected by the master before44 * master.compute() is called. This means aggregator values used by the workers45 * are consistent with aggregator values from the master from the same46 * superstep and aggregator used by the master are consistent with aggregator47 * values from the workers from the previous superstep.48 */49publicabstractclassMasterCompute50extendsDefaultImmutableClassesGiraphConfigurable51implements MasterAggregatorUsage, MasterGlobalCommUsage, Writable {
52/** If true, do not do anymore computation on this vertex. */53privateboolean halt = false;
54/** Master aggregator usage */55privateCentralizedServiceMaster serviceMaster;
56/** Graph state */57privateGraphState graphState;
58/**59 * Computation and MessageCombiner classes used, which can be60 * switched by master61 */62privateSuperstepClasses superstepClasses;
6364/**65 * Must be defined by user to specify what the master has to do.66 */67publicabstractvoid compute();
6869/**70 * Initialize the MasterCompute class, this is the place to register71 * aggregators.72 *73 * @throws InstantiationException74 * @throws IllegalAccessException75 */76publicabstractvoid initialize() throws InstantiationException,
77 IllegalAccessException;
7879/**80 * Retrieves the current superstep.81 *82 * @return Current superstep83 */84publicfinallong getSuperstep() {
85return graphState.getSuperstep();
86 }
8788/**89 * Get the total (all workers) number of vertices that90 * existed in the previous superstep.91 *92 * @return Total number of vertices (-1 if first superstep)93 */94publicfinallong getTotalNumVertices() {
95return graphState.getTotalNumVertices();
96 }
9798/**99 * Get the total (all workers) number of edges that100 * existed in the previous superstep.101 *102 * @return Total number of edges (-1 if first superstep)103 */104publicfinallong getTotalNumEdges() {
105return graphState.getTotalNumEdges();
106 }
107108/**109 * After this is called, the computation will stop, even if there are110 * still messages in the system or vertices that have not voted to halt.111 */112publicfinalvoid haltComputation() {
113 halt = true;
114 }
115116/**117 * Has the master halted?118 *119 * @return True if halted, false otherwise.120 */121publicfinalboolean isHalted() {
122return halt;
123 }
124125/**126 * Get the mapper context127 *128 * @return Mapper context129 */130publicfinal Mapper.Context getContext() {
131return graphState.getContext();
132 }
133134/**135 * Get list of workers136 *137 * @return List of workers138 */139publicfinal List<WorkerInfo> getWorkerInfoList() {
140return serviceMaster.getWorkerInfoList();
141 }
142143/**144 * Set Computation class to be used145 *146 * @param computationClass Computation class147 */148publicfinalvoid setComputation(
149 Class<? extends Computation> computationClass) {
150 superstepClasses.setComputationClass(computationClass);
151 }
152153/**154 * Get Computation class to be used155 *156 * @return Computation class157 */158publicfinal Class<? extends Computation> getComputation() {
159// Might be called prior to classes being set, do not return NPE160if (superstepClasses == null) {
161returnnull;
162 }
163164return superstepClasses.getComputationClass();
165 }
166167/**168 * Set MessageCombiner class to be used169 *170 * @param combinerClass MessageCombiner class171 */172publicfinalvoid setMessageCombiner(
173 Class<? extends MessageCombiner> combinerClass) {
174 superstepClasses.setMessageCombinerClass(combinerClass);
175 }
176177/**178 * Get MessageCombiner class to be used179 *180 * @return MessageCombiner class181 */182publicfinal Class<? extends MessageCombiner> getMessageCombiner() {
183// Might be called prior to classes being set, do not return NPE184if (superstepClasses == null) {
185returnnull;
186 }
187188return superstepClasses.getMessageCombinerClass();
189 }
190191/**192 * Set incoming message class to be used193 *194 * @param incomingMessageClass incoming message class195 */196 @Deprecated
197publicfinalvoid setIncomingMessage(
198 Class<? extends Writable> incomingMessageClass) {
199 superstepClasses.setIncomingMessageClass(incomingMessageClass);
200 }
201202/**203 * Set outgoing message class to be used204 *205 * @param outgoingMessageClass outgoing message class206 */207publicfinalvoid setOutgoingMessage(
208 Class<? extends Writable> outgoingMessageClass) {
209 superstepClasses.setOutgoingMessageClass(outgoingMessageClass);
210 }
211212/**213 * Set outgoing message classes to be used214 *215 * @param outgoingMessageClasses outgoing message classes216 */217publicvoid setOutgoingMessageClasses(
218 MessageClasses<? extends WritableComparable, ? extends Writable>
219 outgoingMessageClasses) {
220 superstepClasses.setOutgoingMessageClasses(outgoingMessageClasses);
221 }
222223 @Override
224publicfinal <S, R extends Writable> void registerReducer(
225 String name, ReduceOperation<S, R> reduceOp) {
226 serviceMaster.getGlobalCommHandler().registerReducer(name, reduceOp);
227 }
228229 @Override
230publicfinal <S, R extends Writable> void registerReducer(
231 String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
232 serviceMaster.getGlobalCommHandler().registerReducer(
233 name, reduceOp, globalInitialValue);
234 }
235236 @Override
237publicfinal <T extends Writable> T getReduced(String name) {
238return serviceMaster.getGlobalCommHandler().getReduced(name);
239 }
240241 @Override
242publicfinalvoid broadcast(String name, Writable object) {
243 serviceMaster.getGlobalCommHandler().broadcast(name, object);
244 }
245246 @Override
247publicfinal <A extends Writable> boolean registerAggregator(
248 String name, Class<? extends Aggregator<A>> aggregatorClass)
249throws InstantiationException, IllegalAccessException {
250return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
251 name, aggregatorClass);
252 }
253254 @Override
255publicfinal <A extends Writable> boolean registerPersistentAggregator(
256 String name,
257 Class<? extends Aggregator<A>> aggregatorClass) throws258 InstantiationException, IllegalAccessException {
259return serviceMaster.getAggregatorTranslationHandler()
260 .registerPersistentAggregator(name, aggregatorClass);
261 }
262263 @Override
264publicfinal <A extends Writable> A getAggregatedValue(String name) {
265return serviceMaster.getAggregatorTranslationHandler()
266 .<A>getAggregatedValue(name);
267 }
268269 @Override
270publicfinal <A extends Writable> void setAggregatedValue(
271 String name, A value) {
272 serviceMaster.getAggregatorTranslationHandler()
273 .setAggregatedValue(name, value);
274 }
275276/**277 * Call this to log a line to command line of the job. Use in moderation -278 * it's a synchronous call to Job client279 *280 * @param line Line to print281 */282publicvoid logToCommandLine(String line) {
283 serviceMaster.getJobProgressTracker().logInfo(line);
284 }
285286publicfinalvoid setGraphState(GraphState graphState) {
287this.graphState = graphState;
288 }
289290publicfinalvoid setMasterService(CentralizedServiceMaster serviceMaster) {
291this.serviceMaster = serviceMaster;
292 }
293294publicfinalvoid setSuperstepClasses(SuperstepClasses superstepClasses) {
295this.superstepClasses = superstepClasses;
296 }
297 }