/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.netty.handler;
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.SaslNettyServer;
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.SaslCompleteRequest;
import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collection;

import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
4950/**51 * Generate SASL response tokens to client SASL tokens, allowing clients to52 * authenticate themselves with this server.53 */54publicclassSaslServerHandlerextends55 ChannelInboundHandlerAdapter {
56/** Class logger */57privatestaticfinal Logger LOG =
58 Logger.getLogger(SaslServerHandler.class);
5960// TODO: Move out into a separate, dedicated handler: ("FirstRequestHandler")61// or similar.62/** Already closed first request? */63privatestaticvolatileboolean ALREADY_CLOSED_FIRST_REQUEST = false;
6465/** Close connection on first request (used for simulating failure) */66privatefinalboolean closeFirstRequest;
67/** Used to store Hadoop Job Tokens to authenticate clients. */68private JobTokenSecretManager secretManager;
6970/**71 * Constructor72 *73 * @param conf Configuration74 */75publicSaslServerHandler(
76 Configuration conf) throws IOException {
77 SaslNettyServer.init(conf);
78 setupSecretManager(conf);
79 closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
80 }
8182 @Override
83publicvoid channelRead(ChannelHandlerContext ctx, Object msg)
84throws Exception {
8586if (LOG.isDebugEnabled()) {
87 LOG.debug("messageReceived: Got " + msg.getClass());
88 }
8990WritableRequest writableRequest = (WritableRequest) msg;
91// Simulate a closed connection on the first request (if desired)92// TODO: Move out into a separate, dedicated handler.93if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
94 LOG.info("messageReceived: Simulating closing channel on first " +
95"request " + writableRequest.getRequestId() + " from " +
96 writableRequest.getClientId());
97 setAlreadyClosedFirstRequest();
98 ctx.close();
99return;
100 }
101102if (writableRequest.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
103// initialize server-side SASL functionality, if we haven't yet104// (in which case we are looking at the first SASL message from the105// client).106SaslNettyServer saslNettyServer =
107 ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).get();
108if (saslNettyServer == null) {
109if (LOG.isDebugEnabled()) {
110 LOG.debug("No saslNettyServer for " + ctx.channel() +
111" yet; creating now, with secret manager: " + secretManager);
112 }
113try {
114 saslNettyServer = newSaslNettyServer(secretManager,
115 AuthMethod.SIMPLE);
116 } catch (IOException ioe) { //TODO:117thrownew RuntimeException(ioe);
118 }
119 ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).set(saslNettyServer);
120 } else {
121if (LOG.isDebugEnabled()) {
122 LOG.debug("Found existing saslNettyServer on server:" +
123 ctx.channel().localAddress() + " for client " +
124 ctx.channel().remoteAddress());
125 }
126 }
127128 ((SaslTokenMessageRequest) writableRequest).processToken(saslNettyServer);
129// Send response to client.130 ctx.write(writableRequest);
131if (saslNettyServer.isComplete()) {
132// If authentication of client is complete, we will also send a133// SASL-Complete message to the client.134if (LOG.isDebugEnabled()) {
135 LOG.debug("SASL authentication is complete for client with " +
136"username: " + saslNettyServer.getUserName());
137 }
138SaslCompleteRequest saslComplete = newSaslCompleteRequest();
139 ctx.write(saslComplete);
140if (LOG.isDebugEnabled()) {
141 LOG.debug("Removing SaslServerHandler from pipeline since SASL " +
142"authentication is complete.");
143 }
144 ctx.pipeline().remove(this);
145 }
146 ctx.flush();
147// do not send upstream to other handlers: no further action needs to be148// done for SASL_TOKEN_MESSAGE_REQUEST requests.149return;
150 } else {
151// Client should not be sending other-than-SASL messages before152// SaslServerHandler has removed itself from the pipeline. Such non-SASL153// requests will be denied by the Authorize channel handler (the next154// handler upstream in the server pipeline) if SASL authentication has155// not completed.156 LOG.warn("Sending upstream an unexpected non-SASL message : " +
157 writableRequest);
158 ctx.fireChannelRead(msg);
159 }
160 }
161162/**163 * Set already closed first request flag164 */165privatestaticvoid setAlreadyClosedFirstRequest() {
166 ALREADY_CLOSED_FIRST_REQUEST = true;
167 }
168169/**170 * Load Hadoop Job Token into secret manager.171 *172 * @param conf Configuration173 * @throws IOException174 */175privatevoid setupSecretManager(Configuration conf) throws IOException {
176 secretManager = new JobTokenSecretManager();
177 String localJobTokenFile = System.getenv().get(
178 UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
179if (localJobTokenFile == null) {
180thrownew IOException("Could not find job credentials: environment " +
181"variable: " + UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION +
182" was not defined.");
183 }
184 JobConf jobConf = new JobConf(conf);
185186// Find the JobTokenIdentifiers among all the tokens available in the187// jobTokenFile and store them in the secretManager.188 Credentials credentials =
189 TokenCache.loadTokens(localJobTokenFile, jobConf);
190 Collection<Token<? extends TokenIdentifier>> collection =
191 credentials.getAllTokens();
192for (Token<? extends TokenIdentifier> token: collection) {
193 TokenIdentifier tokenIdentifier = decodeIdentifier(token,
194 JobTokenIdentifier.class);
195if (tokenIdentifier instanceof JobTokenIdentifier) {
196 Token<JobTokenIdentifier> theToken =
197 (Token<JobTokenIdentifier>) token;
198 JobTokenIdentifier jobTokenIdentifier =
199 (JobTokenIdentifier) tokenIdentifier;
200 secretManager.addTokenForJob(
201 jobTokenIdentifier.getJobId().toString(), theToken);
202 }
203 }
204if (LOG.isDebugEnabled()) {
205 LOG.debug("loaded JobToken credentials: " + credentials + " from " +
206"localJobTokenFile: " + localJobTokenFile);
207 }
208 }
209210/**211 * Get the token identifier object, or null if it could not be constructed212 * (because the class could not be loaded, for example).213 * Hadoop 2.0.0 (and older Hadoop2 versions? (verify)) need this.214 * Hadoop 2.0.1 and newer have a Token.decodeIdentifier() method and do not215 * need this. Might want to create a munge flag to distinguish 2.0.0 vs newer.216 *217 * @param token the token to decode into a TokenIdentifier218 * @param cls the subclass of TokenIdentifier to decode the token into.219 * @return the token identifier.220 * @throws IOException221 */222 @SuppressWarnings("unchecked")
223private TokenIdentifier decodeIdentifier(
224 Token<? extends TokenIdentifier> token,
225 Class<? extends TokenIdentifier> cls) throws IOException {
226 TokenIdentifier tokenIdentifier = ReflectionUtils.newInstance(cls, null);
227 ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
228 DataInputStream in = new DataInputStream(buf);
229 tokenIdentifier.readFields(in);
230 in.close();
231return tokenIdentifier;
232 }
233234/**Factory for {@link SaslServerHandler} */235publicstaticclassFactory {
236/**237 * Constructor238 */239publicFactory() {
240 }
241/**242 * Create new {@link SaslServerHandler}243 *244 * @param conf Configuration to use245 * @return New {@link SaslServerHandler}246 */247publicSaslServerHandler newHandler(
248 Configuration conf) throws IOException {
249returnnewSaslServerHandler(conf);
250 }
251 }
252 }