1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.worker;
2021import java.io.DataInput;
22import java.io.DataOutput;
23import java.io.IOException;
24import java.util.List;
2526import org.apache.giraph.bsp.CentralizedServiceWorker;
27import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest;
28import org.apache.giraph.graph.GraphState;
29import org.apache.hadoop.io.Writable;
30import org.apache.hadoop.io.WritableComparable;
31import org.apache.hadoop.mapreduce.Mapper;
3233/**34 * WorkerContext allows for the execution of user code35 * on a per-worker basis. There's one WorkerContext per worker.36 */37 @SuppressWarnings("rawtypes")
38publicabstractclassWorkerContext39extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable>
40implements Writable, WorkerIndexUsage<WritableComparable> {
41/** Global graph state */42privateGraphState graphState;
4344/** Service worker */45privateCentralizedServiceWorker serviceWorker;
46/** All workers info */47privateAllWorkersInfo allWorkersInfo;
4849/**50 * Set the graph state.51 *52 * @param graphState Used to set the graph state.53 */54publicfinalvoid setGraphState(GraphState graphState) {
55this.graphState = graphState;
56 }
5758/**59 * Setup superstep.60 *61 * @param serviceWorker Service worker containing all the information62 */63publicfinalvoid setupSuperstep(
64 CentralizedServiceWorker<?, ?, ?> serviceWorker) {
65this.serviceWorker = serviceWorker;
66 allWorkersInfo = newAllWorkersInfo(
67 serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
68 }
6970/**71 * Initialize the WorkerContext.72 * This method is executed once on each Worker before the first73 * superstep starts.74 *75 * @throws IllegalAccessException Thrown for getting the class76 * @throws InstantiationException Expected instantiation in this method.77 */78publicabstractvoid preApplication() throws InstantiationException,
79 IllegalAccessException;
8081/**82 * Finalize the WorkerContext.83 * This method is executed once on each Worker after the last84 * superstep ends.85 */86publicabstractvoid postApplication();
8788/**89 * Execute user code.90 * This method is executed once on each Worker before each91 * superstep starts.92 */93publicabstractvoid preSuperstep();
9495/**96 * Get number of workers97 *98 * @return Number of workers99 */100 @Override
101publicfinalint getWorkerCount() {
102return allWorkersInfo.getWorkerCount();
103 }
104105/**106 * Get index for this worker107 *108 * @return Index of this worker109 */110 @Override
111publicfinalint getMyWorkerIndex() {
112return allWorkersInfo.getMyWorkerIndex();
113 }
114115 @Override
116publicfinalint getWorkerForVertex(WritableComparable vertexId) {
117return allWorkersInfo.getWorkerIndex(
118 serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
119 }
120121/**122 * Get messages which other workers sent to this worker and clear them (can123 * be called once per superstep)124 *125 * @return Messages received126 */127publicfinal List<Writable> getAndClearMessagesFromOtherWorkers() {
128return serviceWorker.getServerData().
129 getAndClearCurrentWorkerToWorkerMessages();
130 }
131132/**133 * Send message to another worker134 *135 * @param message Message to send136 * @param workerIndex Index of the worker to send the message to137 */138publicfinalvoid sendMessageToWorker(Writable message, int workerIndex) {
139SendWorkerToWorkerMessageRequest request =
140newSendWorkerToWorkerMessageRequest(message);
141if (workerIndex == getMyWorkerIndex()) {
142 request.doRequest(serviceWorker.getServerData());
143 } else {
144 serviceWorker.getWorkerClient().sendWritableRequest(
145 allWorkersInfo.getWorkerList().get(workerIndex).getTaskId(), request);
146 }
147 }
148149/**150 * Execute user code.151 * This method is executed once on each Worker after each152 * superstep ends.153 */154publicabstractvoid postSuperstep();
155156/**157 * Retrieves the current superstep.158 *159 * @return Current superstep160 */161publicfinallong getSuperstep() {
162return graphState.getSuperstep();
163 }
164165/**166 * Get the total (all workers) number of vertices that167 * existed in the previous superstep.168 *169 * @return Total number of vertices (-1 if first superstep)170 */171publicfinallong getTotalNumVertices() {
172return graphState.getTotalNumVertices();
173 }
174175/**176 * Get the total (all workers) number of edges that177 * existed in the previous superstep.178 *179 * @return Total number of edges (-1 if first superstep)180 */181publicfinallong getTotalNumEdges() {
182return graphState.getTotalNumEdges();
183 }
184185/**186 * Get the mapper context187 *188 * @return Mapper context189 */190publicfinal Mapper.Context getContext() {
191return graphState.getContext();
192 }
193194/**195 * Call this to log a line to command line of the job. Use in moderation -196 * it's a synchronous call to Job client197 *198 * @param line Line to print199 */200publicfinalvoid logToCommandLine(String line) {
201 serviceWorker.getJobProgressTracker().logInfo(line);
202 }
203204 @Override
205publicvoid write(DataOutput dataOutput) throws IOException {
206 }
207208 @Override
209publicvoid readFields(DataInput dataInput) throws IOException {
210 }
211 }