1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.netty.handler;
2021import io.netty.buffer.ByteBufOutputStream;
22import org.apache.giraph.comm.requests.WritableRequest;
23import org.apache.giraph.conf.GiraphConfiguration;
24import org.apache.giraph.conf.GiraphConstants;
25import org.apache.giraph.time.SystemTime;
26import org.apache.giraph.time.Time;
27import org.apache.giraph.time.Times;
28import org.apache.log4j.Logger;
2930import io.netty.buffer.ByteBuf;
31import io.netty.channel.ChannelHandlerContext;
32import io.netty.channel.ChannelOutboundHandlerAdapter;
33import io.netty.channel.ChannelPromise;
3435importstatic org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
36importstatic org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
3738/**39 * Requests have a request type and an encoded request.40 */41publicclassRequestEncoderextends ChannelOutboundHandlerAdapter {
42/** Class logger */43privatestaticfinal Logger LOG = Logger.getLogger(RequestEncoder.class);
44/**Time class to use */45privatestaticfinalTime TIME = SystemTime.get();
46/** Buffer starting size */47privatefinalint bufferStartingSize;
48/** Start nanoseconds for the encoding time */49privatelong startEncodingNanoseconds = -1;
5051/**52 * Constructor.53 *54 * @param conf Giraph configuration55 */56publicRequestEncoder(GiraphConfiguration conf) {
57 bufferStartingSize =
58 GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE.get(conf);
59 }
6061 @Override
62publicvoid write(ChannelHandlerContext ctx, Object msg,
63 ChannelPromise promise) throws Exception {
64if (!(msg instanceof WritableRequest)) {
65thrownew IllegalArgumentException(
66"encode: Got a message of type " + msg.getClass());
67 }
6869// Encode the request70if (LOG.isDebugEnabled()) {
71 startEncodingNanoseconds = TIME.getNanoseconds();
72 }
7374 ByteBuf buf;
75WritableRequest request = (WritableRequest) msg;
76int requestSize = request.getSerializedSize();
77if (requestSize == WritableRequest.UNKNOWN_SIZE) {
78 buf = ctx.alloc().buffer(bufferStartingSize);
79 } else {
80 requestSize += SIZE_OF_INT + SIZE_OF_BYTE;
81 buf = ctx.alloc().buffer(requestSize);
82 }
83 ByteBufOutputStream output = new ByteBufOutputStream(buf);
8485// This will later be filled with the correct size of serialized request86 output.writeInt(0);
87 output.writeByte(request.getType().ordinal());
88try {
89 request.write(output);
90 } catch (IndexOutOfBoundsException e) {
91 LOG.error("write: Most likely the size of request was not properly " +
92"specified (this buffer is too small) - see getSerializedSize() " +
93"in " + request.getType().getRequestClass());
94thrownew IllegalStateException(e);
95 }
96 output.flush();
97 output.close();
9899// Set the correct size at the end100 buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
101if (LOG.isDebugEnabled()) {
102 LOG.debug("write: Client " + request.getClientId() + ", " +
103"requestId " + request.getRequestId() +
104", size = " + buf.readableBytes() + ", " +
105 request.getType() + " took " +
106 Times.getNanosSince(TIME, startEncodingNanoseconds) + " ns");
107 }
108 ctx.write(buf, promise);
109 }
110 }