1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.metrics;
2021import org.apache.giraph.graph.GraphTaskManager;
22import org.apache.giraph.ooc.OutOfCoreEngine;
23import org.apache.giraph.ooc.OutOfCoreIOCallable;
24import org.apache.giraph.worker.BspServiceWorker;
2526import com.google.common.collect.Maps;
2728import java.io.PrintStream;
29import java.util.Map;
3031/**32 * Map of a bunch of aggregated metrics33 */34publicclassAggregatedMetrics {
35/** Mapping from name to aggregated metric */36private Map<String, AggregatedMetric<?>> metrics = Maps.newHashMap();
3738/**39 * Add value from hostname for a metric.40 *41 * @param name String name of metric42 * @param value long value to track43 * @param hostnamePartitionId String host it came from44 * @return this45 */46publicAggregatedMetrics add(String name, long value,
47 String hostnamePartitionId) {
48AggregatedMetricLong aggregatedMetric =
49 (AggregatedMetricLong) metrics.get(name);
50if (aggregatedMetric == null) {
51 aggregatedMetric = newAggregatedMetricLong();
52 metrics.put(name, aggregatedMetric);
53 }
54 aggregatedMetric.addItem(value, hostnamePartitionId);
55returnthis;
56 }
5758/**59 * Add value from hostname for a metric.60 *61 * @param name String name of metric62 * @param value double value to track63 * @param hostnamePartitionId String host it came from64 * @return this65 */66publicAggregatedMetrics add(String name, double value,
67 String hostnamePartitionId) {
68AggregatedMetricDouble aggregatedMetric =
69 (AggregatedMetricDouble) metrics.get(name);
70if (aggregatedMetric == null) {
71 aggregatedMetric = newAggregatedMetricDouble();
72 metrics.put(name, aggregatedMetric);
73 }
74 aggregatedMetric.addItem(value, hostnamePartitionId);
75returnthis;
76 }
7778/**79 * Add metrics from worker.80 *81 * @param workerMetrics WorkerSuperstepMetrics from work82 * @param hostname String hostname of worker83 * @return this84 */85publicAggregatedMetrics add(WorkerSuperstepMetrics workerMetrics,
86 String hostname) {
87 add(GraphTaskManager.TIMER_SUPERSTEP_TIME,
88 workerMetrics.getSuperstepTimer(), hostname);
89 add(GraphTaskManager.TIMER_COMMUNICATION_TIME,
90 workerMetrics.getCommTimer(), hostname);
91 add(GraphTaskManager.TIMER_COMPUTE_ALL,
92 workerMetrics.getComputeAllTimer(), hostname);
93 add(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG,
94 workerMetrics.getTimeToFirstMsg(), hostname);
95 add(BspServiceWorker.TIMER_WAIT_REQUESTS,
96 workerMetrics.getWaitRequestsTimer(), hostname);
97 add(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK,
98 workerMetrics.getBytesLoadedFromDisk(), hostname);
99 add(OutOfCoreIOCallable.BYTES_STORE_TO_DISK,
100 workerMetrics.getBytesStoredOnDisk(), hostname);
101 add(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY,
102 workerMetrics.getGraphPercentageInMemory(), hostname);
103returnthis;
104 }
105106/**107 * Print the aggregated metrics to the stream provided.108 *109 * @param superstep long number of superstep.110 * @param out PrintStream to write to.111 * @return this112 */113publicAggregatedMetrics print(long superstep, PrintStream out) {
114AggregatedMetric superstepTime = get(GraphTaskManager.TIMER_SUPERSTEP_TIME);
115AggregatedMetric commTime = get(GraphTaskManager.TIMER_COMMUNICATION_TIME);
116AggregatedMetric computeAll = get(GraphTaskManager.TIMER_COMPUTE_ALL);
117AggregatedMetric timeToFirstMsg =
118 get(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG);
119AggregatedMetric waitRequestsMicros = get(
120 BspServiceWorker.TIMER_WAIT_REQUESTS);
121AggregatedMetric bytesLoaded =
122 get(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK);
123AggregatedMetric bytesStored =
124 get(OutOfCoreIOCallable.BYTES_STORE_TO_DISK);
125AggregatedMetric graphInMem =
126 get(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
127128 out.println();
129 out.println("--- METRICS: superstep " + superstep + " ---");
130 printAggregatedMetric(out, "superstep time", "ms", superstepTime);
131 printAggregatedMetric(out, "compute all partitions", "ms", computeAll);
132 printAggregatedMetric(out, "network communication time", "ms", commTime);
133 printAggregatedMetric(out, "time to first message", "us", timeToFirstMsg);
134 printAggregatedMetric(out, "wait requests time", "us", waitRequestsMicros);
135 printAggregatedMetric(out, "bytes loaded from disk", "bytes", bytesLoaded);
136 printAggregatedMetric(out, "bytes stored to disk", "bytes", bytesStored);
137 printAggregatedMetric(out, "graph in mem", "%", graphInMem);
138139returnthis;
140 }
141142/**143 * Print batch of lines for AggregatedMetric144 *145 * @param out PrintStream to write to146 * @param header String header to print.147 * @param unit String unit of metric148 * @param aggregatedMetric AggregatedMetric to write149 */150privatevoid printAggregatedMetric(PrintStream out, String header,
151 String unit,
152AggregatedMetric aggregatedMetric) {
153if (aggregatedMetric.hasData()) {
154 out.println(header);
155 out.println(" mean: " + aggregatedMetric.mean() + " " + unit);
156 printValueFromHost(out, " smallest: ", unit, aggregatedMetric.min());
157 printValueFromHost(out, " largest: ", unit, aggregatedMetric.max());
158 } else {
159 out.println(header + ": NO DATA");
160 }
161 }
162163/**164 * Print a line for a value with the host it came from.165 *166 * @param out PrintStream to write to167 * @param prefix String to write at beginning168 * @param unit String unit of metric169 * @param vh ValueWithHostname to write170 */171privatevoid printValueFromHost(PrintStream out, String prefix,
172 String unit, ValueWithHostname vh) {
173 out.println(prefix + vh.getValue() + ' ' + unit +
174" from " + vh.getHostname());
175 }
176177/**178 * Get AggregatedMetric with given name.179 *180 * @param name String metric to lookup.181 * @return AggregatedMetric for given metric name.182 */183publicAggregatedMetric get(String name) {
184return metrics.get(name);
185 }
186187/**188 * Get map of all aggregated metrics.189 *190 * @return Map of all the aggregated metrics.191 */192public Map<String, AggregatedMetric<?>> getAll() {
193return metrics;
194 }
195 }