/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.netty;

import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.log4j.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelHandler.Sharable;

3132/**33 * Keep track of the bytes sent and provide some metrics when34 * desired as part of the Netty Channel stack.35 */36 @Sharable
37publicclassOutboundByteCounterextends ChannelOutboundHandlerAdapter
38implements ByteCounter, ResetSuperstepMetricsObserver {
39/** Class logger */40privatestaticfinal Logger LOG =
41 Logger.getLogger(OutboundByteCounter.class);
42/**ByteCounter delegate object */43privatefinalByteCounterDelegate delegate = newByteCounterDelegate(false);
4445/** Constructor */46publicOutboundByteCounter() {
47// Initialize Metrics48 GiraphMetrics.get().addSuperstepResetObserver(this);
49 }
5051publiclong getBytesSent() {
52return delegate.getBytesProcessed();
53 }
5455/**56 * @return Mbytes sent / sec in the current interval57 */58publicdouble getMbytesPerSecSent() {
59return delegate.getMbytesPerSecProcessed();
60 }
6162 @Override
63publicvoid write(ChannelHandlerContext ctx, Object msg,
64 ChannelPromise promise) throws Exception {
65if (msg instanceof ByteBuf) {
66 ByteBuf buf = (ByteBuf) msg;
67int sentBytes = delegate.byteBookkeeper(buf);
68if (LOG.isDebugEnabled()) {
69 LOG.debug("write: " + ctx.channel().toString() +
70" buffer size = " + sentBytes + ", total bytes = " + getBytesSent()
71 );
72 }
73 }
74 ctx.writeAndFlush(msg, promise);
75 }
7677 @Override
78publicvoid newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
79 delegate.newSuperstep(superstepMetrics);
80 }
8182 @Override
83publicvoid resetAll() {
84 delegate.resetAll();
85 }
8687 @Override
88public String getMetrics() {
89return delegate.getMetrics();
90 }
9192 @Override
93public String getMetricsWindow(int minMsecsWindow) {
94return delegate.getMetricsWindow(minMsecsWindow);
95 }
96 }
97