This project has been retired. For details, please refer to its
Attic page.
ResponseClientHandler xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty.handler;
20
21 import org.apache.giraph.comm.netty.NettyClient;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.log4j.Logger;
24
25 import io.netty.buffer.ByteBuf;
26 import io.netty.channel.ChannelHandlerContext;
27 import io.netty.channel.ChannelInboundHandlerAdapter;
28 import io.netty.util.ReferenceCountUtil;
29
30 import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED;
31
32
33
34
35 public class ResponseClientHandler extends ChannelInboundHandlerAdapter {
36
37 private static final Logger LOG =
38 Logger.getLogger(ResponseClientHandler.class);
39
40 private static volatile boolean ALREADY_DROPPED_FIRST_RESPONSE = false;
41
42 private final boolean dropFirstResponse;
43
44 private final NettyClient nettyClient;
45
46
47
48
49
50
51 public ResponseClientHandler(NettyClient nettyClient, Configuration conf) {
52 this.nettyClient = nettyClient;
53 dropFirstResponse = NETTY_SIMULATE_FIRST_RESPONSE_FAILED.get(conf);
54 }
55
56 @Override
57 public void channelRead(ChannelHandlerContext ctx, Object msg)
58 throws Exception {
59 if (!(msg instanceof ByteBuf)) {
60 throw new IllegalStateException("channelRead: Got a " +
61 "non-ByteBuf message " + msg);
62 }
63
64 ByteBuf buf = (ByteBuf) msg;
65 int senderId = -1;
66 long requestId = -1;
67 int response = -1;
68 try {
69 senderId = buf.readInt();
70 requestId = buf.readLong();
71 response = buf.readInt();
72 } catch (IndexOutOfBoundsException e) {
73 throw new IllegalStateException(
74 "channelRead: Got IndexOutOfBoundsException ", e);
75 }
76 ReferenceCountUtil.release(buf);
77
78 boolean shouldDrop = false;
79
80 if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) {
81 LOG.info("channelRead: Simulating dropped response " + response +
82 " for request " + requestId);
83 setAlreadyDroppedFirstResponse();
84 shouldDrop = true;
85 }
86
87 nettyClient.messageReceived(senderId, requestId, response, shouldDrop);
88 }
89
90
91
92
93 private static void setAlreadyDroppedFirstResponse() {
94 ALREADY_DROPPED_FIRST_RESPONSE = true;
95 }
96
97 @Override
98 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
99 if (LOG.isDebugEnabled()) {
100 LOG.debug("channelClosed: Closed the channel on " +
101 ctx.channel().remoteAddress());
102 }
103 ctx.fireChannelInactive();
104 }
105
106 @Override
107 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
108 throws Exception {
109 LOG.warn("exceptionCaught: Channel channelId=" +
110 ctx.channel().hashCode() + " failed with remote address " +
111 ctx.channel().remoteAddress(), cause);
112 }
113 }
114