/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.examples;
2021import org.apache.giraph.graph.BasicComputation;
22import org.apache.giraph.graph.Vertex;
23import org.apache.hadoop.io.DoubleWritable;
24import org.apache.hadoop.io.FloatWritable;
25import org.apache.hadoop.io.LongWritable;
26import org.apache.log4j.Logger;
2728import java.io.IOException;
2930/**31 * Vertex to allow unit testing of failure detection32 */33publicclassSimpleFailComputationextends BasicComputation<
34 LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
35/** Class logger */36privatestatic Logger LOG = Logger.getLogger(SimpleFailComputation.class);
37/** TODO: Change this behavior to WorkerContext */38privatestaticlong SUPERSTEP = 0;
3940 @Override
41publicvoid compute(
42 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
43 Iterable<DoubleWritable> messages) throws IOException {
44if (getSuperstep() >= 1) {
45double sum = 0;
46for (DoubleWritable message : messages) {
47 sum += message.get();
48 }
49 DoubleWritable vertexValue =
50new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
51 vertex.setValue(vertexValue);
52if (getSuperstep() < 30) {
53if (getSuperstep() == 20) {
54if (vertex.getId().get() == 10L) {
55try {
56 Thread.sleep(2000);
57 } catch (InterruptedException e) {
58 LOG.info("Sleep interrupted ", e);
59 }
60 System.exit(1);
61 } elseif (getSuperstep() - SUPERSTEP > 10) {
62return;
63 }
64 }
65long edges = vertex.getNumEdges();
66 sendMessageToAllEdges(vertex,
67new DoubleWritable(vertex.getValue().get() / edges));
68 } else {
69 vertex.voteToHalt();
70 }
71 setSuperstep(getSuperstep());
72 }
73 }
7475/**76 * Set the superstep77 *78 * @param superstep to set79 */80privatestaticvoid setSuperstep(long superstep) {
81 SUPERSTEP = superstep;
82 }
83 }