This project has retired. For details please refer to its Attic page.
InboundByteCounter xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import org.apache.giraph.metrics.GiraphMetrics;
22  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
23  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
24  import org.apache.log4j.Logger;
25  
26  import io.netty.buffer.ByteBuf;
27  import io.netty.channel.ChannelHandlerContext;
28  import io.netty.channel.ChannelInboundHandlerAdapter;
29  import io.netty.channel.ChannelHandler.Sharable;
30  
31  
32  /**
33   * Keep track of the bytes received and provide some metrics when
34   * desired as part of the Netty Channel stack.
35   */
36  @Sharable
37  public class InboundByteCounter extends ChannelInboundHandlerAdapter implements
38      ByteCounter, ResetSuperstepMetricsObserver {
39    /** Class logger */
40    private static final Logger LOG =
41        Logger.getLogger(InboundByteCounter.class);
42    /** ByteCounter delegate object */
43    private final ByteCounterDelegate delegate = new ByteCounterDelegate(true);
44  
45    /** Constructor */
46    public InboundByteCounter() {
47      // Initialize Metrics
48      GiraphMetrics.get().addSuperstepResetObserver(this);
49    }
50  
51    public long getBytesReceived() {
52      return delegate.getBytesProcessed();
53    }
54  
55    /**
56     * Returns bytes received per superstep.
57     * @return Number of bytes.
58     */
59    public long getBytesReceivedPerSuperstep() {
60      return delegate.getBytesProcessedPerSuperstep();
61    }
62  
63    /**
64     * Set bytes received per superstep to 0.
65     */
66    public void resetBytesReceivedPerSuperstep() {
67      delegate.resetBytesProcessedPerSuperstep();
68    }
69  
70    /**
71     * @return Mbytes received / sec in the current interval
72     */
73    public double getMbytesPerSecReceived() {
74      return delegate.getMbytesPerSecProcessed();
75    }
76  
77    @Override
78    public void channelRead(ChannelHandlerContext ctx, Object msg)
79      throws Exception {
80      if (msg instanceof ByteBuf) {
81        ByteBuf buf = (ByteBuf) msg;
82        int receivedBytes = delegate.byteBookkeeper(buf);
83        if (LOG.isDebugEnabled()) {
84          LOG.debug("channelRead: " + ctx.channel().toString() + " buffer " +
85              "size = " + receivedBytes + ", total bytes = " +
86              getBytesReceived());
87        }
88      }
89      ctx.fireChannelRead(msg);
90    }
91  
92    @Override
93    public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
94      delegate.newSuperstep(superstepMetrics);
95    }
96  
97    @Override
98    public void resetAll() {
99      delegate.resetAll();
100   }
101 
102   @Override
103   public String getMetrics() {
104     return delegate.getMetrics();
105   }
106 
107   @Override
108   public String getMetricsWindow(int minMsecsWindow) {
109     return delegate.getMetricsWindow(minMsecsWindow);
110   }
111 }