NettyServer xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.netty;

import io.netty.handler.flush.FlushConsolidationHandler;
import org.apache.giraph.comm.flow_control.FlowControl;
/*if_not[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.RequestDecoder;
import org.apache.giraph.comm.netty.handler.RequestServerHandler;
/*if_not[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.ResponseEncoder;
import org.apache.giraph.comm.netty.handler.SaslServerHandler;
/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.utils.PipelineUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/*if_not[HADOOP_NON_SECURE]*/
import io.netty.util.AttributeKey;
/*end[HADOOP_NON_SECURE]*/
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.channel.AdaptiveRecvByteBufAllocator;

import java.net.InetSocketAddress;
import java.net.UnknownHostException;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.giraph.conf.GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS;

/**
 * This server uses Netty and implements all Giraph communication.
 */
public class NettyServer {
  /** Default maximum thread pool size */
  public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;

/*if_not[HADOOP_NON_SECURE]*/
  /** Used to authenticate with netty clients */
  public static final AttributeKey<SaslNettyServer>
  CHANNEL_SASL_NETTY_SERVERS = AttributeKey.valueOf("channelSaslServers");
/*end[HADOOP_NON_SECURE]*/

  /** Class logger */
  private static final Logger LOG = Logger.getLogger(NettyServer.class);
  /** Configuration */
  private final ImmutableClassesGiraphConfiguration conf;
  /** Progressable for reporting progress */
  private final Progressable progressable;
  /** Accepted channels */
  private final ChannelGroup accepted = new DefaultChannelGroup(
      ImmediateEventExecutor.INSTANCE);
  /** Local hostname */
  private final String localHostOrIp;
  /** Address of the server */
  private InetSocketAddress myAddress;
  /** Current task info */
  private TaskInfo myTaskInfo;
  /** Maximum number of threads */
  private final int maxPoolSize;
  /** TCP backlog */
  private final int tcpBacklog;
  /** Factory for {@link RequestServerHandler} */
  private final RequestServerHandler.Factory requestServerHandlerFactory;
/*if_not[HADOOP_NON_SECURE]*/
  /** Factory for {@link SaslServerHandler} */
  private SaslServerHandler.Factory saslServerHandlerFactory;
/*end[HADOOP_NON_SECURE]*/
  /** Server bootstrap */
  private ServerBootstrap bootstrap;
  /** Inbound byte counter for this server */
  private final InboundByteCounter inByteCounter = new InboundByteCounter();
  /** Outbound byte counter for this server */
  private final OutboundByteCounter outByteCounter = new OutboundByteCounter();
  /** Send buffer size */
  private final int sendBufferSize;
  /** Receive buffer size */
  private final int receiveBufferSize;
  /** Boss eventloop group */
  private final EventLoopGroup bossGroup;
  /** Worker eventloop group */
  private final EventLoopGroup workerGroup;
  /** Request completed map per worker */
  private final WorkerRequestReservedMap workerRequestReservedMap;
  /** Use execution group? */
  private final boolean useExecutionGroup;
  /** Execution handler (if used) */
  private final EventExecutorGroup executionGroup;
  /** Name of the handler before the execution handler (if used) */
  private final String handlerToUseExecutionGroup;
  /** Handles all uncaught exceptions in netty threads */
  private final Thread.UncaughtExceptionHandler exceptionHandler;


  /**
   * Constructor for creating the server
   *
   * @param conf Configuration to use
   * @param requestServerHandlerFactory Factory for request handlers
   * @param myTaskInfo Current task info
   * @param progressable Progressable for reporting progress
   * @param exceptionHandler handle uncaught exceptions
   */
  public NettyServer(ImmutableClassesGiraphConfiguration conf,
      RequestServerHandler.Factory requestServerHandlerFactory,
      TaskInfo myTaskInfo, Progressable progressable,
      Thread.UncaughtExceptionHandler exceptionHandler) {
    this.conf = conf;
    this.progressable = progressable;
    this.requestServerHandlerFactory = requestServerHandlerFactory;
/*if_not[HADOOP_NON_SECURE]*/
    this.saslServerHandlerFactory = new SaslServerHandler.Factory();
/*end[HADOOP_NON_SECURE]*/
    this.myTaskInfo = myTaskInfo;
    this.exceptionHandler = exceptionHandler;
    sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf);
    receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf);

    workerRequestReservedMap = new WorkerRequestReservedMap(conf);

    maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf);

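    // The boss group's threads accept incoming connections, while the worker
    // group's threads perform the I/O for the accepted channels; pipeline
    // handlers run on the worker threads unless the optional execution
    // group configured below is used.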
    bossGroup = new NioEventLoopGroup(4,
        ThreadUtils.createThreadFactory(
            "netty-server-boss-%d", exceptionHandler));

    workerGroup = new NioEventLoopGroup(maxPoolSize,
        ThreadUtils.createThreadFactory(
            "netty-server-worker-%d", exceptionHandler));

    try {
      this.localHostOrIp = conf.getLocalHostOrIp();
    } catch (UnknownHostException e) {
      throw new IllegalStateException(
          "NettyServer: unable to get hostname", e);
    }

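    // The TCP accept backlog falls back to the maximum number of workers,
    // presumably so that every worker can have a connection pending in the
    // kernel's queue at once without being refused.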
    tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG.getKey(),
        conf.getInt(GiraphConstants.MAX_WORKERS,
            GiraphConstants.TCP_BACKLOG.getDefaultValue()));

    handlerToUseExecutionGroup =
        GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER.get(conf);
    useExecutionGroup =
        GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.get(conf);
    if (useExecutionGroup) {
      int executionThreads = conf.getNettyServerExecutionThreads();
      executionGroup = new DefaultEventExecutorGroup(executionThreads,
          ThreadUtils.createThreadFactory(
              "netty-server-exec-%d", exceptionHandler));
      if (LOG.isInfoEnabled()) {
        LOG.info("NettyServer: Using execution group with " +
            executionThreads + " threads for " +
            handlerToUseExecutionGroup + ".");
      }
    } else {
      executionGroup = null;
    }
  }

/*if_not[HADOOP_NON_SECURE]*/
  /**
   * Constructor for creating the server
   *
   * @param conf Configuration to use
   * @param requestServerHandlerFactory Factory for request handlers
   * @param myTaskInfo Current task info
   * @param progressable Progressable for reporting progress
   * @param saslServerHandlerFactory Factory for SASL handlers
   * @param exceptionHandler handle uncaught exceptions
   */
  public NettyServer(ImmutableClassesGiraphConfiguration conf,
                     RequestServerHandler.Factory requestServerHandlerFactory,
                     TaskInfo myTaskInfo,
                     Progressable progressable,
                     SaslServerHandler.Factory saslServerHandlerFactory,
                     Thread.UncaughtExceptionHandler exceptionHandler) {
    this(conf, requestServerHandlerFactory, myTaskInfo,
        progressable, exceptionHandler);
    this.saslServerHandlerFactory = saslServerHandlerFactory;
  }
/*end[HADOOP_NON_SECURE]*/

  /**
   * Returns a handle on the in-bound byte counter.
   * @return The {@link InboundByteCounter} object for this server.
   */
  public InboundByteCounter getInByteCounter() {
    return inByteCounter;
  }

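  /*
   * Typical lifecycle, as a sketch only (the collaborator names below are
   * illustrative and not taken from actual call sites):
   *
   *   NettyServer server = new NettyServer(conf, requestHandlerFactory,
   *       taskInfo, progressable, exceptionHandler);
   *   server.start();                      // bind to an IPC port
   *   server.setFlowControl(flowControl);  // once the policy is known
   *   InetSocketAddress address = server.getMyAddress();
   *   // ... serve requests for the duration of the job ...
   *   server.stop();                       // close channels, release groups
   */
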
  /**
   * Start the server with the appropriate port
   */
  public void start() {
    bootstrap = new ServerBootstrap();
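    // option(...) configures the listening socket, childOption(...) each
    // accepted connection. The adaptive receive-buffer allocator is given
    // (minimum, initial, maximum) sizes, so per-read buffers start at
    // receiveBufferSize, may shrink to a quarter of it, and never grow
    // beyond it.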
    bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, tcpBacklog)
        .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, sendBufferSize)
        .childOption(ChannelOption.SO_RCVBUF, receiveBufferSize)
        .childOption(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
        .childOption(ChannelOption.RCVBUF_ALLOCATOR,
            new AdaptiveRecvByteBufAllocator(receiveBufferSize / 4,
                receiveBufferSize, receiveBufferSize));

    /*
     * Pipeline setup: depends on whether configured to use authentication
     * or not.
     */
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
/*if_not[HADOOP_NON_SECURE]*/
        if (conf.authenticate()) {
          LOG.info("start: Will use Netty pipeline with " +
              "authentication and authorization of clients.");
          // After a client authenticates, the two authentication-specific
          // pipeline components SaslServerHandler and ResponseEncoder are
          // removed, leaving the pipeline the same as in the non-authenticated
          // configuration except for the presence of the Authorize component.
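          // FlushConsolidationHandler batches explicit flush() calls (up to
          // DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES of them) into fewer writes;
          // the 'true' flag also consolidates flushes issued outside a read
          // loop.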
261           PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
262             new FlushConsolidationHandler(FlushConsolidationHandler
263               .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
264             handlerToUseExecutionGroup, executionGroup, ch);
265           PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
266               inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
267           if (conf.doCompression()) {
268             PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
269                 conf.getNettyCompressionDecoder(),
270                 handlerToUseExecutionGroup, executionGroup, ch);
271           }
272           PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
273               outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
274           if (conf.doCompression()) {
275             PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
276                 conf.getNettyCompressionEncoder(),
277                 handlerToUseExecutionGroup, executionGroup, ch);
278           }
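          // Each frame is a 4-byte big-endian length prefix followed by the
          // request payload; the decoder strips the prefix and rejects
          // frames larger than 1 GiB.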
279           PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
280               new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
281               handlerToUseExecutionGroup, executionGroup, ch);
282           PipelineUtils.addLastWithExecutorCheck("requestDecoder",
283               new RequestDecoder(conf, inByteCounter),
284               handlerToUseExecutionGroup, executionGroup, ch);
285           // Removed after authentication completes:
286           PipelineUtils.addLastWithExecutorCheck("saslServerHandler",
287               saslServerHandlerFactory.newHandler(conf),
288               handlerToUseExecutionGroup, executionGroup, ch);
289           PipelineUtils.addLastWithExecutorCheck("authorizeServerHandler",
290               new AuthorizeServerHandler(), handlerToUseExecutionGroup,
291               executionGroup, ch);
292           PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
293               requestServerHandlerFactory.newHandler(workerRequestReservedMap,
294                   conf, myTaskInfo, exceptionHandler),
295               handlerToUseExecutionGroup, executionGroup, ch);
296           // Removed after authentication completes:
297           PipelineUtils.addLastWithExecutorCheck("responseEncoder",
298               new ResponseEncoder(), handlerToUseExecutionGroup,
299               executionGroup, ch);
300         } else {
301           LOG.info("start: Using Netty without authentication.");
302 /*end[HADOOP_NON_SECURE]*/
303           // Store all connected channels in order to ensure that we can close
304           // them on stop(), or else stop() may hang waiting for the
305           // connections to close on their own
306           ch.pipeline().addLast("connectedChannels",
307               new ChannelInboundHandlerAdapter() {
308                 @Override
309                 public void channelActive(ChannelHandlerContext ctx)
310                   throws Exception {
311                   accepted.add(ctx.channel());
312                   ctx.fireChannelActive();
313                 }
314               });
315           PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
316             new FlushConsolidationHandler(FlushConsolidationHandler
317               .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
318             handlerToUseExecutionGroup, executionGroup, ch);
319           PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
320               inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
321           if (conf.doCompression()) {
322             PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
323                 conf.getNettyCompressionDecoder(),
324                 handlerToUseExecutionGroup, executionGroup, ch);
325           }
326           PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
327               outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
328           if (conf.doCompression()) {
329             PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
330                 conf.getNettyCompressionEncoder(),
331                 handlerToUseExecutionGroup, executionGroup, ch);
332           }
333           PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
334               new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
335               handlerToUseExecutionGroup, executionGroup, ch);
336           PipelineUtils.addLastWithExecutorCheck("requestDecoder",
337               new RequestDecoder(conf, inByteCounter),
338               handlerToUseExecutionGroup, executionGroup, ch);
339           PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
340               requestServerHandlerFactory.newHandler(
341                   workerRequestReservedMap, conf, myTaskInfo, exceptionHandler),
342               handlerToUseExecutionGroup, executionGroup, ch);
343 /*if_not[HADOOP_NON_SECURE]*/
344         }
345 /*end[HADOOP_NON_SECURE]*/
346       }
347     });
348 
349     int taskId = conf.getTaskPartition();
350     int numTasks = conf.getInt("mapred.map.tasks", 1);
351     // Number of workers + 1 for master
352     int numServers = conf.getInt(GiraphConstants.MAX_WORKERS, numTasks) + 1;
353     int portIncrementConstant =
354         (int) Math.pow(10, Math.ceil(Math.log10(numServers)));
355     int bindPort = GiraphConstants.IPC_INITIAL_PORT.get(conf) + taskId;
356     int bindAttempts = 0;
357     final int maxIpcPortBindAttempts = MAX_IPC_PORT_BIND_ATTEMPTS.get(conf);
358     final boolean failFirstPortBindingAttempt =
359         GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.get(conf);
360 
    // Simple handling of port collisions on the same machine while
    // preserving debuggability from the port number alone.
    // Round up the max number of workers to the next power of 10 and use
    // it as a constant to increase the port number with.
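    // For example, with 200 workers numServers is 201, so the increment is
    // 10^ceil(log10(201)) = 1000: task N first tries initialPort + N, then
    // initialPort + N + 1000, then initialPort + N + 2000, and so on.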
    while (bindAttempts < maxIpcPortBindAttempts) {
      this.myAddress = new InetSocketAddress(localHostOrIp, bindPort);
      if (failFirstPortBindingAttempt && bindAttempts == 0) {
        if (LOG.isInfoEnabled()) {
          LOG.info("start: Intentionally fail first " +
              "binding attempt as giraph.failFirstIpcPortBindAttempt " +
              "is true, port " + bindPort);
        }
        ++bindAttempts;
        bindPort += portIncrementConstant;
        continue;
      }

      try {
        ChannelFuture f = bootstrap.bind(myAddress).sync();
        accepted.add(f.channel());
        break;
      } catch (InterruptedException e) {
        throw new IllegalStateException(e);
        // CHECKSTYLE: stop IllegalCatchCheck
      } catch (Exception e) {
        // CHECKSTYLE: resume IllegalCatchCheck
        LOG.warn("start: Likely failed to bind on attempt " +
            bindAttempts + " to port " + bindPort, e.getCause());
        ++bindAttempts;
        bindPort += portIncrementConstant;
      }
    }
    if (bindAttempts == maxIpcPortBindAttempts || myAddress == null) {
      throw new IllegalStateException(
          "start: Failed to start NettyServer with " +
              bindAttempts + " attempts");
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("start: Started communication server: " + myAddress +
          " with up to " + maxPoolSize + " threads on bind attempt " +
          bindAttempts + " with sendBufferSize = " + sendBufferSize +
          " receiveBufferSize = " + receiveBufferSize);
    }
  }

  /**
   * Stop the server.
   */
  public void stop() {
    if (LOG.isInfoEnabled()) {
      LOG.info("stop: Halting netty server");
    }
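    // Close every accepted channel first (reporting progress while waiting),
    // then shut down the event loop groups and, if one was created, the
    // execution group.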
    ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
    if (LOG.isInfoEnabled()) {
      LOG.info("stop: Start releasing resources");
    }
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    ProgressableUtils.awaitTerminationFuture(bossGroup, progressable);
    ProgressableUtils.awaitTerminationFuture(workerGroup, progressable);
    if (useExecutionGroup) {
      executionGroup.shutdownGracefully();
      ProgressableUtils.awaitTerminationFuture(executionGroup, progressable);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("stop: Netty server halted");
    }
  }

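  /**
   * @return the address this server is bound to; only valid after start()
   */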
  public InetSocketAddress getMyAddress() {
    return myAddress;
  }

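  /**
   * @return the local hostname or IP used by this server
   */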
  public String getLocalHostOrIp() {
    return localHostOrIp;
  }

  /**
   * Inform the server about the flow control policy used in sending requests
   *
   * @param flowControl reference to the flow control used
   */
  public void setFlowControl(FlowControl flowControl) {
    checkState(requestServerHandlerFactory != null);
    requestServerHandlerFactory.setFlowControl(flowControl);
  }
}