1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.metrics;
2021import org.apache.giraph.graph.GraphTaskManager;
22import org.apache.giraph.ooc.OutOfCoreEngine;
23import org.apache.giraph.ooc.OutOfCoreIOCallable;
24import org.apache.giraph.worker.BspServiceWorker;
25import org.apache.hadoop.io.Writable;
2627import com.yammer.metrics.core.Gauge;
2829import java.io.DataInput;
30import java.io.DataOutput;
31import java.io.IOException;
32import java.io.PrintStream;
33import java.util.concurrent.TimeUnit;
3435/**36 * Per-superstep metrics for a Worker.37 */38publicclassWorkerSuperstepMetricsimplements Writable {
39/** Total network communication time */40privateLongAndTimeUnit commTimer;
41/** Time for all compute calls to complete */42privateLongAndTimeUnit computeAllTimer;
43/** Time till first message gets flushed */44privateLongAndTimeUnit timeToFirstMsg;
45/** Total superstep time */46privateLongAndTimeUnit superstepTimer;
47/** Time spent waiting for other workers to finish */48privateLongAndTimeUnit waitRequestsTimer;
49/** Time spent doing GC in a superstep */50privateLongAndTimeUnit superstepGCTimer;
51/** Number of bytes loaded from disk to memory in out-of-core mechanism */52privatelong bytesLoadedFromDisk;
53/** Number of bytes stored from memory to disk in out-of-core mechanism */54privatelong bytesStoredOnDisk;
55/** Percentage of graph kept in memory */56privatedouble graphPercentageInMemory;
5758/**59 * Constructor60 */61publicWorkerSuperstepMetrics() {
62 commTimer = newLongAndTimeUnit();
63 computeAllTimer = newLongAndTimeUnit();
64 timeToFirstMsg = newLongAndTimeUnit();
65 superstepTimer = newLongAndTimeUnit();
66 waitRequestsTimer = newLongAndTimeUnit();
67 superstepGCTimer = newLongAndTimeUnit();
68 superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS);
69 bytesLoadedFromDisk = 0;
70 bytesStoredOnDisk = 0;
71 graphPercentageInMemory = 100;
72 }
7374/**75 * Read metric values from global MetricsRegistry.76 *77 * @return this object, for chaining78 */79publicWorkerSuperstepMetrics readFromRegistry() {
80 readGiraphTimer(GraphTaskManager.TIMER_COMMUNICATION_TIME, commTimer);
81 readGiraphTimer(GraphTaskManager.TIMER_COMPUTE_ALL, computeAllTimer);
82 readGiraphTimer(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG, timeToFirstMsg);
83 readGiraphTimer(GraphTaskManager.TIMER_SUPERSTEP_TIME, superstepTimer);
84 readGiraphTimer(BspServiceWorker.TIMER_WAIT_REQUESTS, waitRequestsTimer);
85SuperstepMetricsRegistry registry = GiraphMetrics.get().perSuperstep();
86 superstepGCTimer.setValue(
87 registry.getCounter(GraphTaskManager.TIMER_SUPERSTEP_GC_TIME).count());
88 bytesLoadedFromDisk =
89 registry.getCounter(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK).count();
90 bytesStoredOnDisk =
91 registry.getCounter(OutOfCoreIOCallable.BYTES_STORE_TO_DISK).count();
92 Gauge<Double> gauge =
93 registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
94if (gauge != null) {
95 graphPercentageInMemory = gauge.value();
96 }
97returnthis;
98 }
99100/**101 * Read data from GiraphTimer into a LongAndTimeUnit.102 *103 * @param name String name of Gauge to retrieve.104 * @param data LongAndTimeUnit to read data into.105 */106privatevoid readGiraphTimer(String name, LongAndTimeUnit data) {
107 Gauge<Long> gauge = GiraphMetrics.get().perSuperstep().
108 getExistingGauge(name);
109if (gauge instanceof GiraphTimer) {
110GiraphTimer giraphTimer = (GiraphTimer) gauge;
111 data.setTimeUnit(giraphTimer.getTimeUnit());
112 data.setValue(giraphTimer.value());
113 } elseif (gauge != null) {
114thrownew IllegalStateException(name + " is not a GiraphTimer");
115 }
116 }
117118/**119 * Human readable dump of metrics stored here.120 *121 * @param superstep long number of superstep.122 * @param out PrintStream to write to.123 * @return this object, for chaining124 */125publicWorkerSuperstepMetrics print(long superstep, PrintStream out) {
126 out.println();
127 out.println("--- METRICS: superstep " + superstep + " ---");
128 out.println(" superstep time: " + superstepTimer);
129 out.println(" compute all partitions: " + computeAllTimer);
130 out.println(" time spent in gc: " + superstepGCTimer);
131 out.println(" bytes transferred in out-of-core: " +
132 (bytesLoadedFromDisk + bytesStoredOnDisk));
133 out.println(" network communication time: " + commTimer);
134 out.println(" time to first message: " + timeToFirstMsg);
135 out.println(" wait on requests time: " + waitRequestsTimer);
136returnthis;
137 }
138139/**140 * @return Communication timer141 */142publiclong getCommTimer() {
143return commTimer.getValue();
144 }
145146/**147 * @return Total compute timer148 */149publiclong getComputeAllTimer() {
150return computeAllTimer.getValue();
151 }
152153/**154 * @return timer between start time and first message flushed.155 */156publiclong getTimeToFirstMsg() {
157return timeToFirstMsg.getValue();
158 }
159160/**161 * @return timer for superstep time162 */163publiclong getSuperstepTimer() {
164return superstepTimer.getValue();
165 }
166167/**168 * @return timer waiting for other workers169 */170publiclong getWaitRequestsTimer() {
171return waitRequestsTimer.getValue();
172 }
173174/**175 * @return number of bytes loaded from disk by out-of-core mechanism (if any176 * is used)177 */178publiclong getBytesLoadedFromDisk() {
179return bytesLoadedFromDisk;
180 }
181182/**183 * @return number of bytes stored on disk by out-of-core mechanism (if any is184 * used)185 */186publiclong getBytesStoredOnDisk() {
187return bytesStoredOnDisk;
188 }
189190/**191 * @return a rough estimate of percentage of graph in memory192 */193publicdouble getGraphPercentageInMemory() {
194return graphPercentageInMemory;
195 }
196197 @Override
198publicvoid readFields(DataInput dataInput) throws IOException {
199 commTimer.setValue(dataInput.readLong());
200 computeAllTimer.setValue(dataInput.readLong());
201 timeToFirstMsg.setValue(dataInput.readLong());
202 superstepTimer.setValue(dataInput.readLong());
203 waitRequestsTimer.setValue(dataInput.readLong());
204 bytesLoadedFromDisk = dataInput.readLong();
205 bytesStoredOnDisk = dataInput.readLong();
206 graphPercentageInMemory = dataInput.readDouble();
207 }
208209 @Override
210publicvoid write(DataOutput dataOutput) throws IOException {
211 dataOutput.writeLong(commTimer.getValue());
212 dataOutput.writeLong(computeAllTimer.getValue());
213 dataOutput.writeLong(timeToFirstMsg.getValue());
214 dataOutput.writeLong(superstepTimer.getValue());
215 dataOutput.writeLong(waitRequestsTimer.getValue());
216 dataOutput.writeLong(bytesLoadedFromDisk);
217 dataOutput.writeLong(bytesStoredOnDisk);
218 dataOutput.writeDouble(graphPercentageInMemory);
219 }
220 }