This project has been retired. For details, please refer to its
Attic page.
NettyClient xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import io.netty.handler.flush.FlushConsolidationHandler;
22 import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
23 import org.apache.giraph.comm.flow_control.FlowControl;
24 import org.apache.giraph.comm.flow_control.NoOpFlowControl;
25 import org.apache.giraph.comm.flow_control.StaticFlowControl;
26 import org.apache.giraph.comm.netty.handler.AckSignalFlag;
27 import org.apache.giraph.comm.netty.handler.TaskRequestIdGenerator;
28 import org.apache.giraph.comm.netty.handler.ClientRequestId;
29 import org.apache.giraph.comm.netty.handler.RequestEncoder;
30 import org.apache.giraph.comm.netty.handler.RequestInfo;
31 import org.apache.giraph.comm.netty.handler.RequestServerHandler;
32 import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
33
34 import org.apache.giraph.comm.netty.handler.SaslClientHandler;
35 import org.apache.giraph.comm.requests.RequestType;
36 import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
37
38 import org.apache.giraph.comm.requests.WritableRequest;
39 import org.apache.giraph.conf.BooleanConfOption;
40 import org.apache.giraph.conf.GiraphConstants;
41 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
42 import org.apache.giraph.counters.GiraphHadoopCounter;
43 import org.apache.giraph.function.Predicate;
44 import org.apache.giraph.graph.TaskInfo;
45 import org.apache.giraph.master.MasterInfo;
46 import org.apache.giraph.utils.PipelineUtils;
47 import org.apache.giraph.utils.ProgressableUtils;
48 import org.apache.giraph.utils.ThreadUtils;
49 import org.apache.giraph.utils.TimedLogger;
50 import org.apache.hadoop.mapreduce.Mapper;
51 import org.apache.log4j.Logger;
52
53 import com.google.common.collect.Lists;
54 import com.google.common.collect.MapMaker;
55 import com.google.common.collect.Maps;
56
57
58 import java.io.IOException;
59
60 import java.net.InetSocketAddress;
61 import java.util.Collection;
62 import java.util.Collections;
63 import java.util.Comparator;
64 import java.util.HashMap;
65 import java.util.HashSet;
66 import java.util.List;
67 import java.util.Map;
68 import java.util.Set;
69 import java.util.concurrent.ConcurrentMap;
70 import java.util.concurrent.atomic.AtomicInteger;
71 import java.util.concurrent.atomic.AtomicLong;
72
73 import io.netty.bootstrap.Bootstrap;
74 import io.netty.channel.Channel;
75 import io.netty.channel.ChannelFuture;
76 import io.netty.channel.ChannelFutureListener;
77 import io.netty.channel.ChannelHandlerContext;
78 import io.netty.channel.ChannelInitializer;
79 import io.netty.channel.ChannelOption;
80 import io.netty.channel.EventLoopGroup;
81 import io.netty.channel.nio.NioEventLoopGroup;
82 import io.netty.channel.socket.SocketChannel;
83 import io.netty.channel.socket.nio.NioSocketChannel;
84 import io.netty.handler.codec.FixedLengthFrameDecoder;
85
86 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
87 import io.netty.util.AttributeKey;
88
89 import io.netty.util.concurrent.BlockingOperationException;
90 import io.netty.util.concurrent.DefaultEventExecutorGroup;
91 import io.netty.util.concurrent.EventExecutorGroup;
92
93 import static com.google.common.base.Preconditions.checkState;
94 import static org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE;
95 import static org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE;
96 import static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS;
97 import static org.apache.giraph.conf.GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS;
98 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER;
99 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
100 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
101 import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
102 import static org.apache.giraph.conf.GiraphConstants.RESEND_TIMED_OUT_REQUESTS;
103 import static org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS;
104 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
105
106
107
108
109 public class NettyClient {
110
111 public static final BooleanConfOption LIMIT_NUMBER_OF_OPEN_REQUESTS =
112 new BooleanConfOption("giraph.waitForRequestsConfirmation", false,
113 "Whether to have a limit on number of open requests or not");
114
115
116
117
118
119
120
121
122
123 public static final BooleanConfOption LIMIT_OPEN_REQUESTS_PER_WORKER =
124 new BooleanConfOption("giraph.waitForPerWorkerRequests", false,
125 "Whether to have a limit on number of open requests for each worker" +
126 "or not");
127
128 public static final int MAX_REQUESTS_TO_LIST = 10;
129
130
131
132
133 public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10;
134
135 public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
136
137
138 public static final AttributeKey<SaslNettyClient> SASL =
139 AttributeKey.valueOf("saslNettyClient");
140
141
142
143 public static final String NETTY_COUNTERS_GROUP = "Netty counters";
144
145 public static final String NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME =
146 "Network requests resent for timeout";
147
148 public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
149 "Network requests resent for channel failure";
150
151 public static final String
152 NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
153 "Network requests resent for connection or request failure";
154
155
156 private static final Logger LOG = Logger.getLogger(NettyClient.class);
157
158 private static Map<String, Set<String>> COUNTER_GROUP_AND_NAMES =
159 new HashMap<>();
160
161 private final Mapper<?, ?, ?, ?>.Context context;
162
163 private final Bootstrap bootstrap;
164
165
166
167
168 private final ConcurrentMap<InetSocketAddress, ChannelRotater>
169 addressChannelMap = new MapMaker().makeMap();
170
171
172
173 private final Map<Integer, InetSocketAddress> taskIdAddressMap =
174 new MapMaker().makeMap();
175
176
177
178 private final ConcurrentMap<ClientRequestId, RequestInfo>
179 clientRequestIdRequestInfoMap;
180
181 private final int channelsPerServer;
182
183 private final InboundByteCounter inboundByteCounter = new
184 InboundByteCounter();
185
186 private final OutboundByteCounter outboundByteCounter = new
187 OutboundByteCounter();
188
189 private final int sendBufferSize;
190
191 private final int receiveBufferSize;
192
193 private final float requestSizeWarningThreshold;
194
195 private final int maxConnectionFailures;
196
197 private final long waitTimeBetweenConnectionRetriesMs;
198
199 private final int maxRequestMilliseconds;
200
201
202
203
204 private final boolean resendTimedOutRequests;
205
206 private final int waitingRequestMsecs;
207
208 private final TimedLogger requestLogger;
209
210 private final EventLoopGroup workerGroup;
211
212 private final TaskRequestIdGenerator taskRequestIdGenerator =
213 new TaskRequestIdGenerator();
214
215 private final TaskInfo myTaskInfo;
216
217 private final int maxPoolSize;
218
219 private final int maxResolveAddressAttempts;
220
221 private final boolean useExecutionGroup;
222
223 private final EventExecutorGroup executionGroup;
224
225 private final String handlerToUseExecutionGroup;
226
227 private final AtomicLong lastTimeCheckedRequestsForProblems =
228 new AtomicLong(0);
229
230
231
232
233 private final LogOnErrorChannelFutureListener logErrorListener =
234 new LogOnErrorChannelFutureListener();
235
236 private final FlowControl flowControl;
237
238
239 private final GiraphHadoopCounter networkRequestsResentForTimeout;
240
241 private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
242
243 private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
244
245
246
247
248
249 private int reconnectFailures = 0;
250
251
252
253
254
255
256
257
258
259
/**
 * Only constructor.
 *
 * Reads the client networking settings from {@code conf} and picks the
 * flow control strategy: a static total limit, a per-worker credit-based
 * limit, or none. It then registers the Netty counters, builds the Netty
 * bootstrap, and starts the "open-requests-observer" thread, which calls
 * checkRequestsForProblems() every waitingRequestMsecs milliseconds.
 *
 * NOTE(review): the observer thread loops forever and is not stopped by
 * stop().
 *
 * @param context Context for progress reporting and counters
 * @param conf Configuration
 * @param myTaskInfo Task info of this client's own task (its id is used as
 *                   the client id of sent requests)
 * @param exceptionHandler Handler for uncaught exceptions in the threads
 *                         created here and in credit-based flow control
 * @throws IllegalStateException if both LIMIT_NUMBER_OF_OPEN_REQUESTS and
 *         LIMIT_OPEN_REQUESTS_PER_WORKER are enabled
 */
public NettyClient(Mapper<?, ?, ?, ?>.Context context,
    final ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo,
    final Thread.UncaughtExceptionHandler exceptionHandler) {
  this.context = context;
  this.myTaskInfo = myTaskInfo;
  this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
  sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf);
  receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf);
  this.requestSizeWarningThreshold =
      GiraphConstants.REQUEST_SIZE_WARNING_THRESHOLD.get(conf);

  boolean limitNumberOfOpenRequests = LIMIT_NUMBER_OF_OPEN_REQUESTS.get(conf);
  boolean limitOpenRequestsPerWorker =
      LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
  checkState(!limitNumberOfOpenRequests || !limitOpenRequestsPerWorker,
      "NettyClient: it is not allowed to have both limitations on the " +
      "number of total open requests, and on the number of open " +
      "requests per worker!");
  if (limitNumberOfOpenRequests) {
    flowControl = new StaticFlowControl(conf, this);
  } else if (limitOpenRequestsPerWorker) {
    flowControl = new CreditBasedFlowControl(conf, this, exceptionHandler);
  } else {
    flowControl = new NoOpFlowControl(this);
  }

  initialiseCounters();
  networkRequestsResentForTimeout =
      new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
          NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
  networkRequestsResentForChannelFailure =
      new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
          NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
  networkRequestsResentForConnectionFailure =
      new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
          NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));

  maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
  resendTimedOutRequests = RESEND_TIMED_OUT_REQUESTS.get(conf);
  maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
  waitTimeBetweenConnectionRetriesMs =
      WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
  waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
  requestLogger = new TimedLogger(waitingRequestMsecs, LOG);
  maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
  maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);

  clientRequestIdRequestInfoMap =
      new MapMaker().concurrencyLevel(maxPoolSize).makeMap();

  handlerToUseExecutionGroup =
      NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);
  useExecutionGroup = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);
  if (useExecutionGroup) {
    int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);
    executionGroup = new DefaultEventExecutorGroup(executionThreads,
        ThreadUtils.createThreadFactory(
            "netty-client-exec-%d", exceptionHandler));
    if (LOG.isInfoEnabled()) {
      LOG.info("NettyClient: Using execution handler with " +
          executionThreads + " threads after " +
          handlerToUseExecutionGroup + ".");
    }
  } else {
    executionGroup = null;
  }

  workerGroup = new NioEventLoopGroup(maxPoolSize,
      ThreadUtils.createThreadFactory(
          "netty-client-worker-%d", exceptionHandler));

  bootstrap = new Bootstrap();
  bootstrap.group(workerGroup)
      .channel(NioSocketChannel.class)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
          MAX_CONNECTION_MILLISECONDS_DEFAULT)
      .option(ChannelOption.TCP_NODELAY, true)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .option(ChannelOption.SO_SNDBUF, sendBufferSize)
      .option(ChannelOption.SO_RCVBUF, receiveBufferSize)
      .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
      .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          boolean authenticate = conf.authenticate();
          if (authenticate) {
            LOG.info("Using Netty with authentication.");
          } else {
            LOG.info("Using Netty without authentication.");
          }
          addClientHandlers(ch, conf, authenticate);
        }

        // NOTE(review): Netty removes a ChannelInitializer from the
        // pipeline once initChannel returns. Confirm this callback is
        // ever reached; if not, channel failures are detected only by
        // checkRequestsForProblems().
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws
            Exception {
          super.channelUnregistered(ctx);
          LOG.error("Channel failed " + ctx.channel());
          checkRequestsAfterChannelFailure(ctx.channel());
        }
      });

  // Periodically look for timed-out or failed requests.
  ThreadUtils.startThread(new Runnable() {
    @Override
    public void run() {
      while (true) {
        ThreadUtils.trySleep(waitingRequestMsecs);
        checkRequestsForProblems();
      }
    }
  }, "open-requests-observer");
}

