1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.worker;
20
21 import org.apache.giraph.job.JobProgressTracker;
22 import org.apache.giraph.utils.ThreadUtils;
23 import org.apache.log4j.Logger;
24
25 /**
26 * Class which periodically writes worker's progress to zookeeper
27 */
28 public class WorkerProgressWriter {
29 /** Class logger */
30 private static final Logger LOG =
31 Logger.getLogger(WorkerProgressWriter.class);
32 /** How often to update worker's progress */
33 private static final int WRITE_UPDATE_PERIOD_MILLISECONDS = 10 * 1000;
34
35 /** Job progress tracker */
36 private final JobProgressTracker jobProgressTracker;
37 /** Thread which writes worker's progress */
38 private final Thread writerThread;
39 /** Whether worker finished application */
40 private volatile boolean finished = false;
41
42 /**
43 * Constructor, starts separate thread to periodically update worker's
44 * progress
45 *
46 * @param jobProgressTracker JobProgressTracker to report job progress to
47 */
48 public WorkerProgressWriter(JobProgressTracker jobProgressTracker) {
49 this.jobProgressTracker = jobProgressTracker;
50 writerThread = ThreadUtils.startThread(new Runnable() {
51 @Override
52 public void run() {
53 while (!finished) {
54 updateAndSendProgress();
55 double factor = 1 + Math.random();
56 if (!ThreadUtils.trySleep(
57 (long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor))) {
58 break;
59 }
60 }
61 }
62 }, "workerProgressThread");
63 }
64
65 /**
66 * Update worker progress and send it
67 */
68 private void updateAndSendProgress() {
69 WorkerProgress.get().updateMemory();
70 jobProgressTracker.updateProgress(WorkerProgress.get());
71 }
72
73 /**
74 * Stop the thread which writes worker's progress
75 */
76 public void stop() throws InterruptedException {
77 finished = true;
78 writerThread.interrupt();
79 writerThread.join();
80 updateAndSendProgress();
81 }
82 }