1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.bsp;
20
21 import java.io.IOException;
22 import java.util.List;
23
24 import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
25 import org.apache.giraph.master.MasterCompute;
26 import org.apache.giraph.master.MasterGlobalCommHandler;
27 import org.apache.giraph.worker.WorkerInfo;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30 import org.apache.zookeeper.KeeperException;
31
32 /**
33 * At most, there will be one active master at a time, but many threads can
34 * be trying to be the active master.
35 *
36 * @param <I> Vertex id
37 * @param <V> Vertex value
38 * @param <E> Edge value
39 */
40 @SuppressWarnings("rawtypes")
41 public interface CentralizedServiceMaster<I extends WritableComparable,
42 V extends Writable, E extends Writable> extends
43 CentralizedService<I, V, E> {
44 /**
45 * Setup (must be called prior to any other function)
46 */
47 void setup();
48
49 /**
50 * Become the master.
51 * @return true if became the master, false if the application is done.
52 */
53 boolean becomeMaster();
54
55 /**
56 * Check all the {@link org.apache.giraph.worker.WorkerInfo} objects to ensure
57 * that a minimum number of good workers exists out of the total that have
58 * reported.
59 *
60 * @return List of of healthy workers such that the minimum has been
61 * met, otherwise null
62 */
63 List<WorkerInfo> checkWorkers();
64
65 /**
66 * Create the {@link BspInputSplit} objects from the index range based on the
67 * user-defined MappingInputFormat. The {@link BspInputSplit} objects will
68 * processed by the workers later on during the INPUT_SUPERSTEP.
69 *
70 * @return Number of splits. Returns -1 on failure to create
71 * valid input splits.
72 */
73 int createMappingInputSplits();
74
75 /**
76 * Create the {@link BspInputSplit} objects from the index range based on the
77 * user-defined VertexInputFormat. The {@link BspInputSplit} objects will
78 * processed by the workers later on during the INPUT_SUPERSTEP.
79 *
80 * @return Number of splits. Returns -1 on failure to create
81 * valid input splits.
82 */
83 int createVertexInputSplits();
84
85 /**
86 * Create the {@link BspInputSplit} objects from the index range based on the
87 * user-defined EdgeInputFormat. The {@link BspInputSplit} objects will
88 * processed by the workers later on during the INPUT_SUPERSTEP.
89 *
90 * @return Number of splits. Returns -1 on failure to create
91 * valid input splits.
92 */
93 int createEdgeInputSplits();
94
95 /**
96 * Master coordinates the superstep
97 *
98 * @return State of the application as a result of this superstep
99 * @throws InterruptedException
100 * @throws KeeperException
101 */
102 SuperstepState coordinateSuperstep()
103 throws KeeperException, InterruptedException;
104
105 /**
106 * Master can decide to restart from the last good checkpoint if a
107 * worker fails during a superstep.
108 *
109 * @param checkpoint Checkpoint to restart from
110 */
111 void restartFromCheckpoint(long checkpoint);
112
113 /**
114 * Get the last known good checkpoint
115 *
116 * @return Last good superstep number
117 * @throws IOException
118 */
119 long getLastGoodCheckpoint() throws IOException;
120
121 /**
122 * If the master decides that this job doesn't have the resources to
123 * continue, it can fail the job. It can also designate what to do next.
124 * Typically this is mainly informative.
125 *
126 * @param state State of the application.
127 * @param applicationAttempt Attempt to start on
128 * @param desiredSuperstep Superstep to restart from (if applicable)
129 */
130 void setJobState(ApplicationState state,
131 long applicationAttempt,
132 long desiredSuperstep);
133
134 /**
135 * Get handler for global communication
136 *
137 * @return Global communication handler
138 */
139 MasterGlobalCommHandler getGlobalCommHandler();
140
141 /**
142 * Handler for aggregators to reduce/broadcast translation
143 *
144 * @return aggregator translation handler
145 */
146 AggregatorToGlobalCommTranslation getAggregatorTranslationHandler();
147
148 /**
149 * Get MasterCompute object
150 *
151 * @return MasterCompute object
152 */
153 MasterCompute getMasterCompute();
154
155 /**
156 * Superstep has finished.
157 */
158 void postSuperstep();
159
160 /**
161 * Application has finished.
162 */
163 void postApplication();
164
165 /**
166 * Called when the job fails in order to let the Master do any cleanup.
167 *
168 * @param e Exception job failed from. May be null.
169 */
170 void failureCleanup(Exception e);
171
172
173 /**
174 * Clean up the service (no calls may be issued after this)
175 *
176 * @throws IOException
177 * @throws InterruptedException
178 * @param superstepState what was the state
179 * of the last complete superstep?
180 */
181 void cleanup(SuperstepState superstepState)
182 throws IOException, InterruptedException;
183
184 /**
185 * Add the Giraph Timers to thirft counter struct, and send to the job client
186 * Counters include the Giraph Timers for setup, initialise, shutdown, total,
187 * and time for the given superstep
188 * @param superstep superstep for which the GiraphTimer will be sent
189 *
190 */
191 void addGiraphTimersAndSendCounters(long superstep);
192 }