This project has retired. For details please refer to its
Attic page.
OutOfCoreIOCallable xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc;
20
21 import com.yammer.metrics.core.Counter;
22 import com.yammer.metrics.core.Histogram;
23 import org.apache.giraph.metrics.GiraphMetrics;
24 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
25 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
26 import org.apache.giraph.ooc.command.IOCommand;
27 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
28 import org.apache.giraph.ooc.command.WaitIOCommand;
29 import org.apache.log4j.Logger;
30
31 import java.util.concurrent.Callable;
32
33
34
35
36 public class OutOfCoreIOCallable implements Callable<Void>,
37 ResetSuperstepMetricsObserver {
38
39 public static final String BYTES_LOAD_FROM_DISK = "ooc-bytes-load";
40
41 public static final String BYTES_STORE_TO_DISK = "ooc-bytes-store";
42
43 public static final String HISTOGRAM_LOAD_SIZE = "ooc-load-size-bytes";
44
45 public static final String HISTOGRAM_STORE_SIZE = "ooc-store-size-bytes";
46
47 private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallable.class);
48
49 private final OutOfCoreEngine oocEngine;
50
51 private final int diskId;
52
53 private Counter bytesReadPerSuperstep;
54
55 private Counter bytesWrittenPerSuperstep;
56
57 private Histogram histogramLoadSize;
58
59 private Histogram histogramStoreSize;
60
61
62
63
64
65
66
67 public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, int diskId) {
68 this.oocEngine = oocEngine;
69 this.diskId = diskId;
70 newSuperstep(GiraphMetrics.get().perSuperstep());
71 GiraphMetrics.get().addSuperstepResetObserver(this);
72 }
73
74 @Override
75 public Void call() throws Exception {
76 while (true) {
77 oocEngine.getSuperstepLock().readLock().lock();
78 IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId);
79 if (LOG.isDebugEnabled() && !(command instanceof WaitIOCommand)) {
80 LOG.debug("call: thread " + diskId + "'s next IO command is: " +
81 command);
82 }
83 if (command == null) {
84 oocEngine.getSuperstepLock().readLock().unlock();
85 break;
86 }
87 if (command instanceof WaitIOCommand) {
88 oocEngine.getSuperstepLock().readLock().unlock();
89 }
90
91 boolean commandExecuted = false;
92 long duration = 0;
93 long bytes;
94
95 try {
96 long timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
97 .getSuperstepGCTime();
98 long startTime = System.currentTimeMillis();
99 commandExecuted = command.execute();
100 duration = System.currentTimeMillis() - startTime;
101 timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
102 .getSuperstepGCTime() - timeInGC;
103 bytes = command.bytesTransferred();
104 if (LOG.isDebugEnabled() && !(command instanceof WaitIOCommand)) {
105 LOG.debug("call: thread " + diskId + "'s command " + command +
106 " completed: bytes= " + bytes + ", duration=" + duration + ", " +
107 "bandwidth=" + String.format("%.2f", (double) bytes / duration *
108 1000 / 1024 / 1024) +
109 ((command instanceof WaitIOCommand) ? "" :
110 (", bandwidth (excluding GC time)=" + String.format("%.2f",
111 (double) bytes / (duration - timeInGC) *
112 1000 / 1024 / 1024))));
113 }
114 } catch (Exception e) {
115 throw new RuntimeException(
116 "call: execution of IO command " + command + " failed!", e);
117 }
118
119 if (!(command instanceof WaitIOCommand)) {
120 oocEngine.getSuperstepLock().readLock().unlock();
121 if (bytes != 0) {
122 if (command instanceof LoadPartitionIOCommand) {
123 bytesReadPerSuperstep.inc(bytes);
124 histogramLoadSize.update(bytes);
125 } else {
126 bytesWrittenPerSuperstep.inc(bytes);
127 histogramStoreSize.update(bytes);
128 }
129 }
130 }
131
132 if (commandExecuted && duration > 0) {
133 oocEngine.getIOStatistics().update(command.getType(),
134 command.bytesTransferred(), duration);
135 }
136 oocEngine.getIOScheduler().ioCommandCompleted(command);
137 }
138 if (LOG.isInfoEnabled()) {
139 LOG.info("call: out-of-core IO thread " + diskId + " terminating!");
140 }
141 return null;
142 }
143
144 @Override
145 public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
146 bytesReadPerSuperstep = superstepMetrics.getCounter(BYTES_LOAD_FROM_DISK);
147 bytesWrittenPerSuperstep =
148 superstepMetrics.getCounter(BYTES_STORE_TO_DISK);
149 histogramLoadSize =
150 superstepMetrics.getUniformHistogram(HISTOGRAM_LOAD_SIZE);
151 histogramStoreSize =
152 superstepMetrics.getUniformHistogram(HISTOGRAM_STORE_SIZE);
153 }
154 }
155