1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.netty.handler;
2021import io.netty.buffer.ByteBufOutputStream;
22/*if[HADOOP_NON_SECURE]23else[HADOOP_NON_SECURE]*/24import org.apache.giraph.comm.requests.RequestType;
25/*end[HADOOP_NON_SECURE]*/26import org.apache.giraph.comm.requests.WritableRequest;
27import org.apache.log4j.Logger;
28import io.netty.buffer.ByteBuf;
29import io.netty.channel.ChannelPromise;
30import io.netty.channel.ChannelHandlerContext;
31import io.netty.channel.ChannelOutboundHandlerAdapter;
3233importstatic org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
3435/**36 * How a server should respond to a client. Currently only used for37 * responding to client's SASL messages, and removed after client38 * authenticates.39 */40publicclassResponseEncoderextends ChannelOutboundHandlerAdapter {
41/** Class logger. */42privatestaticfinal Logger LOG = Logger.getLogger(ResponseEncoder.class);
4344 @Override
45publicvoid write(ChannelHandlerContext ctx, Object msg,
46 ChannelPromise promise) throws Exception {
47if (LOG.isDebugEnabled()) {
48 LOG.debug("write(" + ctx + "," + msg);
49 }
5051if (!(msg instanceof WritableRequest)) {
52thrownew IllegalArgumentException(
53"encode: cannot encode message of type " + msg.getClass() +
54" since it is not an instance of an implementation of " +
55" WritableRequest.");
56 }
57 @SuppressWarnings("unchecked")
58WritableRequest writableRequest = (WritableRequest) msg;
5960 ByteBuf buf = ctx.alloc().buffer(10);
61 ByteBufOutputStream output = new ByteBufOutputStream(buf);
6263if (LOG.isDebugEnabled()) {
64 LOG.debug("encode: Encoding a message of type " + msg.getClass());
65 }
6667// Space is reserved now to be filled later by the serialize request size68 output.writeInt(0);
69// write type of object.70 output.writeByte(writableRequest.getType().ordinal());
71// write the object itself.72 writableRequest.write(output);
7374 output.flush();
75 output.close();
7677// Set the correct size at the end.78 buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
7980if (LOG.isDebugEnabled()) {
81 LOG.debug("encode: Encoding a message of type " + msg.getClass());
82 }
83 ctx.write(buf, promise);
84/*if[HADOOP_NON_SECURE]85else[HADOOP_NON_SECURE]*/86if (writableRequest.getType() == RequestType.SASL_COMPLETE_REQUEST) {
87// We are sending to the client a SASL_COMPLETE response (created by88// the SaslServer handler). The SaslServer handler has removed itself89// from the pipeline after creating this response, and now it's time for90// the ResponseEncoder to remove itself also.91if (LOG.isDebugEnabled()) {
92 LOG.debug("encode: Removing RequestEncoder handler: no longer needed," +
93" since client: " + ctx.channel().remoteAddress() + " has " +
94"completed authenticating.");
95 }
96 ctx.pipeline().remove(this);
97 }
98/*end[HADOOP_NON_SECURE]*/99 ctx.write(buf, promise);
100 }
101 }
102