This project has retired. For details please refer to its
Attic page.
RequestServerHandler xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty.handler;
20
21 import org.apache.giraph.comm.flow_control.FlowControl;
22 import org.apache.giraph.comm.requests.WritableRequest;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.graph.TaskInfo;
25 import org.apache.giraph.time.SystemTime;
26 import org.apache.giraph.time.Time;
27 import org.apache.giraph.time.Times;
28 import org.apache.log4j.Logger;
29
30 import io.netty.buffer.ByteBuf;
31 import io.netty.channel.ChannelHandlerContext;
32 import io.netty.channel.ChannelInboundHandlerAdapter;
33
34 import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
35
36
37
38
39
40
41 public abstract class RequestServerHandler<R> extends
42 ChannelInboundHandlerAdapter {
43
44 public static final int RESPONSE_BYTES = 16;
45
46 private static Time TIME = SystemTime.get();
47
48 private static final Logger LOG =
49 Logger.getLogger(RequestServerHandler.class);
50
51 private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
52
53 protected FlowControl flowControl;
54
55 private final boolean closeFirstRequest;
56
57 private final WorkerRequestReservedMap workerRequestReservedMap;
58
59 private final TaskInfo myTaskInfo;
60
61 private long startProcessingNanoseconds = -1;
62
63 private final Thread.UncaughtExceptionHandler exceptionHandler;
64
65
66
67
68
69
70
71
72
73 public RequestServerHandler(
74 WorkerRequestReservedMap workerRequestReservedMap,
75 ImmutableClassesGiraphConfiguration conf,
76 TaskInfo myTaskInfo,
77 Thread.UncaughtExceptionHandler exceptionHandler) {
78 this.workerRequestReservedMap = workerRequestReservedMap;
79 closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
80 this.myTaskInfo = myTaskInfo;
81 this.exceptionHandler = exceptionHandler;
82 }
83
84 @Override
85 public void channelRead(ChannelHandlerContext ctx, Object msg)
86 throws Exception {
87 if (LOG.isTraceEnabled()) {
88 LOG.trace("messageReceived: Got " + msg.getClass());
89 }
90
91 WritableRequest request = (WritableRequest) msg;
92
93
94 if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
95 LOG.info("messageReceived: Simulating closing channel on first " +
96 "request " + request.getRequestId() + " from " +
97 request.getClientId());
98 setAlreadyClosedFirstRequest();
99 ctx.close();
100 return;
101 }
102
103
104 AckSignalFlag alreadyDone = AckSignalFlag.DUPLICATE_REQUEST;
105 if (workerRequestReservedMap.reserveRequest(
106 request.getClientId(),
107 request.getRequestId())) {
108 if (LOG.isDebugEnabled()) {
109 startProcessingNanoseconds = TIME.getNanoseconds();
110 }
111 processRequest((R) request);
112 if (LOG.isDebugEnabled()) {
113 LOG.debug("messageReceived: Processing client " +
114 request.getClientId() + ", " +
115 "requestId " + request.getRequestId() +
116 ", " + request.getType() + " took " +
117 Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
118 }
119 alreadyDone = AckSignalFlag.NEW_REQUEST;
120 } else {
121 LOG.info("messageReceived: Request id " +
122 request.getRequestId() + " from client " +
123 request.getClientId() +
124 " was already processed, " +
125 "not processing again.");
126 }
127
128
129 ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
130 buffer.writeInt(myTaskInfo.getTaskId());
131 buffer.writeLong(request.getRequestId());
132 int signal =
133 flowControl.calculateResponse(alreadyDone, request.getClientId());
134 buffer.writeInt(signal);
135 ctx.write(buffer);
136 }
137
138
139
140
141 private static void setAlreadyClosedFirstRequest() {
142 ALREADY_CLOSED_FIRST_REQUEST = true;
143 }
144
145
146
147
148
149
150 public abstract void processRequest(R request);
151
152 @Override
153 public void channelActive(ChannelHandlerContext ctx) throws Exception {
154 if (LOG.isDebugEnabled()) {
155 LOG.debug("channelActive: Connected the channel on " +
156 ctx.channel().remoteAddress());
157 }
158 ctx.fireChannelActive();
159 }
160
161 @Override
162 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
163 if (LOG.isDebugEnabled()) {
164 LOG.debug("channelInactive: Closed the channel on " +
165 ctx.channel().remoteAddress());
166 }
167 ctx.fireChannelInactive();
168 }
169
170 @Override
171 public void exceptionCaught(
172 ChannelHandlerContext ctx, Throwable cause) throws Exception {
173 exceptionHandler.uncaughtException(Thread.currentThread(), cause);
174 }
175
176
177
178
179 public interface Factory {
180
181
182
183
184
185
186
187
188
189 RequestServerHandler newHandler(
190 WorkerRequestReservedMap workerRequestReservedMap,
191 ImmutableClassesGiraphConfiguration conf,
192 TaskInfo myTaskInfo,
193 Thread.UncaughtExceptionHandler exceptionHandler);
194
195
196
197
198
199
200
201 void setFlowControl(FlowControl flowControl);
202 }
203 }