This project has retired. For details please refer to its
Attic page.
ByteCounterDelegate xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import com.yammer.metrics.core.Histogram;
22 import com.yammer.metrics.core.Meter;
23 import com.yammer.metrics.core.NoOpHistogram;
24 import com.yammer.metrics.core.NoOpMeter;
25 import io.netty.buffer.ByteBuf;
26 import org.apache.giraph.metrics.MeterDesc;
27 import org.apache.giraph.metrics.MetricNames;
28 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
29 import org.apache.giraph.time.SystemTime;
30 import org.apache.giraph.time.Time;
31
32 import java.text.DecimalFormat;
33 import java.util.concurrent.atomic.AtomicLong;
34
35
36
37
38
39 public class ByteCounterDelegate implements ByteCounter {
40
41 public static final double MEGABYTE = 1024f * 1024f;
42
43 private static final DecimalFormat DOUBLE_FORMAT =
44 new DecimalFormat("#######.####");
45
46 private static final Time TIME = SystemTime.get();
47
48 private final AtomicLong bytesProcessed = new AtomicLong();
49
50 private final AtomicLong bytesProcessedPerSuperstep = new AtomicLong();
51
52 private final AtomicLong processedRequests = new AtomicLong();
53
54 private final AtomicLong startMsecs = new AtomicLong();
55
56 private final AtomicLong metricsWindowLastUpdatedMsecs = new AtomicLong();
57
58
59
60 private Meter processedRequestsMeter = NoOpMeter.INSTANCE;
61
62 private Histogram processedBytesHist = NoOpHistogram.INSTANCE;
63
64
65 private final boolean isInbound;
66
67
68
69
70
71
72 public ByteCounterDelegate(boolean isInBound) {
73 this.isInbound = isInBound;
74 }
75
76
77
78
79
80
81 public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
82 if (isInbound) {
83 processedRequestsMeter = superstepMetrics.getMeter(
84 MeterDesc.RECEIVED_REQUESTS);
85 processedBytesHist = superstepMetrics.getUniformHistogram(
86 MetricNames.RECEIVED_BYTES);
87 } else {
88 processedRequestsMeter = superstepMetrics.getMeter(
89 MeterDesc.SENT_REQUESTS);
90 processedBytesHist = superstepMetrics.getUniformHistogram(
91 MetricNames.SENT_BYTES);
92 }
93 }
94
95
96
97
98
99
100
101 public int byteBookkeeper(ByteBuf buf) {
102 int processedBytes = buf.readableBytes();
103 bytesProcessed.addAndGet(processedBytes);
104 bytesProcessedPerSuperstep.addAndGet(processedBytes);
105 processedBytesHist.update(processedBytes);
106 processedRequests.incrementAndGet();
107 processedRequestsMeter.mark();
108 return processedBytes;
109 }
110
111
112
113
114 public void resetBytes() {
115 bytesProcessed.set(0);
116 processedRequests.set(0);
117 }
118
119
120
121
122 public void resetStartMsecs() {
123 startMsecs.set(TIME.getMilliseconds());
124 }
125
126 @Override
127 public void resetAll() {
128 resetBytes();
129 resetStartMsecs();
130 }
131
132
133
134
135
136 public long getBytesProcessedPerSuperstep() {
137 return bytesProcessedPerSuperstep.get();
138 }
139
140
141
142
143 public void resetBytesProcessedPerSuperstep() {
144 bytesProcessedPerSuperstep.set(0);
145 }
146
147 public long getBytesProcessed() {
148 return bytesProcessed.get();
149 }
150
151
152
153
154 public double getMbytesPerSecProcessed() {
155 return bytesProcessed.get() * 1000f /
156 (1 + TIME.getMilliseconds() - startMsecs.get()) / MEGABYTE;
157 }
158
159
160
161
162
163
164
165 public String getMetricsHelper(double mBytesProcessed,
166 double mBytesProcessedPerReq) {
167 if (isInbound) {
168 return "MBytes/sec received = " +
169 DOUBLE_FORMAT.format(getMbytesPerSecProcessed()) +
170 ", MBytesReceived = " + DOUBLE_FORMAT.format(mBytesProcessed) +
171 ", ave received req MBytes = " +
172 DOUBLE_FORMAT.format(mBytesProcessedPerReq) +
173 ", secs waited = " +
174 ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
175 } else {
176 return "MBytes/sec sent = " +
177 DOUBLE_FORMAT.format(getMbytesPerSecProcessed()) +
178 ", MBytesSent = " + DOUBLE_FORMAT.format(mBytesProcessed) +
179 ", ave sent req MBytes = " +
180 DOUBLE_FORMAT.format(mBytesProcessedPerReq) +
181 ", secs waited = " +
182 ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
183 }
184 }
185
186 @Override
187 public String getMetrics() {
188 double mBytesProcessed = bytesProcessed.get() / MEGABYTE;
189 long curProcessedRequests = processedRequests.get();
190 double mBytesProcessedPerReq = (curProcessedRequests == 0) ? 0 :
191 mBytesProcessed / curProcessedRequests;
192
193 return getMetricsHelper(mBytesProcessed, mBytesProcessedPerReq);
194 }
195
196 @Override
197 public String getMetricsWindow(int minMsecsWindow) {
198 long lastUpdatedMsecs = metricsWindowLastUpdatedMsecs.get();
199 long curMsecs = TIME.getMilliseconds();
200 if (curMsecs - lastUpdatedMsecs > minMsecsWindow) {
201
202 if (metricsWindowLastUpdatedMsecs.compareAndSet(lastUpdatedMsecs,
203 curMsecs)) {
204 String metrics = getMetrics();
205 resetAll();
206 return metrics;
207 }
208 }
209 return null;
210 }
211 }