/**
 * Appends the client handlers to the pipeline of a new channel.
 *
 * The order is: flush consolidation, inbound byte counter, optional
 * compression decoder, outbound byte counter, optional compression
 * encoder, frame decoder, request encoder, SASL handler (authenticated
 * mode only), and response handler. Authenticated channels use a
 * length-field based frame decoder. Unauthenticated channels use a
 * fixed-length decoder of RequestServerHandler.RESPONSE_BYTES.
 *
 * @param ch Channel being initialized
 * @param conf Configuration
 * @param authenticate Whether SASL authentication is enabled
 * @throws Exception if adding a handler fails
 */
private void addClientHandlers(SocketChannel ch,
    ImmutableClassesGiraphConfiguration conf, boolean authenticate)
    throws Exception {
  boolean compress = conf.doCompression();
  PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
      new FlushConsolidationHandler(
          FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
      handlerToUseExecutionGroup, executionGroup, ch);
  PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
      inboundByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
  if (compress) {
    PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
        conf.getNettyCompressionDecoder(),
        handlerToUseExecutionGroup, executionGroup, ch);
  }
  PipelineUtils.addLastWithExecutorCheck("clientOutboundByteCounter",
      outboundByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
  if (compress) {
    PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
        conf.getNettyCompressionEncoder(),
        handlerToUseExecutionGroup, executionGroup, ch);
  }
  if (authenticate) {
    PipelineUtils.addLastWithExecutorCheck(
        "length-field-based-frame-decoder",
        new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),
        handlerToUseExecutionGroup, executionGroup, ch);
  } else {
    PipelineUtils.addLastWithExecutorCheck(
        "fixed-length-frame-decoder",
        new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES),
        handlerToUseExecutionGroup, executionGroup, ch);
  }
  PipelineUtils.addLastWithExecutorCheck("request-encoder",
      new RequestEncoder(conf), handlerToUseExecutionGroup,
      executionGroup, ch);
  if (authenticate) {
    PipelineUtils.addLastWithExecutorCheck("sasl-client-handler",
        new SaslClientHandler(conf), handlerToUseExecutionGroup,
        executionGroup, ch);
  }
  PipelineUtils.addLastWithExecutorCheck("response-handler",
      new ResponseClientHandler(this, conf),
      handlerToUseExecutionGroup, executionGroup, ch);
}
462
463
464
465
466
467 private void initialiseCounters() {
468 Set<String> counters = COUNTER_GROUP_AND_NAMES.getOrDefault(
469 NETTY_COUNTERS_GROUP, new HashSet<>());
470 counters.add(NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME);
471 counters.add(NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME);
472 counters.add(NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME);
473 COUNTER_GROUP_AND_NAMES.put(NETTY_COUNTERS_GROUP, counters);
474 }
475
476 public static Map<String, Set<String>> getCounterGroupsAndNames() {
477 return COUNTER_GROUP_AND_NAMES;
478 }
479
480
481
482
483
484
485
486 public boolean masterInvolved(int clientId) {
487 return myTaskInfo.getTaskId() == MasterInfo.MASTER_TASK_ID ||
488 clientId == MasterInfo.MASTER_TASK_ID;
489 }
490
491
492
493
494 private static class ChannelFutureAddress {
495
496 private final ChannelFuture future;
497
498 private final InetSocketAddress address;
499
500 private final Integer taskId;
501
502
503
504
505
506
507
508
509 ChannelFutureAddress(
510 ChannelFuture future, InetSocketAddress address, Integer taskId) {
511 this.future = future;
512 this.address = address;
513 this.taskId = taskId;
514 }
515
516 @Override
517 public String toString() {
518 return "(future=" + future + ",address=" + address + ",taskId=" +
519 taskId + ")";
520 }
521 }
522
523
524
525
526
527
528 public void connectAllAddresses(Collection<? extends TaskInfo> tasks) {
529 List<ChannelFutureAddress> waitingConnectionList =
530 Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
531 for (TaskInfo taskInfo : tasks) {
532 context.progress();
533 int taskId = taskInfo.getTaskId();
534 InetSocketAddress address = taskIdAddressMap.get(taskId);
535 if (address == null ||
536 !address.getHostName().equals(taskInfo.getHostname()) ||
537 address.getPort() != taskInfo.getPort()) {
538 address = resolveAddress(maxResolveAddressAttempts,
539 taskInfo.getHostOrIp(), taskInfo.getPort());
540 taskIdAddressMap.put(taskId, address);
541 }
542 if (address == null || address.getHostName() == null ||
543 address.getHostName().isEmpty()) {
544 throw new IllegalStateException("connectAllAddresses: Null address " +
545 "in addresses " + tasks);
546 }
547 if (address.isUnresolved()) {
548 throw new IllegalStateException("connectAllAddresses: Unresolved " +
549 "address " + address);
550 }
551
552 if (addressChannelMap.containsKey(address)) {
553 continue;
554 }
555
556
557 for (int i = 0; i < channelsPerServer; ++i) {
558 ChannelFuture connectionFuture = bootstrap.connect(address);
559
560 waitingConnectionList.add(
561 new ChannelFutureAddress(
562 connectionFuture, address, taskId));
563 }
564 }
565
566
567 int failures = 0;
568 int connected = 0;
569 while (failures < maxConnectionFailures) {
570 List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
571 boolean isFirstFailure = true;
572 for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
573 context.progress();
574 ChannelFuture future = waitingConnection.future;
575 ProgressableUtils.awaitChannelFuture(future, context);
576 if (!future.isSuccess() || !future.channel().isOpen()) {
577
578
579 if (isFirstFailure) {
580 isFirstFailure = false;
581 try {
582 Thread.sleep(waitTimeBetweenConnectionRetriesMs);
583 } catch (InterruptedException e) {
584 throw new IllegalStateException(
585 "connectAllAddresses: InterruptedException occurred", e);
586 }
587 }
588
589 LOG.warn("connectAllAddresses: Future failed " +
590 "to connect with " + waitingConnection.address + " with " +
591 failures + " failures because of " + future.cause());
592
593 ChannelFuture connectionFuture =
594 bootstrap.connect(waitingConnection.address);
595 nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,
596 waitingConnection.address, waitingConnection.taskId));
597 ++failures;
598 } else {
599 Channel channel = future.channel();
600 if (LOG.isDebugEnabled()) {
601 LOG.debug("connectAllAddresses: Connected to " +
602 channel.remoteAddress() + ", open = " + channel.isOpen());
603 }
604
605 if (channel.remoteAddress() == null) {
606 throw new IllegalStateException(
607 "connectAllAddresses: Null remote address!");
608 }
609
610 ChannelRotater rotater =
611 addressChannelMap.get(waitingConnection.address);
612 if (rotater == null) {
613 ChannelRotater newRotater =
614 new ChannelRotater(waitingConnection.taskId,
615 waitingConnection.address);
616 rotater = addressChannelMap.putIfAbsent(
617 waitingConnection.address, newRotater);
618 if (rotater == null) {
619 rotater = newRotater;
620 }
621 }
622 rotater.addChannel(future.channel());
623 ++connected;
624 }
625 }
626 LOG.info("connectAllAddresses: Successfully added " +
627 (waitingConnectionList.size() - nextCheckFutures.size()) +
628 " connections, (" + connected + " total connected) " +
629 nextCheckFutures.size() + " failed, " +
630 failures + " failures total.");
631 if (nextCheckFutures.isEmpty()) {
632 break;
633 }
634 waitingConnectionList = nextCheckFutures;
635 }
636 if (failures >= maxConnectionFailures) {
637 throw new IllegalStateException(
638 "connectAllAddresses: Too many failures (" + failures + ").");
639 }
640 }
641
642
643
644
645
646 public void authenticate() {
647 LOG.info("authenticate: NettyClient starting authentication with " +
648 "servers.");
649 for (Map.Entry<InetSocketAddress, ChannelRotater> entry :
650 addressChannelMap.entrySet()) {
651 if (LOG.isDebugEnabled()) {
652 LOG.debug("authenticate: Authenticating with address:" +
653 entry.getKey());
654 }
655 ChannelRotater channelRotater = entry.getValue();
656 for (Channel channel: channelRotater.getChannels()) {
657 if (LOG.isDebugEnabled()) {
658 LOG.debug("authenticate: Authenticating with server on channel: " +
659 channel);
660 }
661 authenticateOnChannel(channelRotater.getTaskId(), channel);
662 }
663 }
664 if (LOG.isInfoEnabled()) {
665 LOG.info("authenticate: NettyClient successfully authenticated with " +
666 addressChannelMap.size() + " server" +
667 ((addressChannelMap.size() != 1) ? "s" : "") +
668 " - continuing with normal work.");
669 }
670 }
671
672
673
674
675
676
677
678 private void authenticateOnChannel(Integer taskId, Channel channel) {
679 try {
680 SaslNettyClient saslNettyClient = channel.attr(SASL).get();
681 if (channel.attr(SASL).get() == null) {
682 if (LOG.isDebugEnabled()) {
683 LOG.debug("authenticateOnChannel: Creating saslNettyClient now " +
684 "for channel: " + channel);
685 }
686 saslNettyClient = new SaslNettyClient();
687 channel.attr(SASL).set(saslNettyClient);
688 }
689 if (!saslNettyClient.isComplete()) {
690 if (LOG.isDebugEnabled()) {
691 LOG.debug("authenticateOnChannel: Waiting for authentication " +
692 "to complete..");
693 }
694 SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();
695 sendWritableRequest(taskId, saslTokenMessage);
696
697
698 try {
699 synchronized (saslNettyClient.getAuthenticated()) {
700 while (!saslNettyClient.isComplete()) {
701 saslNettyClient.getAuthenticated().wait();
702 }
703 }
704 } catch (InterruptedException e) {
705 LOG.error("authenticateOnChannel: Interrupted while waiting for " +
706 "authentication.");
707 }
708 }
709 if (LOG.isDebugEnabled()) {
710 LOG.debug("authenticateOnChannel: Authentication on channel: " +
711 channel + " has completed successfully.");
712 }
713 } catch (IOException e) {
714 LOG.error("authenticateOnChannel: Failed to authenticate with server " +
715 "due to error: " + e);
716 }
717 return;
718 }
719
720
721
722
723
724 public void stop() {
725 if (LOG.isInfoEnabled()) {
726 LOG.info("stop: Halting netty client");
727 }
728
729
730
731 int channelCount = 0;
732 for (ChannelRotater channelRotater : addressChannelMap.values()) {
733 channelCount += channelRotater.size();
734 }
735 final int done = channelCount;
736 final AtomicInteger count = new AtomicInteger(0);
737 for (ChannelRotater channelRotater : addressChannelMap.values()) {
738 channelRotater.closeChannels(new ChannelFutureListener() {
739 @Override
740 public void operationComplete(ChannelFuture cf) {
741 context.progress();
742 if (count.incrementAndGet() == done) {
743 if (LOG.isInfoEnabled()) {
744 LOG.info("stop: reached wait threshold, " +
745 done + " connections closed, releasing " +
746 "resources now.");
747 }
748 workerGroup.shutdownGracefully();
749 if (executionGroup != null) {
750 executionGroup.shutdownGracefully();
751 }
752 }
753 }
754 });
755 }
756 ProgressableUtils.awaitTerminationFuture(workerGroup, context);
757 if (executionGroup != null) {
758 ProgressableUtils.awaitTerminationFuture(executionGroup, context);
759 }
760 if (LOG.isInfoEnabled()) {
761 LOG.info("stop: Netty client halted");
762 }
763 }
764
765
766
767
768
769
770
771 private Channel getNextChannel(InetSocketAddress remoteServer) {
772 Channel channel = addressChannelMap.get(remoteServer).nextChannel();
773 if (channel == null) {
774 LOG.warn("getNextChannel: No channel exists for " + remoteServer);
775 } else {
776
777 if (channel.isActive()) {
778 return channel;
779 }
780
781
782 if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
783 LOG.warn("getNextChannel: Unlikely event that the channel " +
784 channel + " was already removed!");
785 }
786 if (LOG.isInfoEnabled()) {
787 LOG.info("getNextChannel: Fixing disconnected channel to " +
788 remoteServer + ", open = " + channel.isOpen() + ", " +
789 "bound = " + channel.isRegistered());
790 }
791 }
792
793 while (reconnectFailures < maxConnectionFailures) {
794 ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
795 try {
796 ProgressableUtils.awaitChannelFuture(connectionFuture, context);
797 } catch (BlockingOperationException e) {
798 LOG.warn("getNextChannel: Failed connecting to " + remoteServer, e);
799 }
800 if (connectionFuture.isSuccess()) {
801 if (LOG.isInfoEnabled()) {
802 LOG.info("getNextChannel: Connected to " + remoteServer + "!");
803 }
804 addressChannelMap.get(remoteServer).addChannel(
805 connectionFuture.channel());
806 return connectionFuture.channel();
807 }
808 ++reconnectFailures;
809 LOG.warn("getNextChannel: Failed to reconnect to " + remoteServer +
810 " on attempt " + reconnectFailures + " out of " +
811 maxConnectionFailures + " max attempts, sleeping for 5 secs",
812 connectionFuture.cause());
813 ThreadUtils.trySleep(5000);
814 }
815 throw new IllegalStateException("getNextChannel: Failed to connect " +
816 "to " + remoteServer + " in " + reconnectFailures +
817 " connect attempts");
818 }
819
820
821
822
823
824
825
826
827 public void sendWritableRequest(int destTaskId, WritableRequest request) {
828 flowControl.sendRequest(destTaskId, request);
829 }
830
831
832
833
834
835
836
837
838 public Long doSend(int destTaskId, WritableRequest request) {
839 InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
840 if (clientRequestIdRequestInfoMap.isEmpty()) {
841 inboundByteCounter.resetAll();
842 outboundByteCounter.resetAll();
843 }
844 boolean registerRequest = true;
845 Long requestId = null;
846
847 if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
848 registerRequest = false;
849 }
850
851
852 RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
853 if (registerRequest) {
854 request.setClientId(myTaskInfo.getTaskId());
855 requestId = taskRequestIdGenerator.getNextRequestId(destTaskId);
856 request.setRequestId(requestId);
857 ClientRequestId clientRequestId =
858 new ClientRequestId(destTaskId, request.getRequestId());
859 RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
860 clientRequestId, newRequestInfo);
861 if (oldRequestInfo != null) {
862 throw new IllegalStateException("sendWritableRequest: Impossible to " +
863 "have a previous request id = " + request.getRequestId() + ", " +
864 "request info of " + oldRequestInfo);
865 }
866 }
867 if (request.getSerializedSize() >
868 requestSizeWarningThreshold * sendBufferSize) {
869 LOG.warn("Creating large request of type " + request.getClass() +
870 ", size " + request.getSerializedSize() +
871 " bytes. Check netty buffer size.");
872 }
873 writeRequestToChannel(newRequestInfo);
874 return requestId;
875 }
876
877
878
879
880
881
882
883
884
885
886 private void writeRequestToChannel(RequestInfo requestInfo) {
887 Channel channel = getNextChannel(requestInfo.getDestinationAddress());
888 ChannelFuture writeFuture = channel.writeAndFlush(requestInfo.getRequest());
889 requestInfo.setWriteFuture(writeFuture);
890 writeFuture.addListener(logErrorListener);
891 }
892
893
894
895
896
897
898
899
900
901 public void messageReceived(int senderId, long requestId, int response,
902 boolean shouldDrop) {
903 if (shouldDrop) {
904 synchronized (clientRequestIdRequestInfoMap) {
905 clientRequestIdRequestInfoMap.notifyAll();
906 }
907 return;
908 }
909 AckSignalFlag responseFlag = flowControl.getAckSignalFlag(response);
910 if (responseFlag == AckSignalFlag.DUPLICATE_REQUEST) {
911 LOG.info("messageReceived: Already completed request (taskId = " +
912 senderId + ", requestId = " + requestId + ")");
913 } else if (responseFlag != AckSignalFlag.NEW_REQUEST) {
914 throw new IllegalStateException(
915 "messageReceived: Got illegal response " + response);
916 }
917 RequestInfo requestInfo = clientRequestIdRequestInfoMap
918 .remove(new ClientRequestId(senderId, requestId));
919 if (requestInfo == null) {
920 LOG.info("messageReceived: Already received response for (taskId = " +
921 senderId + ", requestId = " + requestId + ")");
922 } else {
923 if (LOG.isDebugEnabled()) {
924 LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" +
925 requestInfo + ". Waiting on " +
926 clientRequestIdRequestInfoMap.size() + " requests");
927 }
928 flowControl.messageAckReceived(senderId, requestId, response);
929
930 synchronized (clientRequestIdRequestInfoMap) {
931 clientRequestIdRequestInfoMap.notifyAll();
932 }
933 }
934 }
935
936
937
938
939
940
941 public void waitAllRequests() {
942 flowControl.waitAllRequests();
943 checkState(flowControl.getNumberOfUnsentRequests() == 0);
944 while (clientRequestIdRequestInfoMap.size() > 0) {
945
946 synchronized (clientRequestIdRequestInfoMap) {
947 if (clientRequestIdRequestInfoMap.size() == 0) {
948 break;
949 }
950 try {
951 clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
952 } catch (InterruptedException e) {
953 throw new IllegalStateException("waitAllRequests: Got unexpected " +
954 "InterruptedException", e);
955 }
956 }
957 logAndSanityCheck();
958 }
959 if (LOG.isInfoEnabled()) {
960 LOG.info("waitAllRequests: Finished all requests. " +
961 inboundByteCounter.getMetrics() + "\n" + outboundByteCounter
962 .getMetrics());
963 }
964 }
965
966
967
968
969 public void logAndSanityCheck() {
970 logInfoAboutOpenRequests();
971
972 context.progress();
973 }
974
975
976
977
978 private void logInfoAboutOpenRequests() {
979 if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
980 LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
981 waitingRequestMsecs + " msecs, " +
982 clientRequestIdRequestInfoMap.size() +
983 " open requests, " + inboundByteCounter.getMetrics() + "\n" +
984 outboundByteCounter.getMetrics());
985
986 if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
987 for (Map.Entry<ClientRequestId, RequestInfo> entry :
988 clientRequestIdRequestInfoMap.entrySet()) {
989 LOG.info("logInfoAboutOpenRequests: Waiting for request " +
990 entry.getKey() + " - " + entry.getValue());
991 }
992 }
993
994
995 Map<Integer, Integer> openRequestCounts = Maps.newHashMap();
996 for (ClientRequestId clientRequestId :
997 clientRequestIdRequestInfoMap.keySet()) {
998 int taskId = clientRequestId.getDestinationTaskId();
999 Integer currentCount = openRequestCounts.get(taskId);
1000 openRequestCounts.put(taskId,
1001 (currentCount == null ? 0 : currentCount) + 1);
1002 }
1003
1004 List<Map.Entry<Integer, Integer>> sorted =
1005 Lists.newArrayList(openRequestCounts.entrySet());
1006 Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
1007 @Override
1008 public int compare(Map.Entry<Integer, Integer> entry1,
1009 Map.Entry<Integer, Integer> entry2) {
1010 int value1 = entry1.getValue();
1011 int value2 = entry2.getValue();
1012 return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
1013 }
1014 });
1015
1016 StringBuilder message = new StringBuilder();
1017 message.append("logInfoAboutOpenRequests: ");
1018 int itemsToPrint =
1019 Math.min(MAX_DESTINATION_TASK_IDS_TO_LIST, sorted.size());
1020 for (int i = 0; i < itemsToPrint; i++) {
1021 message.append(sorted.get(i).getValue())
1022 .append(" requests for taskId=")
1023 .append(sorted.get(i).getKey())
1024 .append(", ");
1025 }
1026 LOG.info(message);
1027 flowControl.logInfo();
1028 }
1029 }
1030
1031
1032
1033
1034
1035 private void checkRequestsForProblems() {
1036 long lastTimeChecked = lastTimeCheckedRequestsForProblems.get();
1037
1038 if (System.currentTimeMillis() < lastTimeChecked + waitingRequestMsecs) {
1039 return;
1040 }
1041
1042 if (!lastTimeCheckedRequestsForProblems.compareAndSet(lastTimeChecked,
1043 System.currentTimeMillis())) {
1044 return;
1045 }
1046 resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
1047 @Override
1048 public boolean apply(RequestInfo requestInfo) {
1049
1050 return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
1051 }
1052 }, networkRequestsResentForTimeout, resendTimedOutRequests);
1053 resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
1054 @Override
1055 public boolean apply(RequestInfo requestInfo) {
1056 ChannelFuture writeFuture = requestInfo.getWriteFuture();
1057
1058 return writeFuture != null && (!writeFuture.channel().isActive() ||
1059 (writeFuture.isDone() && !writeFuture.isSuccess()));
1060 }
1061 }, networkRequestsResentForConnectionFailure, true);
1062 }
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072 private void resendRequestsWhenNeeded(
1073 Predicate<RequestInfo> shouldResendRequestPredicate,
1074 GiraphHadoopCounter counter,
1075 boolean resendProblematicRequest) {
1076
1077
1078 List<ClientRequestId> addedRequestIds = Lists.newArrayList();
1079 List<RequestInfo> addedRequestInfos = Lists.newArrayList();
1080
1081 for (Map.Entry<ClientRequestId, RequestInfo> entry :
1082 clientRequestIdRequestInfoMap.entrySet()) {
1083 RequestInfo requestInfo = entry.getValue();
1084
1085 if (shouldResendRequestPredicate.apply(requestInfo)) {
1086 if (!resendProblematicRequest) {
1087 throw new IllegalStateException("Problem with request id " +
1088 entry.getKey() + " for " + requestInfo.getDestinationAddress() +
1089 ", failing the job");
1090 }
1091 ChannelFuture writeFuture = requestInfo.getWriteFuture();
1092 String logMessage;
1093 if (writeFuture == null) {
1094 logMessage = "wasn't sent successfully";
1095 } else {
1096 logMessage = "connected = " +
1097 writeFuture.channel().isActive() +
1098 ", future done = " + writeFuture.isDone() + ", " +
1099 "success = " + writeFuture.isSuccess() + ", " +
1100 "cause = " + writeFuture.cause() + ", " +
1101 "channelId = " + writeFuture.channel().hashCode();
1102 }
1103 LOG.warn("checkRequestsForProblems: Problem with request id " +
1104 entry.getKey() + ", " + logMessage + ", " +
1105 "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
1106 "destination = " + requestInfo.getDestinationAddress() +
1107 " " + requestInfo);
1108 addedRequestIds.add(entry.getKey());
1109 addedRequestInfos.add(new RequestInfo(
1110 requestInfo.getDestinationAddress(), requestInfo.getRequest()));
1111 counter.increment();
1112 }
1113 }
1114
1115
1116 for (int i = 0; i < addedRequestIds.size(); ++i) {
1117 ClientRequestId requestId = addedRequestIds.get(i);
1118 RequestInfo requestInfo = addedRequestInfos.get(i);
1119
1120 if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) == null) {
1121 LOG.warn("checkRequestsForProblems: Request " + requestId +
1122 " completed prior to sending the next request");
1123 clientRequestIdRequestInfoMap.remove(requestId);
1124 }
1125 if (LOG.isInfoEnabled()) {
1126 LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
1127 }
1128 writeRequestToChannel(requestInfo);
1129 if (LOG.isInfoEnabled()) {
1130 LOG.info("checkRequestsForProblems: Request " + requestId +
1131 " was resent through channelId=" +
1132 requestInfo.getWriteFuture().channel().hashCode());
1133 }
1134 }
1135 addedRequestIds.clear();
1136 addedRequestInfos.clear();
1137 }
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150 private static InetSocketAddress resolveAddress(
1151 int maxResolveAddressAttempts, String hostOrIp, int port) {
1152 int resolveAttempts = 0;
1153 InetSocketAddress address = new InetSocketAddress(hostOrIp, port);
1154 while (address.isUnresolved() &&
1155 resolveAttempts < maxResolveAddressAttempts) {
1156 ++resolveAttempts;
1157 LOG.warn("resolveAddress: Failed to resolve " + address +
1158 " on attempt " + resolveAttempts + " of " +
1159 maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
1160 ThreadUtils.trySleep(5000);
1161 address = new InetSocketAddress(hostOrIp,
1162 address.getPort());
1163 }
1164 if (resolveAttempts >= maxResolveAddressAttempts) {
1165 throw new IllegalStateException("resolveAddress: Couldn't " +
1166 "resolve " + address + " in " + resolveAttempts + " tries.");
1167 }
1168 return address;
1169 }
1170
1171 public FlowControl getFlowControl() {
1172 return flowControl;
1173 }
1174
1175
1176
1177
1178
1179
1180
1181 public Long getNextRequestId(int taskId) {
1182 return taskRequestIdGenerator.getNextRequestId(taskId);
1183 }
1184
1185
1186
1187
1188 public int getNumberOfOpenRequests() {
1189 return clientRequestIdRequestInfoMap.size();
1190 }
1191
1192
1193
1194
1195
1196
1197 private void checkRequestsAfterChannelFailure(final Channel channel) {
1198 resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
1199 @Override
1200 public boolean apply(RequestInfo requestInfo) {
1201 if (requestInfo.getWriteFuture() == null ||
1202 requestInfo.getWriteFuture().channel() == null) {
1203 return false;
1204 }
1205 return requestInfo.getWriteFuture().channel().equals(channel);
1206 }
1207 }, networkRequestsResentForChannelFailure, true);
1208 }
1209
1210
1211
1212
1213
1214 private static class LogOnErrorChannelFutureListener
1215 implements ChannelFutureListener {
1216
1217 @Override
1218 public void operationComplete(ChannelFuture future) throws Exception {
1219 if (future.isDone() && !future.isSuccess()) {
1220 LOG.error("Channel failed channelId=" + future.channel().hashCode(),
1221 future.cause());
1222 }
1223 }
1224 }
1225 }