1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.netty;
2021import org.apache.giraph.metrics.GiraphMetrics;
22import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
23import org.apache.giraph.metrics.SuperstepMetricsRegistry;
24import org.apache.log4j.Logger;
2526import io.netty.buffer.ByteBuf;
27import io.netty.channel.ChannelHandlerContext;
28import io.netty.channel.ChannelInboundHandlerAdapter;
29import io.netty.channel.ChannelHandler.Sharable;
303132/**33 * Keep track of the bytes received and provide some metrics when34 * desired as part of the Netty Channel stack.35 */36 @Sharable
37publicclassInboundByteCounterextends ChannelInboundHandlerAdapter implements38 ByteCounter, ResetSuperstepMetricsObserver {
39/** Class logger */40privatestaticfinal Logger LOG =
41 Logger.getLogger(InboundByteCounter.class);
42/**ByteCounter delegate object */43privatefinalByteCounterDelegate delegate = newByteCounterDelegate(true);
4445/** Constructor */46publicInboundByteCounter() {
47// Initialize Metrics48 GiraphMetrics.get().addSuperstepResetObserver(this);
49 }
5051publiclong getBytesReceived() {
52return delegate.getBytesProcessed();
53 }
5455/**56 * Returns bytes received per superstep.57 * @return Number of bytes.58 */59publiclong getBytesReceivedPerSuperstep() {
60return delegate.getBytesProcessedPerSuperstep();
61 }
6263/**64 * Set bytes received per superstep to 0.65 */66publicvoid resetBytesReceivedPerSuperstep() {
67 delegate.resetBytesProcessedPerSuperstep();
68 }
6970/**71 * @return Mbytes received / sec in the current interval72 */73publicdouble getMbytesPerSecReceived() {
74return delegate.getMbytesPerSecProcessed();
75 }
7677 @Override
78publicvoid channelRead(ChannelHandlerContext ctx, Object msg)
79throws Exception {
80if (msg instanceof ByteBuf) {
81 ByteBuf buf = (ByteBuf) msg;
82int receivedBytes = delegate.byteBookkeeper(buf);
83if (LOG.isDebugEnabled()) {
84 LOG.debug("channelRead: " + ctx.channel().toString() + " buffer " +
85"size = " + receivedBytes + ", total bytes = " +
86 getBytesReceived());
87 }
88 }
89 ctx.fireChannelRead(msg);
90 }
9192 @Override
93publicvoid newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
94 delegate.newSuperstep(superstepMetrics);
95 }
9697 @Override
98publicvoid resetAll() {
99 delegate.resetAll();
100 }
101102 @Override
103public String getMetrics() {
104return delegate.getMetrics();
105 }
106107 @Override
108public String getMetricsWindow(int minMsecsWindow) {
109return delegate.getMetricsWindow(minMsecsWindow);
110 }
111 }