1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.netty.handler;
2021import org.apache.giraph.comm.netty.NettyClient;
22import org.apache.giraph.comm.netty.SaslNettyClient;
23import org.apache.giraph.comm.requests.RequestType;
24import org.apache.giraph.comm.requests.SaslCompleteRequest;
25import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
26import org.apache.giraph.comm.requests.WritableRequest;
27import org.apache.hadoop.conf.Configuration;
28import org.apache.hadoop.util.ReflectionUtils;
29import org.apache.log4j.Logger;
30import io.netty.buffer.ByteBuf;
31import io.netty.buffer.ByteBufInputStream;
32import io.netty.channel.ChannelHandlerContext;
33import io.netty.channel.ChannelInboundHandlerAdapter;
34import io.netty.handler.codec.FixedLengthFrameDecoder;
35import io.netty.util.ReferenceCountUtil;
3637import java.io.IOException;
3839/**40 * Client-side Netty pipeline component that allows authentication with a41 * server.42 */43publicclassSaslClientHandlerextends ChannelInboundHandlerAdapter {
44/** Class logger */45privatestaticfinal Logger LOG = Logger.getLogger(SaslClientHandler.class);
46/** Configuration */47privatefinal Configuration conf;
4849/**50 * Constructor.51 *52 * @param conf Configuration53 */54publicSaslClientHandler(Configuration conf) {
55this.conf = conf;
56 }
5758 @Override
59publicvoid channelRead(ChannelHandlerContext ctx, Object msg)
60throws Exception {
61WritableRequest decodedMessage = decode(ctx, msg);
62// Generate SASL response to server using Channel-local SASL client.63SaslNettyClient saslNettyClient = ctx.attr(NettyClient.SASL).get();
64if (saslNettyClient == null) {
65thrownew Exception("handleUpstream: saslNettyClient was unexpectedly " +
66"null for channel: " + ctx.channel());
67 }
68if (decodedMessage.getClass() == SaslCompleteRequest.class) {
69if (LOG.isDebugEnabled()) {
70 LOG.debug("handleUpstream: Server has sent us the SaslComplete " +
71"message. Allowing normal work to proceed.");
72 }
73synchronized (saslNettyClient.getAuthenticated()) {
74 saslNettyClient.getAuthenticated().notify();
75 }
76if (!saslNettyClient.isComplete()) {
77 LOG.error("handleUpstream: Server returned a Sasl-complete message, " +
78"but as far as we can tell, we are not authenticated yet.");
79thrownew Exception("handleUpstream: Server returned a " +
80"Sasl-complete message, but as far as " +
81"we can tell, we are not authenticated yet.");
82 }
83// Remove SaslClientHandler and replace LengthFieldBasedFrameDecoder84// from client pipeline.85 ctx.pipeline().remove(this);
86 ctx.pipeline().replace("length-field-based-frame-decoder",
87"fixed-length-frame-decoder",
88new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
89return;
90 }
91SaslTokenMessageRequest serverToken =
92 (SaslTokenMessageRequest) decodedMessage;
93if (LOG.isDebugEnabled()) {
94 LOG.debug("handleUpstream: Responding to server's token of length: " +
95 serverToken.getSaslToken().length);
96 }
97// Generate SASL response (but we only actually send the response if it's98// non-null.99 byte[] responseToServer = saslNettyClient.saslResponse(serverToken);
100if (responseToServer == null) {
101// If we generate a null response, then authentication has completed (if102// not, warn), and return without sending a response back to the server.103if (LOG.isDebugEnabled()) {
104 LOG.debug("handleUpstream: Response to server is null: " +
105"authentication should now be complete.");
106 }
107if (!saslNettyClient.isComplete()) {
108 LOG.warn("handleUpstream: Generated a null response, " +
109"but authentication is not complete.");
110 }
111return;
112 } else {
113if (LOG.isDebugEnabled()) {
114 LOG.debug("handleUpstream: Response to server token has length:" +
115 responseToServer.length);
116 }
117 }
118// Construct a message containing the SASL response and send it to the119// server.120SaslTokenMessageRequest saslResponse =
121newSaslTokenMessageRequest(responseToServer);
122 ctx.channel().writeAndFlush(saslResponse);
123 }
124125/**126 * Decode the message read by handler127 *128 * @param ctx channel handler context129 * @param msg message to decode into a writable request130 * @return decoded writablerequest object131 * @throws Exception132 */133protectedWritableRequest decode(ChannelHandlerContext ctx, Object msg)
134throws Exception {
135if (!(msg instanceof ByteBuf)) {
136thrownew IllegalStateException("decode: Got illegal message " + msg);
137 }
138// Decode msg into an object whose class C implements WritableRequest:139// C will be either SaslTokenMessage or SaslComplete.140//141// 1. Convert message to a stream that can be decoded.142 ByteBuf buf = (ByteBuf) msg;
143 ByteBufInputStream inputStream = new ByteBufInputStream(buf);
144// 2. Get first byte: message type:145int enumValue = inputStream.readByte();
146RequestType type = RequestType.values()[enumValue];
147if (LOG.isDebugEnabled()) {
148 LOG.debug("decode: Got a response of type " + type + " from server:" +
149 ctx.channel().remoteAddress());
150 }
151// 3. Create object of the type determined in step 2.152 Class<? extends WritableRequest> writableRequestClass =
153 type.getRequestClass();
154WritableRequest serverResponse =
155 ReflectionUtils.newInstance(writableRequestClass, conf);
156// 4. Deserialize the inputStream's contents into the newly-constructed157// serverResponse object.158try {
159 serverResponse.readFields(inputStream);
160 } catch (IOException e) {
161 LOG.error("decode: Exception when trying to read server response: " + e);
162 }
163 ReferenceCountUtil.release(buf);
164// serverResponse can now be used in the next stage in pipeline.165return serverResponse;
166 }
167 }