This project has retired. For details please refer to its
Attic page.
NettyServer xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import io.netty.handler.flush.FlushConsolidationHandler;
22 import org.apache.giraph.comm.flow_control.FlowControl;
23
24 import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
25
26 import org.apache.giraph.comm.netty.handler.RequestDecoder;
27 import org.apache.giraph.comm.netty.handler.RequestServerHandler;
28
29 import org.apache.giraph.comm.netty.handler.ResponseEncoder;
30 import org.apache.giraph.comm.netty.handler.SaslServerHandler;
31
32 import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
33 import org.apache.giraph.conf.GiraphConstants;
34 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
35 import org.apache.giraph.graph.TaskInfo;
36 import org.apache.giraph.utils.PipelineUtils;
37 import org.apache.giraph.utils.ProgressableUtils;
38 import org.apache.giraph.utils.ThreadUtils;
39 import org.apache.hadoop.util.Progressable;
40 import org.apache.log4j.Logger;
41 import io.netty.bootstrap.ServerBootstrap;
42 import io.netty.channel.group.ChannelGroup;
43 import io.netty.channel.group.DefaultChannelGroup;
44 import io.netty.channel.nio.NioEventLoopGroup;
45 import io.netty.channel.socket.SocketChannel;
46 import io.netty.channel.ChannelHandlerContext;
47 import io.netty.channel.EventLoopGroup;
48 import io.netty.channel.ChannelOption;
49 import io.netty.channel.ChannelInitializer;
50 import io.netty.channel.ChannelInboundHandlerAdapter;
51 import io.netty.channel.ChannelFuture;
52 import io.netty.channel.socket.nio.NioServerSocketChannel;
53 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
54
55 import io.netty.util.AttributeKey;
56
57 import io.netty.util.concurrent.DefaultEventExecutorGroup;
58 import io.netty.util.concurrent.EventExecutorGroup;
59 import io.netty.util.concurrent.ImmediateEventExecutor;
60 import io.netty.channel.AdaptiveRecvByteBufAllocator;
61
62 import java.net.InetSocketAddress;
63 import java.net.UnknownHostException;
64
65 import static com.google.common.base.Preconditions.checkState;
66 import static org.apache.giraph.conf.GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS;
67
68
69
70
71 public class NettyServer {
72
73 public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
74
75
76
77 public static final AttributeKey<SaslNettyServer>
78 CHANNEL_SASL_NETTY_SERVERS = AttributeKey.valueOf("channelSaslServers");
79
80
81
82 private static final Logger LOG = Logger.getLogger(NettyServer.class);
83
84 private final ImmutableClassesGiraphConfiguration conf;
85
86 private final Progressable progressable;
87
88 private final ChannelGroup accepted = new DefaultChannelGroup(
89 ImmediateEventExecutor.INSTANCE);
90
91 private final String localHostOrIp;
92
93 private InetSocketAddress myAddress;
94
95 private TaskInfo myTaskInfo;
96
97 private final int maxPoolSize;
98
99 private final int tcpBacklog;
100
101 private final RequestServerHandler.Factory requestServerHandlerFactory;
102
103
104 private SaslServerHandler.Factory saslServerHandlerFactory;
105
106
107 private ServerBootstrap bootstrap;
108
109 private final InboundByteCounter inByteCounter = new InboundByteCounter();
110
111 private final OutboundByteCounter outByteCounter = new OutboundByteCounter();
112
113 private final int sendBufferSize;
114
115 private final int receiveBufferSize;
116
117 private final EventLoopGroup bossGroup;
118
119 private final EventLoopGroup workerGroup;
120
121 private final WorkerRequestReservedMap workerRequestReservedMap;
122
123 private final boolean useExecutionGroup;
124
125 private final EventExecutorGroup executionGroup;
126
127 private final String handlerToUseExecutionGroup;
128
129 private final Thread.UncaughtExceptionHandler exceptionHandler;
130
131
132
133
134
135
136
137
138
139
140
141 public NettyServer(ImmutableClassesGiraphConfiguration conf,
142 RequestServerHandler.Factory requestServerHandlerFactory,
143 TaskInfo myTaskInfo, Progressable progressable,
144 Thread.UncaughtExceptionHandler exceptionHandler) {
145 this.conf = conf;
146 this.progressable = progressable;
147 this.requestServerHandlerFactory = requestServerHandlerFactory;
148
149 this.saslServerHandlerFactory = new SaslServerHandler.Factory();
150
151 this.myTaskInfo = myTaskInfo;
152 this.exceptionHandler = exceptionHandler;
153 sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf);
154 receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf);
155
156 workerRequestReservedMap = new WorkerRequestReservedMap(conf);
157
158 maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf);
159
160 bossGroup = new NioEventLoopGroup(4,
161 ThreadUtils.createThreadFactory(
162 "netty-server-boss-%d", exceptionHandler));
163
164 workerGroup = new NioEventLoopGroup(maxPoolSize,
165 ThreadUtils.createThreadFactory(
166 "netty-server-worker-%d", exceptionHandler));
167
168 try {
169 this.localHostOrIp = conf.getLocalHostOrIp();
170 } catch (UnknownHostException e) {
171 throw new IllegalStateException("NettyServer: unable to get hostname");
172 }
173
174 tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG.getKey(),
175 conf.getInt(GiraphConstants.MAX_WORKERS,
176 GiraphConstants.TCP_BACKLOG.getDefaultValue()));
177
178 handlerToUseExecutionGroup =
179 GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER.get(conf);
180 useExecutionGroup =
181 GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.get(conf);
182 if (useExecutionGroup) {
183 int executionThreads = conf.getNettyServerExecutionThreads();
184 executionGroup = new DefaultEventExecutorGroup(executionThreads,
185 ThreadUtils.createThreadFactory(
186 "netty-server-exec-%d", exceptionHandler));
187 if (LOG.isInfoEnabled()) {
188 LOG.info("NettyServer: Using execution group with " +
189 executionThreads + " threads for " +
190 handlerToUseExecutionGroup + ".");
191 }
192 } else {
193 executionGroup = null;
194 }
195 }
196
197
198
199
200
201
202
203
204
205
206
207
208 public NettyServer(ImmutableClassesGiraphConfiguration conf,
209 RequestServerHandler.Factory requestServerHandlerFactory,
210 TaskInfo myTaskInfo,
211 Progressable progressable,
212 SaslServerHandler.Factory saslServerHandlerFactory,
213 Thread.UncaughtExceptionHandler exceptionHandler) {
214 this(conf, requestServerHandlerFactory, myTaskInfo,
215 progressable, exceptionHandler);
216 this.saslServerHandlerFactory = saslServerHandlerFactory;
217 }
218
219
220
221
222
223
224 public InboundByteCounter getInByteCounter() {
225 return inByteCounter;
226 }
227
228
229
230
231 public void start() {
232 bootstrap = new ServerBootstrap();
233 bootstrap.group(bossGroup, workerGroup)
234 .channel(NioServerSocketChannel.class)
235 .option(ChannelOption.SO_BACKLOG, tcpBacklog)
236 .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
237 .childOption(ChannelOption.SO_KEEPALIVE, true)
238 .childOption(ChannelOption.TCP_NODELAY, true)
239 .childOption(ChannelOption.SO_SNDBUF, sendBufferSize)
240 .childOption(ChannelOption.SO_RCVBUF, receiveBufferSize)
241 .childOption(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
242 .childOption(ChannelOption.RCVBUF_ALLOCATOR,
243 new AdaptiveRecvByteBufAllocator(receiveBufferSize / 4,
244 receiveBufferSize, receiveBufferSize));
245
246
247
248
249
250 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
251 @Override
252 protected void initChannel(SocketChannel ch) throws Exception {
253
254 if (conf.authenticate()) {
255 LOG.info("start: Will use Netty pipeline with " +
256 "authentication and authorization of clients.");
257
258
259
260
261 PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
262 new FlushConsolidationHandler(FlushConsolidationHandler
263 .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
264 handlerToUseExecutionGroup, executionGroup, ch);
265 PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
266 inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
267 if (conf.doCompression()) {
268 PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
269 conf.getNettyCompressionDecoder(),
270 handlerToUseExecutionGroup, executionGroup, ch);
271 }
272 PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
273 outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
274 if (conf.doCompression()) {
275 PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
276 conf.getNettyCompressionEncoder(),
277 handlerToUseExecutionGroup, executionGroup, ch);
278 }
279 PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
280 new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
281 handlerToUseExecutionGroup, executionGroup, ch);
282 PipelineUtils.addLastWithExecutorCheck("requestDecoder",
283 new RequestDecoder(conf, inByteCounter),
284 handlerToUseExecutionGroup, executionGroup, ch);
285
286 PipelineUtils.addLastWithExecutorCheck("saslServerHandler",
287 saslServerHandlerFactory.newHandler(conf),
288 handlerToUseExecutionGroup, executionGroup, ch);
289 PipelineUtils.addLastWithExecutorCheck("authorizeServerHandler",
290 new AuthorizeServerHandler(), handlerToUseExecutionGroup,
291 executionGroup, ch);
292 PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
293 requestServerHandlerFactory.newHandler(workerRequestReservedMap,
294 conf, myTaskInfo, exceptionHandler),
295 handlerToUseExecutionGroup, executionGroup, ch);
296
297 PipelineUtils.addLastWithExecutorCheck("responseEncoder",
298 new ResponseEncoder(), handlerToUseExecutionGroup,
299 executionGroup, ch);
300 } else {
301 LOG.info("start: Using Netty without authentication.");
302
303
304
305
306 ch.pipeline().addLast("connectedChannels",
307 new ChannelInboundHandlerAdapter() {
308 @Override
309 public void channelActive(ChannelHandlerContext ctx)
310 throws Exception {
311 accepted.add(ctx.channel());
312 ctx.fireChannelActive();
313 }
314 });
315 PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
316 new FlushConsolidationHandler(FlushConsolidationHandler
317 .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
318 handlerToUseExecutionGroup, executionGroup, ch);
319 PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
320 inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
321 if (conf.doCompression()) {
322 PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
323 conf.getNettyCompressionDecoder(),
324 handlerToUseExecutionGroup, executionGroup, ch);
325 }
326 PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
327 outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
328 if (conf.doCompression()) {
329 PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
330 conf.getNettyCompressionEncoder(),
331 handlerToUseExecutionGroup, executionGroup, ch);
332 }
333 PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
334 new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
335 handlerToUseExecutionGroup, executionGroup, ch);
336 PipelineUtils.addLastWithExecutorCheck("requestDecoder",
337 new RequestDecoder(conf, inByteCounter),
338 handlerToUseExecutionGroup, executionGroup, ch);
339 PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
340 requestServerHandlerFactory.newHandler(
341 workerRequestReservedMap, conf, myTaskInfo, exceptionHandler),
342 handlerToUseExecutionGroup, executionGroup, ch);
343
344 }
345
346 }
347 });
348
349 int taskId = conf.getTaskPartition();
350 int numTasks = conf.getInt("mapred.map.tasks", 1);
351
352 int numServers = conf.getInt(GiraphConstants.MAX_WORKERS, numTasks) + 1;
353 int portIncrementConstant =
354 (int) Math.pow(10, Math.ceil(Math.log10(numServers)));
355 int bindPort = GiraphConstants.IPC_INITIAL_PORT.get(conf) + taskId;
356 int bindAttempts = 0;
357 final int maxIpcPortBindAttempts = MAX_IPC_PORT_BIND_ATTEMPTS.get(conf);
358 final boolean failFirstPortBindingAttempt =
359 GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.get(conf);
360
361
362
363
364
365 while (bindAttempts < maxIpcPortBindAttempts) {
366 this.myAddress = new InetSocketAddress(localHostOrIp, bindPort);
367 if (failFirstPortBindingAttempt && bindAttempts == 0) {
368 if (LOG.isInfoEnabled()) {
369 LOG.info("start: Intentionally fail first " +
370 "binding attempt as giraph.failFirstIpcPortBindAttempt " +
371 "is true, port " + bindPort);
372 }
373 ++bindAttempts;
374 bindPort += portIncrementConstant;
375 continue;
376 }
377
378 try {
379 ChannelFuture f = bootstrap.bind(myAddress).sync();
380 accepted.add(f.channel());
381 break;
382 } catch (InterruptedException e) {
383 throw new IllegalStateException(e);
384
385 } catch (Exception e) {
386
387 LOG.warn("start: Likely failed to bind on attempt " +
388 bindAttempts + " to port " + bindPort, e.getCause());
389 ++bindAttempts;
390 bindPort += portIncrementConstant;
391 }
392 }
393 if (bindAttempts == maxIpcPortBindAttempts || myAddress == null) {
394 throw new IllegalStateException(
395 "start: Failed to start NettyServer with " +
396 bindAttempts + " attempts");
397 }
398
399 if (LOG.isInfoEnabled()) {
400 LOG.info("start: Started server " +
401 "communication server: " + myAddress + " with up to " +
402 maxPoolSize + " threads on bind attempt " + bindAttempts +
403 " with sendBufferSize = " + sendBufferSize +
404 " receiveBufferSize = " + receiveBufferSize);
405 }
406 }
407
408
409
410
411 public void stop() {
412 if (LOG.isInfoEnabled()) {
413 LOG.info("stop: Halting netty server");
414 }
415 ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
416 if (LOG.isInfoEnabled()) {
417 LOG.info("stop: Start releasing resources");
418 }
419 bossGroup.shutdownGracefully();
420 workerGroup.shutdownGracefully();
421 ProgressableUtils.awaitTerminationFuture(bossGroup, progressable);
422 ProgressableUtils.awaitTerminationFuture(workerGroup, progressable);
423 if (useExecutionGroup) {
424 executionGroup.shutdownGracefully();
425 ProgressableUtils.awaitTerminationFuture(executionGroup, progressable);
426 }
427 if (LOG.isInfoEnabled()) {
428 LOG.info("stop: Netty server halted");
429 }
430 }
431
432 public InetSocketAddress getMyAddress() {
433 return myAddress;
434 }
435
436 public String getLocalHostOrIp() {
437 return localHostOrIp;
438 }
439
440
441
442
443
444
445 public void setFlowControl(FlowControl flowControl) {
446 checkState(requestServerHandlerFactory != null);
447 requestServerHandlerFactory.setFlowControl(flowControl);
448 }
449 }
450