This project has retired. For details please refer to its
Attic page.
RequestDecoder xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty.handler;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.comm.netty.InboundByteCounter;
23 import org.apache.giraph.comm.requests.RequestType;
24 import org.apache.giraph.comm.requests.WritableRequest;
25 import org.apache.giraph.time.SystemTime;
26 import org.apache.giraph.time.Time;
27 import org.apache.giraph.time.Times;
28 import org.apache.giraph.utils.ReflectionUtils;
29 import org.apache.giraph.utils.RequestUtils;
30 import org.apache.log4j.Logger;
31
32 import io.netty.buffer.ByteBuf;
33 import io.netty.channel.ChannelHandlerContext;
34 import io.netty.channel.ChannelInboundHandlerAdapter;
35 import io.netty.util.ReferenceCountUtil;
36
37
38
39
40 public class RequestDecoder extends ChannelInboundHandlerAdapter {
41
42 private static final Logger LOG =
43 Logger.getLogger(RequestDecoder.class);
44
45 private static final Time TIME = SystemTime.get();
46
47 private final ImmutableClassesGiraphConfiguration conf;
48
49 private final InboundByteCounter byteCounter;
50
51 private long startDecodingNanoseconds = -1;
52
53
54
55
56
57
58 public RequestDecoder(ImmutableClassesGiraphConfiguration conf,
59 InboundByteCounter byteCounter) {
60 this.conf = conf;
61 this.byteCounter = byteCounter;
62 }
63
64 @Override
65 public void channelRead(ChannelHandlerContext ctx, Object msg)
66 throws Exception {
67 if (!(msg instanceof ByteBuf)) {
68 throw new IllegalStateException("decode: Got illegal message " + msg);
69 }
70
71 String metrics = byteCounter.getMetricsWindow(30000);
72 if (metrics != null) {
73 if (LOG.isInfoEnabled()) {
74 LOG.info("decode: Server window metrics " + metrics);
75 }
76 }
77
78 if (LOG.isDebugEnabled()) {
79 startDecodingNanoseconds = TIME.getNanoseconds();
80 }
81
82
83 ByteBuf buf = (ByteBuf) msg;
84 int enumValue = buf.readByte();
85 RequestType type = RequestType.values()[enumValue];
86 Class<? extends WritableRequest> requestClass = type.getRequestClass();
87 WritableRequest request =
88 ReflectionUtils.newInstance(requestClass, conf);
89 request = RequestUtils.decodeWritableRequest(buf, request);
90
91 if (LOG.isDebugEnabled()) {
92 LOG.debug("decode: Client " + request.getClientId() +
93 ", requestId " + request.getRequestId() +
94 ", " + request.getType() + ", with size " +
95 buf.writerIndex() + " took " +
96 Times.getNanosSince(TIME, startDecodingNanoseconds) + " ns");
97 }
98 ReferenceCountUtil.release(buf);
99
100 ctx.fireChannelRead(request);
101 }
102 }