1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.ooc;
2021import com.google.common.collect.Maps;
22import org.apache.giraph.conf.GiraphConstants;
23import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24import org.apache.giraph.conf.IntConfOption;
25import org.apache.giraph.ooc.command.IOCommand.IOCommandType;
26import org.apache.log4j.Logger;
2728import java.util.Map;
29import java.util.Queue;
30import java.util.concurrent.ArrayBlockingQueue;
31import java.util.concurrent.atomic.AtomicLong;
3233/**34 * Class to collect statistics regarding IO operations done in out-of-core35 * mechanism.36 */37publicclassOutOfCoreIOStatistics {
38/**39 * An estimate of disk bandwidth. This number is only used just at the start40 * of the computation, and will be updated/replaced later on once a few disk41 * operations happen.42 */43publicstaticfinalIntConfOption DISK_BANDWIDTH_ESTIMATE =
44newIntConfOption("giraph.diskBandwidthEstimate", 125,
45"An estimate of disk bandwidth (MB/s). This number is used just at " +
46"the beginning of the computation, and it will be " +
47"updated/replaced once a few disk operations happen.");
48/**49 * How many recent IO operations should we keep track of? Any report given out50 * of this statistics collector is only based on most recent IO operations.51 */52publicstaticfinalIntConfOption IO_COMMAND_HISTORY_SIZE =
53newIntConfOption("giraph.ioCommandHistorySize", 50,
54"Number of most recent IO operations to consider for reporting the" +
55"statistics.");
5657/**58 * Use this option to control how frequently to print OOC statistics.59 */60publicstaticfinalIntConfOption STATS_PRINT_FREQUENCY =
61newIntConfOption("giraph.oocStatPrintFrequency", 200,
62"Number of updates before stats are printed.");
6364/** Class logger */65privatestaticfinal Logger LOG =
66 Logger.getLogger(OutOfCoreIOStatistics.class);
67/** Estimate of disk bandwidth (bytes/second) */68privatefinal AtomicLong diskBandwidthEstimate;
69/** Cached value for IO_COMMAND_HISTORY_SIZE */70privatefinalint maxHistorySize;
71/**72 * Coefficient/Weight of the most recent IO operation toward the disk73 * bandwidth estimate. Basically if the disk bandwidth estimate if d, and the74 * latest IO command happened at the rate of r, the new estimate of disk75 * bandwidth is calculated as:76 * d_new = updateCoefficient * r + (1 - updateCoefficient) * d77 */78privatefinaldouble updateCoefficient;
79/** Queue of all recent commands */80privatefinal Queue<StatisticsEntry> commandHistory;
81/**82 * Command statistics for each type of IO command. This is the statistics of83 * the recent commands in the history we keep track of (with 'maxHistorySize'84 * command in the history).85 */86privatefinal Map<IOCommandType, StatisticsEntry> aggregateStats;
87/** How many IO command completed? */88privateint numUpdates = 0;
89/** Cached value for {@link #STATS_PRINT_FREQUENCY} */90privateint statsPrintFrequency = 0;
9192/**93 * Constructor94 *95 * @param conf configuration96 * @param numIOThreads number of disks/IO threads97 */98publicOutOfCoreIOStatistics(ImmutableClassesGiraphConfiguration conf,
99int numIOThreads) {
100this.diskBandwidthEstimate =
101new AtomicLong(DISK_BANDWIDTH_ESTIMATE.get(conf) *
102 (long) GiraphConstants.ONE_MB);
103this.maxHistorySize = IO_COMMAND_HISTORY_SIZE.get(conf);
104this.updateCoefficient = 1.0 / maxHistorySize;
105// Adding more entry to the capacity of the queue to have a wiggle room106// if all IO threads are adding/removing entries from the queue107this.commandHistory =
108new ArrayBlockingQueue<>(maxHistorySize + numIOThreads);
109this.aggregateStats = Maps.newConcurrentMap();
110for (IOCommandType type : IOCommandType.values()) {
111 aggregateStats.put(type, newStatisticsEntry(type, 0, 0, 0));
112 }
113this.statsPrintFrequency = STATS_PRINT_FREQUENCY.get(conf);
114 }
115116/**117 * Update statistics with the last IO command that is executed.118 *119 * @param type type of the IO command that is executed120 * @param bytesTransferred number of bytes transferred in the last IO command121 * @param duration duration it took for the last IO command to complete122 * (milliseconds)123 */124publicvoid update(IOCommandType type, long bytesTransferred,
125long duration) {
126StatisticsEntry entry = aggregateStats.get(type);
127synchronized (entry) {
128 entry.setOccurrence(entry.getOccurrence() + 1);
129 entry.setDuration(duration + entry.getDuration());
130 entry.setBytesTransferred(bytesTransferred + entry.getBytesTransferred());
131 }
132 commandHistory.offer(
133newStatisticsEntry(type, bytesTransferred, duration, 0));
134if (type != IOCommandType.WAIT) {
135// If the current estimate is 'd', the new rate is 'r', and the size of136// history is 'n', we can simply model all the past command's rate as:137// d, d, d, ..., d, r138// where 'd' happens for 'n-1' times. Hence the new estimate of the139// bandwidth would be:140// d_new = (d * (n-1) + r) / n = (1-1/n)*d + 1/n*r141// where updateCoefficient = 1/n142 diskBandwidthEstimate.set((long)
143 (updateCoefficient * (bytesTransferred / duration * 1000) +
144 (1 - updateCoefficient) * diskBandwidthEstimate.get()));
145 }
146if (commandHistory.size() > maxHistorySize) {
147StatisticsEntry removedEntry = commandHistory.poll();
148 entry = aggregateStats.get(removedEntry.getType());
149synchronized (entry) {
150 entry.setOccurrence(entry.getOccurrence() - 1);
151 entry.setDuration(entry.getDuration() - removedEntry.getDuration());
152 entry.setBytesTransferred(
153 entry.getBytesTransferred() - removedEntry.getBytesTransferred());
154 }
155 }
156 numUpdates++;
157// Outputting log every so many commands158if (numUpdates % statsPrintFrequency == 0) {
159if (LOG.isInfoEnabled()) {
160 LOG.info(this);
161 }
162 }
163 }
164165 @Override
166public String toString() {
167 StringBuffer sb = new StringBuffer();
168long waitTime = 0;
169long loadTime = 0;
170long bytesRead = 0;
171long storeTime = 0;
172long bytesWritten = 0;
173for (Map.Entry<IOCommandType, StatisticsEntry> entry :
174 aggregateStats.entrySet()) {
175synchronized (entry.getValue()) {
176 sb.append(entry.getKey() + ": " + entry.getValue() + ", ");
177if (entry.getKey() == IOCommandType.WAIT) {
178 waitTime += entry.getValue().getDuration();
179 } elseif (entry.getKey() == IOCommandType.LOAD_PARTITION) {
180 loadTime += entry.getValue().getDuration();
181 bytesRead += entry.getValue().getBytesTransferred();
182 } else {
183 storeTime += entry.getValue().getDuration();
184 bytesWritten += entry.getValue().getBytesTransferred();
185 }
186 }
187 }
188 sb.append(String.format("Average STORE: %.2f MB/s, ",
189 (double) bytesWritten / storeTime * 1000 / 1024 / 1024));
190 sb.append(String.format("DATA_INJECTION: %.2f MB/s, ",
191 (double) (bytesRead - bytesWritten) /
192 (waitTime + loadTime + storeTime) * 1000 / 1024 / 1024));
193 sb.append(String.format("DISK_BANDWIDTH: %.2f MB/s",
194 (double) diskBandwidthEstimate.get() / 1024 / 1024));
195196return sb.toString();
197 }
198199/**200 * @return most recent estimate of the disk bandwidth201 */202publiclong getDiskBandwidth() {
203return diskBandwidthEstimate.get();
204 }
205206/**207 * Get aggregate statistics of a given command type in the command history208 *209 * @param type type of the command to get the statistics for210 * @return aggregate statistics for the given command type211 */212publicBytesDuration getCommandTypeStats(IOCommandType type) {
213StatisticsEntry entry = aggregateStats.get(type);
214synchronized (entry) {
215returnnewBytesDuration(entry.getBytesTransferred(), entry.getDuration(),
216 entry.getOccurrence());
217 }
218 }
219220/**221 * Helper class to return results of statistics collector for a certain222 * command type223 */224publicstaticclassBytesDuration {
225/** Number of bytes transferred in a few commands of the same type */226privatelong bytes;
227/** Duration of it took to execute a few commands of the same type */228privatelong duration;
229/** Number of commands executed of the same type */230privateint occurrence;
231232/**233 * Constructor234 * @param bytes number of bytes transferred235 * @param duration duration it took to execute commands236 * @param occurrence number of commands237 */238BytesDuration(long bytes, long duration, int occurrence) {
239this.bytes = bytes;
240this.duration = duration;
241this.occurrence = occurrence;
242 }
243244/**245 * @return number of bytes transferred for the same command type246 */247publiclong getBytes() {
248return bytes;
249 }
250251/**252 * @return duration it took to execute a few commands of the same type253 */254publiclong getDuration() {
255return duration;
256 }
257258/**259 * @return number of commands that are executed of the same type260 */261publicint getOccurrence() {
262return occurrence;
263 }
264 }
265266/**267 * Helper class to keep statistics for a certain command type268 */269privatestaticclassStatisticsEntry {
270/** Type of the command */271private IOCommandType type;
272/**273 * Aggregate number of bytes transferred executing the particular command274 * type in the history we keep275 */276privatelong bytesTransferred;
277/**278 * Aggregate duration it took executing the particular command type in the279 * history we keep280 */281privatelong duration;
282/**283 * Number of occurrences of the particular command type in the history we284 * keep285 */286privateint occurrence;
287288/**289 * Constructor290 *291 * @param type type of the command292 * @param bytesTransferred aggregate number of bytes transferred293 * @param duration aggregate execution time294 * @param occurrence number of occurrences of the particular command type295 */296publicStatisticsEntry(IOCommandType type, long bytesTransferred,
297long duration, int occurrence) {
298this.type = type;
299this.bytesTransferred = bytesTransferred;
300this.duration = duration;
301this.occurrence = occurrence;
302 }
303304/**305 * @return type of the command306 */307public IOCommandType getType() {
308return type;
309 }
310311/**312 * @return aggregate number of bytes transferred in the particular command313 * type314 */315publiclong getBytesTransferred() {
316return bytesTransferred;
317 }
318319/**320 * Update the aggregate number of bytes transferred321 *322 * @param bytesTransferred aggregate number of bytes323 */324publicvoid setBytesTransferred(long bytesTransferred) {
325this.bytesTransferred = bytesTransferred;
326 }
327328/**329 * @return aggregate duration it took to execute the particular command type330 */331publiclong getDuration() {
332return duration;
333 }
334335/**336 * Update the aggregate duration337 *338 * @param duration aggregate duration339 */340publicvoid setDuration(long duration) {
341this.duration = duration;
342 }
343344/**345 * @return number of occurrences of the particular command type346 */347publicint getOccurrence() {
348return occurrence;
349 }
350351/**352 * Update the number of occurrences of the particular command type353 *354 * @param occurrence number of occurrences355 */356publicvoid setOccurrence(int occurrence) {
357this.occurrence = occurrence;
358 }
359360 @Override
361public String toString() {
362if (type == IOCommandType.WAIT) {
363return String.format("%.2f sec", duration / 1000.0);
364 } else {
365return String.format("%.2f MB/s",
366 (double) bytesTransferred / duration * 1000 / 1024 / 2014);
367 }
368 }
369 }
370 }